Version 6 (#67)

# Version 6

* Bump to pysnmp v7.1 with pysnmp-sync-adapter and legacy_wrappers.
* Using parallel queries
* Manage multiple OID per PDU
* Bump to version 6 and rename snmp_mib() to get_snmp_values()
* Using cluster_varbinds
* Epson L3260 printer is not compatible
* Use fetch_snmp_values()
* Add temporary reset of waste ink level
* Fix bugs and improve documentation
* Refactor D4 commands
This commit is contained in:
Ircama
2025-06-01 17:18:19 +02:00
committed by GitHub
parent 7d6077d6cd
commit 2703b3ccdc
6 changed files with 641 additions and 224 deletions

View File

@@ -32,9 +32,8 @@ COPY . .
RUN pip install --no-cache-dir \ RUN pip install --no-cache-dir \
pyyaml \ pyyaml \
pyasn1==0.4.8 \ pysnmp \
git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp \ pysnmp_sync_adapter \
pyasyncore \
tkcalendar \ tkcalendar \
pyperclip \ pyperclip \
black \ black \

View File

@@ -20,18 +20,33 @@ The software also includes a configurable printer dictionary, which can be easil
start with `@BDC [SP] ST2 [CR] [LF]` ...). @BDC ST2 is used to convey various aspects of the status of the printer, such as errors, paper status, ink and more. The element fields of this format may vary depending on the printer model. The *Epson Printer Configuration Tool* can decode all element fields found in publicly available Epson Programming Manuals of various printer models (a relevant subset of fields used by the Epson printers). start with `@BDC [SP] ST2 [CR] [LF]` ...). @BDC ST2 is used to convey various aspects of the status of the printer, such as errors, paper status, ink and more. The element fields of this format may vary depending on the printer model. The *Epson Printer Configuration Tool* can decode all element fields found in publicly available Epson Programming Manuals of various printer models (a relevant subset of fields used by the Epson printers).
- __Advanced Maintenance Functions__: - __Advanced Maintenance Functions__:
- Open the Web interface of the printer (via the default browser). - Open the Web interface of the printer (via the default browser).
- Temporary reset of the ink waste counter.
The ink waste counters track the amount of ink discarded during maintenance tasks to prevent overflow in the waste ink pads. Once the counters indicate that one of the printer pads is full, the printer will stop working to avoid potential damage or ink spills. The "Printer status" button includes information showing the levels of the waste ink tanks; specifically, two sections are relevant: "Maintenance box information" ("maintenance_box_...") and "Waste Ink Levels" ("waste_ink_levels"). The former has a counter associated for each tank, which indicates the number of temporary resets performed by the user to temporarily restore a disabled printer.
The feature to temporarily reset the ink waste counter is effective if the Maintenance box information reports that the Maintenance Box is full; it temporarily bypasses the ink waste tank full warning, which would otherwise disable printing. It is important to know that this setting is reset upon printer reboot (it does not affect the EEPROM) and can be repeated. Each time the Maintenance box status switches from "full" to "not full", the "ink replacement cleaning counter" is increased. A pad maintenance or tank replacement has to be programmed meanwhile.
- Reset the ink waste counter. - Reset the ink waste counter.
The ink waste counters track the amount of ink discarded during maintenance tasks to prevent overflow in the waste ink pads. Once the counters indicate that one of the printer pads is full, the printer will stop working to avoid potential damage or ink spills. Resetting the ink waste counter extends the printer operation while a pad maintenance or tank replacement is programmed (operation that shall necessarily be pefromed). This feature permanently resets the ink waste counter.
Resetting the ink waste counter extends the printer operation while a physical pad maintenance or tank replacement is programmed (operation that shall necessarily be pefromed).
- Adjust the power-off timer (for energy efficiency). - Adjust the power-off timer (for energy efficiency).
- Change the _First TI Received Time_, - Change the _First TI Received Time_,
The *First TI Received Time* in Epson printers typically refers to the timestamp of the first transmission instruction to the printer. This feature tracks when the printer first operated. The *First TI Received Time* in Epson printers typically refers to the timestamp of the first transmission instruction to the printer. This feature tracks when the printer first operated.
- Change the printer WiFi MAC address and the printer serial number (typically used in specialized scenarios where specific device identifiers are required). - Change the printer WiFi MAC address and the printer serial number (typically used in specialized scenarios where specific device identifiers are required).
- Read and write to EEPROM addresses. - Read and write to EEPROM addresses.
- Dump and analyze sets of EEPROM addresses. - Dump and analyze sets of EEPROM addresses.
- Detect the access key (*read_key* and *write_key*) and some attributes of the printer configuration. - Detect the access key (*read_key* and *write_key*) and some attributes of the printer configuration.
The GUI includes some features that attempt to detect the attributes of an Epson printer whose model is not included in the configuration; such features can also be used with known printers, to detect additional parameters. The GUI includes some features that attempt to detect the attributes of an Epson printer whose model is not included in the configuration; such features can also be used with known printers, to detect additional parameters.
@@ -84,12 +99,7 @@ cd epson_print_conf
pip install -r requirements.txt pip install -r requirements.txt
``` ```
Notes (at the time of writing): This program exploits [pysnmp v7+](https://github.com/lextudio/pysnmp) and [pysnmp-sync-adapter](https://github.com/Ircama/pysnmp-sync-adapter).
- [before pysnmp, install pyasn1 with version 0.4.8 and not 0.5](https://github.com/etingof/pysnmp/issues/440#issuecomment-1544341598)
- [pull pysnmp from the GitHub master branch, not from PyPI](https://stackoverflow.com/questions/54868134/snmp-reading-from-an-oid-with-three-libraries-gives-different-execution-times#comment96532761_54869361)
This program exploits [pysnmp](https://github.com/etingof/pysnmp), basing on the related [documentation](https://pysnmp.readthedocs.io/).
It is tested with Ubuntu / Windows Subsystem for Linux, Windows. It is tested with Ubuntu / Windows Subsystem for Linux, Windows.
@@ -188,10 +198,12 @@ For the following models there is no known way to read the EEPROM via SNMP proto
- [ET-2850, ET-2851, ET-2853, ET-2855, ET-2856](https://github.com/Ircama/epson_print_conf/issues/26) - [ET-2850, ET-2851, ET-2853, ET-2855, ET-2856](https://github.com/Ircama/epson_print_conf/issues/26)
- [ET-4800](https://github.com/Ircama/epson_print_conf/issues/29) with new firmware (older firmware might work) - [ET-4800](https://github.com/Ircama/epson_print_conf/issues/29) with new firmware (older firmware might work)
- [L3250](https://github.com/Ircama/epson_print_conf/issues/35) - [L3250](https://github.com/Ircama/epson_print_conf/issues/35)
- [L3260](https://github.com/Ircama/epson_print_conf/issues/66) with firmware version 05.23.XE21P2
- [L18050](https://github.com/Ircama/epson_print_conf/issues/47) - [L18050](https://github.com/Ircama/epson_print_conf/issues/47)
- [EcoTank ET-2862 with firmware 05.18.XF12OB dated 12/11/2024](https://github.com/Ircama/epson_print_conf/discussions/58) and possibly ET-2860 / 2861 / 2863 / 2865 series. - [EcoTank ET-2862 with firmware 05.18.XF12OB dated 12/11/2024](https://github.com/Ircama/epson_print_conf/discussions/58) and possibly ET-2860 / 2861 / 2863 / 2865 series.
- [XP-2200 with firmware 06.58.IU05P2](https://github.com/Ircama/epson_print_conf/issues/51)
For model XP-2200, check https://github.com/Ircama/epson_print_conf/issues/51 The button "Temporary Reset Waste Ink Levels" should still work with these printers.
### Using the command-line tool ### Using the command-line tool
@@ -475,7 +487,7 @@ Generic query of the status of the printer (regardless of the model):
from epson_print_conf import EpsonPrinter from epson_print_conf import EpsonPrinter
import pprint import pprint
printer = EpsonPrinter(hostname="192.168.1.87") printer = EpsonPrinter(hostname="192.168.1.87")
pprint.pprint(printer.status_parser(printer.snmp_mib("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1])) pprint.pprint(printer.status_parser(printer.fetch_snmp_values("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1]))
``` ```
### Byte sequences ### Byte sequences

View File

@@ -13,6 +13,7 @@ Compared libraries and architectures:
- [pysnmp v5.1](https://github.com/lextudio/pysnmp/) synchronous mode, - [pysnmp v5.1](https://github.com/lextudio/pysnmp/) synchronous mode,
- [pysnmp v7.1](https://github.com/lextudio/pysnmp/) asynchronous mode, - [pysnmp v7.1](https://github.com/lextudio/pysnmp/) asynchronous mode,
- [pysnmp v7.1 with pysnmp-sync-adapter](https://github.com/Ircama/pysnmp-sync-adapter) synchronous wrapper, - [pysnmp v7.1 with pysnmp-sync-adapter](https://github.com/Ircama/pysnmp-sync-adapter) synchronous wrapper,
- [pysnmp v7.1 with pysnmp-sync-adapter and cluster_varbinds](https://github.com/Ircama/pysnmp-sync-adapter#cluster_varbinds) for highest performances,
- raw socket SNMPv1 implementation (synchronous mode). - raw socket SNMPv1 implementation (synchronous mode).
- Pure python implementation using the [asn1](https://github.com/andrivet/python-asn1) and the default socket libraries. - Pure python implementation using the [asn1](https://github.com/andrivet/python-asn1) and the default socket libraries.
- [py-snmp-sync](https://github.com/Ircama/py-snmp-sync) synchronous client implemented over PySNMP. - [py-snmp-sync](https://github.com/Ircama/py-snmp-sync) synchronous client implemented over PySNMP.
@@ -36,15 +37,17 @@ To mitigate this, several approaches have been explored:
* [`pysnmp-sync-adapter`](https://github.com/Ircama/pysnmp-sync-adapter): a lightweight compatibility layer wrapping `pysnmp.hlapi.v1arch.asyncio` and `pysnmp.hlapi.v3arch.asyncio` with blocking equivalents (e.g., `get_cmd_sync`). It reuses the asyncio event loop and transport targets, avoiding per-call overhead and achieving optimal performance while maintaining a synchronous API. * [`pysnmp-sync-adapter`](https://github.com/Ircama/pysnmp-sync-adapter): a lightweight compatibility layer wrapping `pysnmp.hlapi.v1arch.asyncio` and `pysnmp.hlapi.v3arch.asyncio` with blocking equivalents (e.g., `get_cmd_sync`). It reuses the asyncio event loop and transport targets, avoiding per-call overhead and achieving optimal performance while maintaining a synchronous API.
* [`py-snmp-sync`](https://github.com/Ircama/py-snmp-sync): offers even better performance by bypassing the asyncio-based API entirely. Instead, it directly uses the lower-level shared components of `pysnmp` that support both sync and async execution. It implements a custom `SyncUdpTransportTarget` based on raw sockets. However, it currently supports only a specialized form of `get_cmd`, limiting general HLAPI compatibility. * [`py-snmp-sync`](https://github.com/Ircama/py-snmp-sync): offers high performance by bypassing the asyncio-based API entirely. Instead, it directly uses the lower-level shared components of `pysnmp` that support both sync and async execution. It implements a custom `SyncUdpTransportTarget` based on raw sockets. However, it currently supports only a specialized form of `get_cmd`, limiting general HLAPI compatibility.
* A separate low-level implementation using ASN.1 and sockets directly is also tested. This approach shows excellent performance for the `get_cmd` request/response pattern but is significantly more complex to maintain and does not support the full SNMP operation set. * A separate low-level implementation using ASN.1 and sockets directly is also tested. This approach shows excellent performance for the `get_cmd` request/response pattern but is significantly more complex to maintain and does not support the full SNMP operation set.
Each approach offers trade-offs between generality, maintainability, and performance. For applications requiring full HLAPI compatibility with minimal refactoring, `pysnmp-sync-adapter` is a practical and efficient choice. For tightly optimized use cases the raw variants can provide superior throughput. Each approach offers trade-offs between generality, maintainability, and performance. For applications requiring full HLAPI compatibility with minimal refactoring, `pysnmp-sync-adapter` is a practical and efficient choice. For tightly optimized use cases the raw variants can provide superior throughput.
Optimal performance is achieved using the `cluster_varbinds` utility from `pysnmp-sync-adapter`, which provides possibly the simplest synchronous interface and includes optimized parallel processing which wraps `asyncio` under the hood.
--- ---
## Code used for the benchmarks ## Code used for the benchmarks and results
### Usage of https://github.com/etingof/pysnmp ### Usage of https://github.com/etingof/pysnmp
@@ -104,8 +107,8 @@ if __name__ == '__main__':
# Usage of https://github.com/pysnmp/pysnmp # Usage of https://github.com/pysnmp/pysnmp
# pip uninstall pysnmp # pip uninstall pysnmp
# pip uninstall pysnmplib
# pip install pyasn1==0.4.8 # pip install pyasn1==0.4.8
# pip install pysnmplib
# Alternative working library: pip install pysnmp==5.1.0 (https://docs.lextudio.com/snmp/) # Alternative working library: pip install pysnmp==5.1.0 (https://docs.lextudio.com/snmp/)
@@ -155,7 +158,7 @@ if __name__ == '__main__':
```python ```python
# Usage of https://github.com/lextudio/pysnmp 7.1 # Usage of https://github.com/lextudio/pysnmp 7.1
# Simulate sync behaviour in an extremely inefficient and slow mode # Simulate sync behaviour via asyncio.run() (extremely inefficient and slow mode)
# pip uninstall pysnmplib # pip uninstall pysnmplib
# pip uninstall pyasn1==0.4.8 # pip uninstall pyasn1==0.4.8
@@ -217,7 +220,7 @@ if __name__ == '__main__':
```python ```python
# Usage of https://github.com/lextudio/pysnmp # Usage of https://github.com/lextudio/pysnmp
# Same as mytest4, but single ObjectType in get_cmd # Using asyncio.gather() for 100 asynch tasks. Single ObjectType in get_cmd
# pip uninstall pysnmplib # pip uninstall pysnmplib
# pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp # pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp
@@ -285,7 +288,8 @@ if __name__ == '__main__':
```python ```python
# Usage of https://github.com/lextudio/pysnmp # Usage of https://github.com/lextudio/pysnmp
# Max performance (same as mytest3, but multiple ObjectType in get_cmd) # Multiple ObjectType in get_cmd
# Using asyncio.gather() for 10 asynch tasks, each including a PDU of 10 OIDs.
# pip uninstall pysnmplib # pip uninstall pysnmplib
# pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp # pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp
@@ -430,6 +434,77 @@ if __name__ == '__main__':
# --- 1.234 seconds --- # --- 1.234 seconds ---
``` ```
### https://github.com/Ircama/pysnmp-sync-adapter#cluster_varbinds over PySNMP.
This simple approach offers the best performances among all tests.
```python
# pip uninstall pysnmplib
# pip install pysnmp-sync-adapter
import sys
import time
import asyncio
import platform
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import (
parallel_get_sync, create_transport, cluster_varbinds
)
def main():
if len(sys.argv) < 2:
print("Usage: python script.py <host>")
sys.exit(1)
if platform.system()=='Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
host = sys.argv[1]
oid_str = '1.3.6.1.2.1.25.3.2.1.3.1'
community = 'public'
# Pre-create the engine once
dispatcher = SnmpDispatcher()
# Pre-create the transport once
transport = create_transport(UdpTransportTarget, (host, 161), timeout=1)
# Pre-create CommunityData once
auth_data = CommunityData(community, mpModel=0)
oid_t = ObjectType(ObjectIdentity(oid_str))
# Create 100 queries using optimized PDU composition
wrapped_queries = [ObjectType(ObjectIdentity(oid_str)) for _ in range(100)]
wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=10)
start = time.time()
for error_ind, error_status, error_index, var_binds in parallel_get_sync(
dispatcher,
auth_data,
transport,
queries=wrapped_queries
):
if error_ind:
print(f"SNMP error: {error_ind}")
quit()
elif error_status:
print(f'{error_status.prettyPrint()} at {error_index and var_binds[int(error_index) - 1][0] or "?"}')
quit()
else:
for oid, val in var_binds:
print(val.prettyPrint())
print(f"--- {time.time() - start:.3f} seconds ---")
if __name__ == '__main__':
main()
# --- 0.410 seconds ---
# --- 0.424 seconds ---
# --- 0.360 seconds ---
# --- 0.423 seconds ---
```
### Usage of the oneliner package, being deprecated in newer versions of pysnmp ### Usage of the oneliner package, being deprecated in newer versions of pysnmp
```python ```python

View File

@@ -6,8 +6,9 @@ Epson Printer Configuration via SNMP (TCP/IP)
""" """
import itertools import itertools
from itertools import chain
import re import re
from typing import Any, List from typing import Any, List, Tuple, Union
import datetime import datetime
import time import time
import textwrap import textwrap
@@ -17,19 +18,19 @@ import os
import yaml import yaml
from pathlib import Path from pathlib import Path
import pickle import pickle
import abc
import hashlib
import struct
# The pysnmp module uses functionality from importlib.util and from pysnmp.hlapi.v1arch.asyncio import *
# importlib.machinery, which were seperated from the importlib module
# in python>=3.11
try:
import importlib.util
import importlib.machinery
except ImportError:
pass
from pysnmp.hlapi.v1arch import * # this imports UdpTransportTarget
from pyasn1.type.univ import OctetString as OctetStringType from pyasn1.type.univ import OctetString as OctetStringType
from itertools import chain from pysnmp_sync_adapter import (
get_cmd_sync,
parallel_get_sync,
create_transport,
cluster_varbinds
)
from pysnmp.proto.errind import RequestTimedOut
class EpsonPrinter: class EpsonPrinter:
@@ -526,8 +527,8 @@ class EpsonPrinter:
"write_key": b'Wakatobi', "write_key": b'Wakatobi',
"printer_head_id_h": range(122, 126), "printer_head_id_h": range(122, 126),
"printer_head_id_f": [129], "printer_head_id_f": [129],
"main_waste": {"oids": [24, 25, 30], "divider": 69}, "main_waste": {"oids": [24, 25], "divider": 69},
"borderless_waste": {"oids": [26, 27, 34], "divider": 32.53}, "borderless_waste": {"oids": [26, 27], "divider": 32.53},
"serial_number": range(192, 202), "serial_number": range(192, 202),
"stats": { "stats": {
"Manual cleaning counter": [147], "Manual cleaning counter": [147],
@@ -542,10 +543,12 @@ class EpsonPrinter:
"Power off timer": [359, 358], "Power off timer": [359, 358],
}, },
"raw_waste_reset": { "raw_waste_reset": {
24: 0, 25: 0, 30: 0, # Data of 1st counter 24: 0, 25: 0, # Data of 1st waste ink level
28: 0, 29: 0, # another store of 1st counter 30: 0, # First maintenance box reset counter
28: 0, 29: 0, # another store of 1st waste ink level
46: 94, # Maintenance required level of 1st counter 46: 94, # Maintenance required level of 1st counter
26: 0, 27: 0, 34: 0, # Data of 2nd counter 26: 0, 27: 0, # Data of 2nd waste ink level
34: 0, # Second maintenance box reset counter
47: 94, # Maintenance required level of 2st counter 47: 94, # Maintenance required level of 2st counter
49: 0 # ? 49: 0 # ?
}, },
@@ -811,6 +814,36 @@ class EpsonPrinter:
"serial_number": range(1604, 1614), "serial_number": range(1604, 1614),
"alias": ["XP-2100", "XP-2151", "XP-2155"], "alias": ["XP-2100", "XP-2151", "XP-2155"],
}, },
"XP-2200": { # 06.51.IU19M506.58.IU05P2
"read_key": [75, 54],
"write_key": b"Kenjeran",
"main_waste": {"oids": [337, 338, 336], "divider": 69.0},
"borderless_waste": {"oids": [339, 340, 336], "divider": 30.49},
"raw_waste_reset": {
336: 0, 337: 0, 338: 0, 339: 0, 340: 0, 341: 0, 343: 94,
342: 0, 344: 94, 28: 0
},
"stats": {
"First TI received time": [9, 8],
"Manual cleaning counter": [203],
"Timer cleaning counter": [205],
"Total print pass counter": [133, 132, 131, 130],
"Total scan counter": [1843, 1842, 1841, 1840],
"Total print page counter": [792, 791, 790, 789],
"Ink replacement counter - Black": [554],
"Ink replacement counter - Cyan": [555],
"Ink replacement counter - Magenta": [556],
"Ink replacement counter - Yellow": [557],
"Maintenance required level of 1st waste ink counter": [343],
"Maintenance required level of 2nd waste ink counter": [344],
"Power off timer 1": [230, 229],
"Power off timer 2": [231, 230],
"Power off timer 3": [262, 261],
},
"serial_number": range(1604, 1614),
"wifi_mac_address": range(1920, 1926),
"alias": ["XP-2205"],
},
"ET-2500": { "ET-2500": {
"read_key": [68, 1], "read_key": [68, 1],
"write_key": b"Gerbera*", "write_key": b"Gerbera*",
@@ -899,7 +932,7 @@ class EpsonPrinter:
MIB_OID_ENTERPRISE = "1.3.6.1.4.1" MIB_OID_ENTERPRISE = "1.3.6.1.4.1"
MIB_EPSON = MIB_OID_ENTERPRISE + ".1248" MIB_EPSON = MIB_OID_ENTERPRISE + ".1248"
OID_PRV_CTRL = "1.2.2.44.1.1.2" OID_PRV_CTRL = "1.2.2.44.1.1.2"
EEPROM_LINK = f'{MIB_EPSON}.{OID_PRV_CTRL}.1' D4_TO_OID = f'{MIB_EPSON}.{OID_PRV_CTRL}.1'
MIB_INFO = { MIB_INFO = {
"Model": f"{MIB_MGMT}.1.25.3.2.1.3.1", "Model": f"{MIB_MGMT}.1.25.3.2.1.3.1",
@@ -929,11 +962,12 @@ class EpsonPrinter:
"IP Address": f"{MIB_EPSON}.1.1.3.1.4.19.1.3.1", "IP Address": f"{MIB_EPSON}.1.1.3.1.4.19.1.3.1",
"IPP_URL_path": f"{MIB_EPSON}.1.1.3.1.4.19.1.4.1", "IPP_URL_path": f"{MIB_EPSON}.1.1.3.1.4.19.1.4.1",
"IPP_URL": f"{MIB_EPSON}.1.1.3.1.4.46.1.2.1", "IPP_URL": f"{MIB_EPSON}.1.1.3.1.4.46.1.2.1",
"LPR_URL": "1.3.6.1.4.1.2699.1.2.1.3.1.1.4.1.1",
"Driver": "1.3.6.1.4.1.1248.1.1.3.1.29.3.1.27.0",
"WiFi": f"{MIB_EPSON}.1.1.3.1.29.2.1.9.0", "WiFi": f"{MIB_EPSON}.1.1.3.1.29.2.1.9.0",
"MAC Addr": f"{MIB_EPSON}.1.1.3.1.1.5.0", "MAC Addr": f"{MIB_EPSON}.1.1.3.1.1.5.0",
"device_id": f"{MIB_OID_ENTERPRISE}.11.2.3.9.1.1.7.0", "device_id": f"{MIB_OID_ENTERPRISE}.11.2.3.9.1.1.7.0",
"Epson device id": f"{MIB_EPSON}.1.2.2.1.1.1.1.1", "Epson device id": f"{MIB_EPSON}.1.2.2.1.1.1.1.1",
"Power Off Timer": f"{EEPROM_LINK}.111.116.2.0.1.1"
} }
MIB_INFO_ADVANCED = { MIB_INFO_ADVANCED = {
@@ -1000,6 +1034,9 @@ class EpsonPrinter:
] ]
if not values['alias']: if not values['alias']:
del values['alias'] del values['alias']
self.MIB_INFO["Power Off Timer"] = self.epctrl_snmp_oid(
"ot", b"\x01\x01"
) # ".111.116.2.0.1.1" (off timer)
self.model = model self.model = model
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
@@ -1106,8 +1143,10 @@ class EpsonPrinter:
logging.info(f"No value for method '{method}'.") logging.info(f"No value for method '{method}'.")
return stat_set return stat_set
def caesar(self, key, hex=False): def caesar(self, key, hex=False, list=False):
"""Convert the string write key to a sequence of numbers""" """Convert the string write key to a sequence of numbers"""
if list:
return [ 0 if b == 0 else b + 1 for b in key ]
if hex: if hex:
return " ".join( return " ".join(
'00' if b == 0 else '{0:02x}'.format(b + 1) for b in key '00' if b == 0 else '{0:02x}'.format(b + 1) for b in key
@@ -1146,14 +1185,14 @@ class EpsonPrinter:
return None return None
if 'read_key' not in self.parm: if 'read_key' not in self.parm:
return None return None
return ( return self.epctrl_snmp_oid(
f"{self.EEPROM_LINK}" "||", # (7C 7C); "||" stands for EEPROM
".124.124" # || (7C 7C) [
".7.0" # read (07 00) self.parm['read_key'][0],
f".{self.parm['read_key'][0]}" self.parm['read_key'][1],
f".{self.parm['read_key'][1]}" 65, 190, 160, # (read)
".65.190.160" oid, msb
f".{oid}.{msb}" ]
) )
def eeprom_oid_write_address( def eeprom_oid_write_address(
@@ -1180,15 +1219,14 @@ class EpsonPrinter:
'write_key' not in self.parm 'write_key' not in self.parm
or 'read_key' not in self.parm): or 'read_key' not in self.parm):
return None return None
write_op = ( write_op = self.epctrl_snmp_oid(
f"{self.EEPROM_LINK}" "||", # (7C 7C); "||" stands for EEPROM
".124.124" # || 7C 7C [
".16.0" # write (10 00) self.parm['read_key'][0],
f".{self.parm['read_key'][0]}" self.parm['read_key'][1],
f".{self.parm['read_key'][1]}" 66, 189, 33, # 42 BD 21 (write)
".66.189.33" # 42 BD 21 oid, msb, value
f".{oid}.{msb}.{value}" ] + self.caesar(self.parm['write_key'], list=True)
f".{self.caesar(self.parm['write_key'])}"
) )
if self.dry_run: if self.dry_run:
logging.warning("WRITE_DRY_RUN: %s", write_op) logging.warning("WRITE_DRY_RUN: %s", write_op)
@@ -1196,84 +1234,172 @@ class EpsonPrinter:
else: else:
return write_op return write_op
def snmp_mib(self, mib: str, label: str = "unknown") -> (str, Any): def fetch_oid_values(
"""Generic SNMP query, returning value of a MIB.""" self,
oid: Union[str, List[Union[str, List[str]]]],
label: str = "unknown"
) -> Union[
Tuple[str, Any],
List[Tuple[str, Any]]
]:
"""
Query one or more OIDs and return their values.
- If oid is a single string, returns [(type_name, value)].
- If oid is a list of strings or list-of-lists, returns a list of
(type_name, value) in the same order.
Lists of strings are grouped into a single PDU; top-level list runs
in parallel using parallel_get_sync.
"""
# Configfile overrides
if self.mib_dict: if self.mib_dict:
if mib not in self.mib_dict: # singleOID case only
if isinstance(oid, str):
if oid not in self.mib_dict:
logging.error( logging.error(
"MIB '%s' not valued in the configuration file. " "MIB '%s' not in config. Operation: %s", oid, label
"Operation: %s",
mib,
label
) )
return None, False return None, False
return self.mib_dict[mib] return self.mib_dict[oid]
else:
# list case: map through dict
results = []
for element in oid:
if isinstance(element, str):
if element not in self.mib_dict:
logging.error(
"MIB '%s' missing in config. Operation: %s",
element, label
)
results.append((None, False))
else:
results.append(self.mib_dict[element])
else:
# inner list grouping not supported by config
results.append((None, False))
return results
# Build or reuse SNMP network config
if not self.hostname: if not self.hostname:
return None, False return None, False
if (
self.hostname, self.port, self.timeout, self.retries net_val = (self.hostname, self.port, self.timeout, self.retries)
) != self.used_net_val: if net_val != self.used_net_val:
try: try:
self.snmp_conf = ( self.snmp_conf = (
SnmpDispatcher(), SnmpDispatcher(),
CommunityData('public', mpModel=0), CommunityData("public", mpModel=0),
UdpTransportTarget( create_transport(
(self.hostname, self.port, self.timeout, self.retries) UdpTransportTarget,
(self.hostname, self.port),
timeout=self.timeout, retries=self.retries
) )
) )
except Exception as e: except Exception as e:
logging.critical("snmp_mib invalid address: %s", e) logging.critical("fetch_oid_values invalid address: %s", e)
self.used_net_val = () self.used_net_val = ()
return None, False return None, False
self.used_net_val = (
self.hostname, self.port, self.timeout, self.retries self.used_net_val = net_val
)
if not self.snmp_conf: if not self.snmp_conf:
return None, False return None, False
iterator = getCmd(*self.snmp_conf, (mib, None))
for response in iterator: # SNMP lookup
errorIndication, errorStatus, errorIndex, varBinds = response def _single_lookup(single_oid: str) -> Tuple[str, Any]:
if errorIndication: """
logging.info( Internal helper to perform one get_cmd_sync.
"snmp_mib error: %s. MIB: %s. Operation: %s", """
errorIndication, mib, label engine, auth, transport = self.snmp_conf
errorInd, errorStat, errorIdx, varBinds = get_cmd_sync(
engine, auth, transport,
ObjectType(ObjectIdentity(single_oid)),
timeout=self.timeout
) )
if " timed out" in errorIndication: # transport-level timeout?
raise TimeoutError(errorIndication) if isinstance(errorInd, RequestTimedOut):
raise TimeoutError(errorInd)
elif errorInd is not None:
logging.info("fetch_oid_values error: %s. OID: %s. Label: %s",
errorInd, single_oid, label)
return None, False return None, False
elif errorStatus:
# SNMP-level errorStatus
if int(errorStat) != 0:
# find offending OID
bad_oid = varBinds[int(errorIdx) - 1][0] if errorIdx else "?"
logging.info( logging.info(
'snmp_mib PDU error: %s at %s. MIB: %s. Operation: %s', "fetch_oid_values PDU error: %s at %s. OID: %s. Label: %s",
errorStatus.prettyPrint(), errorStat.prettyPrint(), bad_oid, single_oid, label
errorIndex and varBinds[int(errorIndex) - 1][0] or '?',
mib,
label
) )
return None, False return None, False
# unpack the varBinds
final = []
for oid_name, val in varBinds:
if isinstance(val, OctetStringType):
final.append((val.__class__.__name__, val.asOctets()))
else: else:
for varBind in varBinds: final.append((val.__class__.__name__, val.prettyPrint()))
if isinstance(varBind[1], OctetStringType):
return( return final
varBind[1].__class__.__name__,
varBind[1].asOctets() # Dispatch single vs batch
) if isinstance(oid, str):
return _single_lookup(oid)
# list of queries
# normalize list elements → either str or [str,...]
queries = []
for elt in oid:
if isinstance(elt, str):
queries.append([elt]) # singleOID PDU
elif isinstance(elt, (list, tuple)):
queries.append(list(elt)) # groupedOID PDU
else: else:
return( queries.append([])
varBind[1].__class__.__name__,
varBind[1].prettyPrint() # run parallel_get_sync: each inner list packs into one PDU, all run in parallel
engine, auth, transport = self.snmp_conf
# build ObjectType lists
wrapped_queries = [
[ ObjectType(ObjectIdentity(x)) for x in group ]
for group in queries
]
wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=3)
raw_results = parallel_get_sync(
engine,
auth,
transport,
queries=wrapped_queries,
max_parallel=5
) )
logging.info(
"snmp_mib value error: invalid multiple data. " # raw_results is a list of SNMP tuples; map them through the same extraction logic
"MIB: %s. Operation: %s", final = []
mib, for (errI, errS, errX, vbs) in raw_results:
label # transport-level timeout?
) if isinstance(errI, RequestTimedOut):
return None, False raise TimeoutError(errI)
logging.info(
"snmp_mib value error: invalid data. MIB: %s. Operation: %s", # SNMP errorStatus?
label if errI is not None or int(errS) != 0:
) # on error we dont know how many OIDs were in this PDU,
return None, False # but we do know len(vbs), so record a failure for each
final.extend([(None, False)] * len(vbs))
continue
# unpack each var-bind in this PDU, in order
for obj in vbs:
# obj is an ObjectType; obj[1] is the value
val = obj[1]
if isinstance(val, OctetStringType):
final.append((val.__class__.__name__, val.asOctets()))
else:
final.append((val.__class__.__name__, val.prettyPrint()))
return final
def invalid_response(self, response): def invalid_response(self, response):
if response is False: if response is False:
@@ -1282,54 +1408,98 @@ class EpsonPrinter:
def read_eeprom( def read_eeprom(
self, self,
oid: int, oid: Union[int, str, List[Union[int,str]]],
label: str = "unknown method") -> str: label: str = "unknown method"
"""Read a single byte from the Epson EEPROM address 'oid'.""" ) -> Union[str, List[Union[str,None]]]:
logging.debug( """
f"EEPROM_DUMP {label}:\n" Read one or more EEPROM bytes at the given OID(s).
f" ADDRESS: "
f"{self.eeprom_oid_read_address(oid, label=label)}\n" - Single int/str → returns the two-hex-digit string or None.
f" OID: {oid}={hex(oid)}" - List of int/str → returns a list of those strings/None, in order.
) """
tag, response = self.snmp_mib( def _process_response(
self.eeprom_oid_read_address(oid, label=label), label=label tag: Any, response: Any, oid_val: int
) ) -> Union[str, None]:
if not response: """Extract and validate the 'EE:xxxxxx' payload for one response."""
if not response or self.invalid_response(response):
logging.error("Invalid response for OID %s (%s): %r", oid_val, label, response)
return None return None
if self.invalid_response(response):
logging.error( # find the EE:xxxxxx substring
f"Invalid response: '%s' for oid %s (%s)",
repr(response), oid, label
)
return None
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response))
try: try:
response = re.findall( txt = response.decode() if isinstance(
r"EE:[0-9a-fA-F]{6}", response.decode())[0][3:] response, (bytes, bytearray)
except (TypeError, IndexError): ) else response
logging.info(f"Invalid read key.") match = re.search(r"EE:([0-9A-Fa-f]{6})", txt)
return None payload = match.group(1)
chk_addr = response[0:4] except Exception:
value = response[4:6] logging.info(
if int(chk_addr, 16) != oid: "Invalid read key for OID %s (%s)", oid_val, label
raise ValueError(
f"Address and response address are"
f" not equal: {oid} != {chk_addr}"
) )
return value return None
# split into address + value
addr_hex, val_hex = payload[:4], payload[4:]
if int(addr_hex, 16) != oid_val:
logging.critical(
"EEPROM address mismatch: expected %04x != returned %s; %s",
oid_val, addr_hex, label
)
return None
return val_hex.upper()
# Build the address for SNMP
def _addr(o):
return self.eeprom_oid_read_address(o, label=label)
# Call fetch_oid_values (single or batch)
resp = self.fetch_oid_values(
_addr(oid) if not isinstance(oid, list) else [
_addr(o) for o in oid
],
label=label
)
# resp is a list of (tag, response)
if isinstance(oid, int):
tag, response = resp[0]
return _process_response(tag, response, oid)
results: List[Union[str,None]] = []
for o, entry in zip(oid, resp):
tag, response = entry
results.append(_process_response(tag, response, int(o)))
return results
def read_eeprom_many( def read_eeprom_many(
self, self,
oids: list, oids: Union[range, List[Union[int,str]]],
label: str = "unknown method") -> list: label: str = "unknown method"
) -> List[Union[str,None]]:
""" """
Read a list of bytes from the list of Epson EEPROM addresses 'oids'. Read a list of bytes from the Epson EEPROM at addresses in `oids`,
using a single parallel batch SNMP query.
Accepts a list of ints/strs or a range() of ints.
Returns a list of two-hex-digit strings (e.g. "A3") or None,
for each OID, preserving order.
If any element is None, returns [None].
""" """
response = [self.read_eeprom(oid, label=label) for oid in oids] # Normalize a range into a list of ints
for i in response: if isinstance(oids, range):
if i is None: oids = list(oids)
# Delegate to read_eeprom (which handles both single and lists)
results = self.read_eeprom(oids, label=label)
if not isinstance(results, list):
results = [results]
if any(r is None for r in results):
return [None] return [None]
return response
return results
def write_eeprom( def write_eeprom(
self, self,
@@ -1354,7 +1524,7 @@ class EpsonPrinter:
f" OID: {oid}={hex(oid)}\n" f" OID: {oid}={hex(oid)}\n"
f" VALUE: {value} = {hex(int(value))}" f" VALUE: {value} = {hex(int(value))}"
) )
tag, response = self.snmp_mib(oid_string, label=label) tag, response = self.fetch_oid_values(oid_string, label=label)[0]
if response: if response:
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response)) logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response))
if not self.dry_run and response and not ":OK;" in repr(response): if not self.dry_run and response and not ":OK;" in repr(response):
@@ -1756,7 +1926,9 @@ class EpsonPrinter:
f"SNMP_DUMP {name}:\n" f"SNMP_DUMP {name}:\n"
f" ADDRESS: {oid}" f" ADDRESS: {oid}"
) )
tag, result = self.snmp_mib(oid, label="get_snmp_info " + name) tag, result = self.fetch_oid_values(
oid, label="get_snmp_info " + name
)[0]
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(result)) logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(result))
if name == "Power Off Timer" and result and result.find( if name == "Power Off Timer" and result and result.find(
@@ -1807,11 +1979,14 @@ class EpsonPrinter:
left_val = val left_val = val
return left_val return left_val
else: else:
try:
return "".join( return "".join(
chr(int(value or "0x3f", 16)) # "0x3f" --> "?" chr(int(value or "0x3f", 16)) # "0x3f" --> "?"
for value in self.read_eeprom_many( for value in self.read_eeprom_many(
self.parm["serial_number"], label="serial_number") self.parm["serial_number"], label="serial_number")
) )
except Exception:
return None
def get_printer_brand(self) -> str: def get_printer_brand(self) -> str:
"""Return the producer name of the printer ("EPSON").""" """Return the producer name of the printer ("EPSON")."""
@@ -1820,12 +1995,15 @@ class EpsonPrinter:
return None return None
if "brand_name" not in self.parm: if "brand_name" not in self.parm:
return None return None
try:
return ''.join( return ''.join(
[chr(int(i or "0x3f", 16)) [chr(int(i or "0x3f", 16))
for i in self.read_eeprom_many( for i in self.read_eeprom_many(
self.parm["brand_name"], label="get_brand_name" self.parm["brand_name"], label="get_brand_name"
) if i != '00'] ) if i != '00']
) )
except Exception:
return None
def get_printer_model(self) -> str: def get_printer_model(self) -> str:
"""Return the model name of the printer.""" """Return the model name of the printer."""
@@ -1834,12 +2012,15 @@ class EpsonPrinter:
return None return None
if "model_name" not in self.parm: if "model_name" not in self.parm:
return None return None
try:
return ''.join( return ''.join(
[chr(int(i or "0x3f", 16)) [chr(int(i or "0x3f", 16))
for i in self.read_eeprom_many( for i in self.read_eeprom_many(
self.parm["model_name"], label="get_model_name" self.parm["model_name"], label="get_model_name"
) if i != '00'] ) if i != '00']
) )
except Exception:
return None
def get_wifi_mac_address(self) -> str: def get_wifi_mac_address(self) -> str:
"""Return the WiFi MAC address of the printer.""" """Return the WiFi MAC address of the printer."""
@@ -1855,7 +2036,7 @@ class EpsonPrinter:
) )
) )
except Exception: except Exception:
return False return None
def get_stats(self, stat_name: str = None) -> str: def get_stats(self, stat_name: str = None) -> str:
"""Return printer statistics.""" """Return printer statistics."""
@@ -1916,13 +2097,16 @@ class EpsonPrinter:
Return firmware version. Return firmware version.
Query firmware version: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.118.105.1.0.0 Query firmware version: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.118.105.1.0.0
""" """
oid = f"{self.EEPROM_LINK}.118.105.1.0.0" # 76 69 01 00 00 oid = self.epctrl_snmp_oid(
"vi", # This command stands for Version Information.
0
)
label = "get_firmware_version" label = "get_firmware_version"
logging.debug( logging.debug(
f"SNMP_DUMP {label}:\n" f"SNMP_DUMP {label}:\n"
f" ADDRESS: {oid}" f" ADDRESS: {oid}"
) )
tag, firmware_string = self.snmp_mib(oid, label=label) tag, firmware_string = self.fetch_oid_values(oid, label=label)[0]
if not firmware_string: if not firmware_string:
return None return None
if self.invalid_response(firmware_string): if self.invalid_response(firmware_string):
@@ -1939,15 +2123,36 @@ class EpsonPrinter:
return firmware + " " + datetime.datetime( return firmware + " " + datetime.datetime(
year, month, day).strftime('%d %b %Y') year, month, day).strftime('%d %b %Y')
def get_device_identification(self) -> str:
oid = self.epctrl_snmp_oid("di", 1) # di = device identification
label = "get_device_identification"
logging.debug(
f"SNMP_DUMP {label}:\n"
f" ADDRESS: {oid}"
)
tag, device_id = self.fetch_oid_values(oid, label=label)[0]
key_map = {
"MFG": "Manufacturer",
"CMD": "Commands",
"MDL": "Model",
"CLS": "Class",
"DES": "Description"
}
return {
key_map.get(k, k): [v for v in vals if v]
for i in device_id.decode()[10:].split(";") if i
for k, *vals in [i.split(":")]
}
def get_cartridges(self) -> str: def get_cartridges(self) -> str:
"""Return list of cartridge types.""" """Return list of cartridge types."""
oid = f"{self.EEPROM_LINK}.105.97.1.0.0" # 69 61 01 00 00 oid = self.epctrl_snmp_oid("ia", 0) # ".105.97.1.0.0" # 69 61 01 00 00 (ink accessories)
label = "get_cartridges" label = "get_cartridges"
logging.debug( logging.debug(
f"SNMP_DUMP {label}:\n" f"SNMP_DUMP {label}:\n"
f" ADDRESS: {oid}" f" ADDRESS: {oid}"
) )
tag, cartridges_string = self.snmp_mib(oid, label=label) tag, cartridges_string = self.fetch_oid_values(oid, label=label)[0]
if self.invalid_response(cartridges_string): if self.invalid_response(cartridges_string):
logging.error( logging.error(
f"Invalid response for %s: '%s'", f"Invalid response for %s: '%s'",
@@ -1992,9 +2197,11 @@ class EpsonPrinter:
Query printer status: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.115.116.1.0.1 Query printer status: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.115.116.1.0.1
or 1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1 or 1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1
""" """
address = f"{self.EEPROM_LINK}.115.116.1.0.1" # 73 74 01 00 01 address = self.epctrl_snmp_oid("st", 1) # ".115.116.1.0.1" # 73 74 01 00 01 (status)
logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}") logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}")
tag, result = self.snmp_mib(address, label="get_printer_status") tag, result = self.fetch_oid_values(
address, label="get_printer_status"
)[0]
if not result: if not result:
return None return None
logging.debug(" TAG: %s\n RESPONSE: %s...\n%s", logging.debug(" TAG: %s\n RESPONSE: %s...\n%s",
@@ -2039,23 +2246,26 @@ class EpsonPrinter:
return None return None
if "last_printer_fatal_errors" not in self.parm: if "last_printer_fatal_errors" not in self.parm:
return None return None
try:
return self.read_eeprom_many( return self.read_eeprom_many(
self.parm["last_printer_fatal_errors"], self.parm["last_printer_fatal_errors"],
label="last_printer_fatal_errors" label="last_printer_fatal_errors"
) )
except Exception:
return None
def get_cartridge_information(self) -> str: def get_cartridge_information(self) -> str:
"""Return list of cartridge properties.""" """Return list of cartridge properties."""
response = [] response = []
for i in range(1, 9): for i in range(1, 9):
mib = f"{self.EEPROM_LINK}.105.105.2.0.1." + str(i) # 69 69 02 00 01 mib = self.epctrl_snmp_oid("ii", b"\x01" + bytes([i])) # ".105.105.2.0.1." + str(i) # 69 69 02 00 01 (ink information)
logging.debug( logging.debug(
f"Cartridge {i}:\n" f"Cartridge {i}:\n"
f" ADDRESS: {mib}" f" ADDRESS: {mib}"
) )
tag, cartridge = self.snmp_mib( tag, cartridge = self.fetch_oid_values(
mib, label="get_cartridge_information" mib, label="get_cartridge_information"
) )[0]
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(cartridge)) logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(cartridge))
if not cartridge: if not cartridge:
continue continue
@@ -2153,16 +2363,34 @@ class EpsonPrinter:
logging.error("Cartridge value error: %s.\n%s", e, cartridges) logging.error("Cartridge value error: %s.\n%s", e, cartridges)
return None return None
def dump_eeprom(self, start: int = 0, end: int = 0xFF): def dump_eeprom(self, start: int = 0, end: int = 0xFF) -> dict[int, int]:
""" """
Dump EEPROM data from start to end (less significant byte). Dump EEPROM data from `start` to `end` (inclusive) in a single
parallel SNMP batch read.
Returns a dict mapping each address → int value. If any read fails,
that address maps to None.
""" """
d = {} # Build the list of OIDs
for oid in range(start, end + 1): oids = list(range(start, end + 1))
d[oid] = int(
self.read_eeprom(oid, label="dump_eeprom") or "-0x1", # Fire one parallel batch read
16 # read_eeprom(list) now returns List[str|None]
) hex_results = self.read_eeprom(oids, label="dump_eeprom")
# If the batch call itself errored out (None), fall back or return empty
if hex_results is None:
# All failed; return empty or map everything to None
return {oid: None for oid in oids}
# Map each hexstring (or None) to an int (or None)
d: dict[int, int] = {}
for oid, hx in zip(oids, hex_results):
if hx is None:
d[oid] = None
else:
# hx is like "5A" → int("5A",16)
d[oid] = int(hx, 16)
return d return d
def update_parameter( def update_parameter(
@@ -2214,6 +2442,38 @@ class EpsonPrinter:
return True return True
return False return False
def epctrl_snmp_oid(self, command, payload):
"""
Build the full OID based on EPSON-CTRL D4 (END4) encapsulation
(IEEE1284.4 or Dot4 by Epson encapsulated into an SNMP OID)
http://osr507doc.xinuos.com/en/OSAdminG/OSAdminG_gimp/manual-html/gimpprint_37.html
"""
assert len(command) == 2
if isinstance(payload, int):
payload = bytearray([payload])
elif isinstance(payload, list):
payload = bytes(payload)
cmd = command.encode() + struct.pack('<H', len(payload)) + payload
return self.D4_TO_OID + "." + ".".join(
str(int(i)) for i in cmd
)
def temporary_reset_waste(self, dry_run=False) -> bool:
"""
Thanks to https://codeberg.org/atufi/reinkpy/issues/12#issuecomment-1661250
"""
serial = self.get_serial_number()
sha1 = hashlib.sha1(serial.encode())
oid = self.epctrl_snmp_oid(
"rw", # This command stands for "reset waste".
b'\x01\x00' + # Unknown \x01\x00 (2 bytes)
sha1.digest() # Serial SHA1 hash. Always 20 bytes.
)
if dry_run:
return True
answer = self.fetch_oid_values(oid, label="temp_reset_waste")[0]
return b"rw:01:OK;" in answer[1]
def reset_waste_ink_levels(self, dry_run=False) -> bool: def reset_waste_ink_levels(self, dry_run=False) -> bool:
""" """
Set waste ink levels to the values specified in the configuration. Set waste ink levels to the values specified in the configuration.
@@ -2329,6 +2589,8 @@ class EpsonPrinter:
hex_bytes = self.read_eeprom_many( hex_bytes = self.read_eeprom_many(
eeprom_range, label="detect_serial_number" eeprom_range, label="detect_serial_number"
) )
if hex_bytes is [None]:
return hex_bytes, None
# Convert the hex bytes to characters # Convert the hex bytes to characters
sequence = ''.join(chr(int(byte, 16)) for byte in hex_bytes) sequence = ''.join(chr(int(byte, 16)) for byte in hex_bytes)
# Serial number pattern (10 consecutive uppercase letters or digits) # Serial number pattern (10 consecutive uppercase letters or digits)
@@ -2898,15 +3160,14 @@ if __name__ == "__main__":
) )
if args.dump_eeprom: if args.dump_eeprom:
print_opt = True print_opt = True
for addr, val in printer.dump_eeprom( start = int(ast.literal_eval(args.dump_eeprom[0]))
int(ast.literal_eval(args.dump_eeprom[0])), end = int(ast.literal_eval(args.dump_eeprom[1]))
int(ast.literal_eval(args.dump_eeprom[1])) for addr, val in printer.dump_eeprom(start, end).items():
).items(): if val is None:
print( disp_val = " --"
f"EEPROM_ADDR {hex(addr).rjust(4)} = " else:
f"{str(addr).rjust(3)}: " disp_val = f"{val:#04x}" # 0x00 … 0xFF
f"{val:#04x} = {str(val).rjust(3)}" print(f"EEPROM_ADDR 0x{addr:02X} = {addr:3d}: {disp_val}")
)
if args.query: if args.query:
print_opt = True print_opt = True
if ("stats" in printer.parm and if ("stats" in printer.parm and

View File

@@ -1,7 +1,6 @@
pyyaml pyyaml
pyasn1==0.4.8 pysnmp
git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp pysnmp_sync_adapter
pyasyncore;python_version>="3.12"
tkcalendar tkcalendar
pyperclip pyperclip
black black

89
ui.py
View File

@@ -37,7 +37,7 @@ from find_printers import PrinterScanner
from text_console import TextConsole from text_console import TextConsole
VERSION = "5.3.6" VERSION = "6.0.0"
NO_CONF_ERROR = ( NO_CONF_ERROR = (
" Please select a printer model and a valid IP address," " Please select a printer model and a valid IP address,"
@@ -126,12 +126,12 @@ class EpcTextConsole(TextConsole):
"self.printer.model\n" "self.printer.model\n"
"self.printer.reverse_caesar(b'Hpttzqjv')\n" "self.printer.reverse_caesar(b'Hpttzqjv')\n"
'self.printer.reverse_caesar(bytes.fromhex("48 62 7B 62 6F 6A 62 2B"))\n' 'self.printer.reverse_caesar(bytes.fromhex("48 62 7B 62 6F 6A 62 2B"))\n'
'import pprint;pprint.pprint(self.printer.status_parser(self.printer.snmp_mib("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1]))\n' 'import pprint;pprint.pprint(self.printer.status_parser(self.printer.fetch_snmp_values("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1]))\n'
"self.printer.read_eeprom_many([0])\n" "self.printer.read_eeprom_many([0])\n"
"self.printer.read_eeprom(0)\n" "self.printer.read_eeprom(0)\n"
"self.printer.reset_waste_ink_levels()\n" "self.printer.reset_waste_ink_levels()\n"
"self.printer.snmp_mib(self.printer.eeprom_oid_read_address(0))\n" "self.printer.fetch_snmp_values(self.printer.eeprom_oid_read_address(0))\n"
"self.printer.snmp_mib('1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.124.124.7.0.25.7.65.190.160.0.0')\n" "self.printer.fetch_snmp_values('1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.124.124.7.0.25.7.65.190.160.0.0')\n"
"self.get_ti_date(cursor=True)" "self.get_ti_date(cursor=True)"
) )
) )
@@ -715,11 +715,11 @@ class EpsonPrinterUI(tk.Tk):
row_n += 1 row_n += 1
button_frame = ttk.Frame(main_frame, padding=PAD) button_frame = ttk.Frame(main_frame, padding=PAD)
button_frame.grid(row=row_n, column=0, pady=PADY, sticky=(tk.W, tk.E)) button_frame.grid(row=row_n, column=0, pady=PADY, sticky=(tk.W, tk.E))
button_frame.columnconfigure((0, 1, 2), weight=1) # expand columns button_frame.columnconfigure((0, 1, 2, 3), weight=1) # expand columns
# Query Printer Status # Query Printer Status
self.status_button = ttk.Button( self.status_button = ttk.Button(
button_frame, text="Printer Status", button_frame, text="Printer\nStatus",
command=self.printer_status, command=self.printer_status,
style="Centered.TButton" style="Centered.TButton"
) )
@@ -730,7 +730,7 @@ class EpsonPrinterUI(tk.Tk):
# Query list of cartridge types # Query list of cartridge types
self.web_interface_button = ttk.Button( self.web_interface_button = ttk.Button(
button_frame, button_frame,
text="Printer Web interface", text="Printer\nWeb interface",
command=self.web_interface, command=self.web_interface,
style="Centered.TButton" style="Centered.TButton"
) )
@@ -741,7 +741,7 @@ class EpsonPrinterUI(tk.Tk):
# Detect configuration values # Detect configuration values
self.detect_configuration_button = ttk.Button( self.detect_configuration_button = ttk.Button(
button_frame, button_frame,
text="Detect Configuration", text="Detect\nConfiguration",
command=self.detect_configuration, command=self.detect_configuration,
style="Centered.TButton" style="Centered.TButton"
) )
@@ -749,6 +749,17 @@ class EpsonPrinterUI(tk.Tk):
row=0, column=2, padx=PADX, pady=PADX, sticky=(tk.W, tk.E) row=0, column=2, padx=PADX, pady=PADX, sticky=(tk.W, tk.E)
) )
# Temporary Reset Waste Ink Levels
self.detect_configuration_button = ttk.Button(
button_frame,
text="Temporary Reset\nWaste Ink Levels",
command=self.temp_reset_waste_ink,
style="Centered.TButton"
)
self.detect_configuration_button.grid(
row=0, column=3, padx=PADX, pady=PADX, sticky=(tk.W, tk.E)
)
# [row 4] Tweak Buttons # [row 4] Tweak Buttons
row_n += 1 row_n += 1
tweak_frame = ttk.Frame(main_frame, padding=PAD) tweak_frame = ttk.Frame(main_frame, padding=PAD)
@@ -1523,6 +1534,7 @@ Web site: https://github.com/Ircama/epson_print_conf
self.update_idletasks() self.update_idletasks()
def get_current_eeprom_values(self, values, label): def get_current_eeprom_values(self, values, label):
values = list(values)
try: try:
org_values = ', '.join( org_values = ', '.join(
"" if v is None else f"{k}: {int(v, 16)}" for k, v in zip( "" if v is None else f"{k}: {int(v, 16)}" for k, v in zip(
@@ -1759,7 +1771,9 @@ Web site: https://github.com/Ircama/epson_print_conf
list_ser_num = [pr_ser_num] list_ser_num = [pr_ser_num]
for i in list_ser_num: for i in list_ser_num:
try: try:
if not self.get_current_eeprom_values(i, "Printer Serial Number"): if not self.get_current_eeprom_values(
i, "Printer Serial Number"
):
self.config(cursor="") self.config(cursor="")
self.update_idletasks() self.update_idletasks()
return return
@@ -3010,6 +3024,63 @@ Web site: https://github.com/Ircama/epson_print_conf
self.config(cursor="") self.config(cursor="")
self.update_idletasks() self.update_idletasks()
def temp_reset_waste_ink(self, cursor=True):
if cursor:
self.config(cursor="watch")
self.update()
current_function_name = inspect.stack()[0][3]
method_to_call = getattr(self, current_function_name)
self.after(100, lambda: method_to_call(cursor=False))
return
self.show_status_text_view()
ip_address = self.ip_var.get()
if (
not self._is_valid_ip(ip_address)
or not self.printer
or not self.printer.parm
or "read_key" not in self.printer.parm
or "write_key" not in self.printer.parm
):
self.status_text.insert(tk.END, '[ERROR]', "error")
self.status_text.insert(tk.END, NO_CONF_ERROR)
self.config(cursor="")
self.update_idletasks()
return
if not self.printer:
return
msg = (
"Confirm Action",
"This feature temporarily bypasses the ink waste tank full warning,"
" which would otherwise disable printing. "
"\n\nThis setting does not persist a reboot. "
"\n\nAre you sure you want to proceed?"
)
response = messagebox.askyesno(*msg, default='no')
if response:
try:
if self.printer.temporary_reset_waste():
self.status_text.insert(tk.END, '[INFO]', "info")
self.status_text.insert(
tk.END,
" Waste ink levels have been temporarily reset."
" You can now print.\n"
)
else:
self.status_text.insert(tk.END, '[ERROR]', "error")
self.status_text.insert(
tk.END,
" Failed to perform the temporary reset of the "
"waste ink levels."
)
except Exception as e:
self.handle_printer_error(e)
else:
self.status_text.insert(
tk.END, f"[WARNING] Waste ink levels reset aborted.\n"
)
self.config(cursor="")
self.update_idletasks()
def start_detect_printers(self): def start_detect_printers(self):
self.show_status_text_view() self.show_status_text_view()
self.status_text.insert(tk.END, '[INFO]', "info") self.status_text.insert(tk.END, '[INFO]', "info")