diff --git a/Dockerfile b/Dockerfile index 774c037..3388bee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,9 +32,8 @@ COPY . . RUN pip install --no-cache-dir \ pyyaml \ - pyasn1==0.4.8 \ - git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp \ - pyasyncore \ + pysnmp \ + pysnmp_sync_adapter \ tkcalendar \ pyperclip \ black \ diff --git a/README.md b/README.md index 27307f4..585ca30 100644 --- a/README.md +++ b/README.md @@ -20,18 +20,33 @@ The software also includes a configurable printer dictionary, which can be easil start with `@BDC [SP] ST2 [CR] [LF]` ...). @BDC ST2 is used to convey various aspects of the status of the printer, such as errors, paper status, ink and more. The element fields of this format may vary depending on the printer model. The *Epson Printer Configuration Tool* can decode all element fields found in publicly available Epson Programming Manuals of various printer models (a relevant subset of fields used by the Epson printers). - __Advanced Maintenance Functions__: + - Open the Web interface of the printer (via the default browser). + + - Temporary reset of the ink waste counter. + + The ink waste counters track the amount of ink discarded during maintenance tasks to prevent overflow in the waste ink pads. Once the counters indicate that one of the printer pads is full, the printer will stop working to avoid potential damage or ink spills. The "Printer status" button includes information showing the levels of the waste ink tanks; specifically, two sections are relevant: "Maintenance box information" ("maintenance_box_...") and "Waste Ink Levels" ("waste_ink_levels"). The former has a counter associated for each tank, which indicates the number of temporary resets performed by the user to temporarily restore a disabled printer. + + The feature to temporarily reset the ink waste counter is effective if the Maintenance box information reports that the Maintenance Box is full; it temporarily bypasses the ink waste tank full warning, which would otherwise disable printing. It is important to know that this setting is reset upon printer reboot (it does not affect the EEPROM) and can be repeated. Each time the Maintenance box status switches from "full" to "not full", the "ink replacement cleaning counter" is increased. A pad maintenance or tank replacement has to be programmed meanwhile. + - Reset the ink waste counter. - The ink waste counters track the amount of ink discarded during maintenance tasks to prevent overflow in the waste ink pads. Once the counters indicate that one of the printer pads is full, the printer will stop working to avoid potential damage or ink spills. Resetting the ink waste counter extends the printer operation while a pad maintenance or tank replacement is programmed (operation that shall necessarily be pefromed). + This feature permanently resets the ink waste counter. + + Resetting the ink waste counter extends the printer operation while a physical pad maintenance or tank replacement is programmed (operation that shall necessarily be pefromed). + - Adjust the power-off timer (for energy efficiency). + - Change the _First TI Received Time_, The *First TI Received Time* in Epson printers typically refers to the timestamp of the first transmission instruction to the printer. This feature tracks when the printer first operated. - Change the printer WiFi MAC address and the printer serial number (typically used in specialized scenarios where specific device identifiers are required). + - Read and write to EEPROM addresses. + - Dump and analyze sets of EEPROM addresses. + - Detect the access key (*read_key* and *write_key*) and some attributes of the printer configuration. The GUI includes some features that attempt to detect the attributes of an Epson printer whose model is not included in the configuration; such features can also be used with known printers, to detect additional parameters. @@ -84,12 +99,7 @@ cd epson_print_conf pip install -r requirements.txt ``` -Notes (at the time of writing): - -- [before pysnmp, install pyasn1 with version 0.4.8 and not 0.5](https://github.com/etingof/pysnmp/issues/440#issuecomment-1544341598) -- [pull pysnmp from the GitHub master branch, not from PyPI](https://stackoverflow.com/questions/54868134/snmp-reading-from-an-oid-with-three-libraries-gives-different-execution-times#comment96532761_54869361) - -This program exploits [pysnmp](https://github.com/etingof/pysnmp), basing on the related [documentation](https://pysnmp.readthedocs.io/). +This program exploits [pysnmp v7+](https://github.com/lextudio/pysnmp) and [pysnmp-sync-adapter](https://github.com/Ircama/pysnmp-sync-adapter). It is tested with Ubuntu / Windows Subsystem for Linux, Windows. @@ -188,10 +198,12 @@ For the following models there is no known way to read the EEPROM via SNMP proto - [ET-2850, ET-2851, ET-2853, ET-2855, ET-2856](https://github.com/Ircama/epson_print_conf/issues/26) - [ET-4800](https://github.com/Ircama/epson_print_conf/issues/29) with new firmware (older firmware might work) - [L3250](https://github.com/Ircama/epson_print_conf/issues/35) +- [L3260](https://github.com/Ircama/epson_print_conf/issues/66) with firmware version 05.23.XE21P2 - [L18050](https://github.com/Ircama/epson_print_conf/issues/47) - [EcoTank ET-2862 with firmware 05.18.XF12OB dated 12/11/2024](https://github.com/Ircama/epson_print_conf/discussions/58) and possibly ET-2860 / 2861 / 2863 / 2865 series. +- [XP-2200 with firmware 06.58.IU05P2](https://github.com/Ircama/epson_print_conf/issues/51) -For model XP-2200, check https://github.com/Ircama/epson_print_conf/issues/51 +The button "Temporary Reset Waste Ink Levels" should still work with these printers. ### Using the command-line tool @@ -475,7 +487,7 @@ Generic query of the status of the printer (regardless of the model): from epson_print_conf import EpsonPrinter import pprint printer = EpsonPrinter(hostname="192.168.1.87") -pprint.pprint(printer.status_parser(printer.snmp_mib("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1])) +pprint.pprint(printer.status_parser(printer.fetch_snmp_values("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1])) ``` ### Byte sequences diff --git a/SNMP_LIB_PERF_COMP.md b/SNMP_LIB_PERF_COMP.md index b8ab002..ff21512 100644 --- a/SNMP_LIB_PERF_COMP.md +++ b/SNMP_LIB_PERF_COMP.md @@ -13,6 +13,7 @@ Compared libraries and architectures: - [pysnmp v5.1](https://github.com/lextudio/pysnmp/) synchronous mode, - [pysnmp v7.1](https://github.com/lextudio/pysnmp/) asynchronous mode, - [pysnmp v7.1 with pysnmp-sync-adapter](https://github.com/Ircama/pysnmp-sync-adapter) synchronous wrapper, +- [pysnmp v7.1 with pysnmp-sync-adapter and cluster_varbinds](https://github.com/Ircama/pysnmp-sync-adapter#cluster_varbinds) for highest performances, - raw socket SNMPv1 implementation (synchronous mode). - Pure python implementation using the [asn1](https://github.com/andrivet/python-asn1) and the default socket libraries. - [py-snmp-sync](https://github.com/Ircama/py-snmp-sync) synchronous client implemented over PySNMP. @@ -36,15 +37,17 @@ To mitigate this, several approaches have been explored: * [`pysnmp-sync-adapter`](https://github.com/Ircama/pysnmp-sync-adapter): a lightweight compatibility layer wrapping `pysnmp.hlapi.v1arch.asyncio` and `pysnmp.hlapi.v3arch.asyncio` with blocking equivalents (e.g., `get_cmd_sync`). It reuses the asyncio event loop and transport targets, avoiding per-call overhead and achieving optimal performance while maintaining a synchronous API. -* [`py-snmp-sync`](https://github.com/Ircama/py-snmp-sync): offers even better performance by bypassing the asyncio-based API entirely. Instead, it directly uses the lower-level shared components of `pysnmp` that support both sync and async execution. It implements a custom `SyncUdpTransportTarget` based on raw sockets. However, it currently supports only a specialized form of `get_cmd`, limiting general HLAPI compatibility. +* [`py-snmp-sync`](https://github.com/Ircama/py-snmp-sync): offers high performance by bypassing the asyncio-based API entirely. Instead, it directly uses the lower-level shared components of `pysnmp` that support both sync and async execution. It implements a custom `SyncUdpTransportTarget` based on raw sockets. However, it currently supports only a specialized form of `get_cmd`, limiting general HLAPI compatibility. * A separate low-level implementation using ASN.1 and sockets directly is also tested. This approach shows excellent performance for the `get_cmd` request/response pattern but is significantly more complex to maintain and does not support the full SNMP operation set. Each approach offers trade-offs between generality, maintainability, and performance. For applications requiring full HLAPI compatibility with minimal refactoring, `pysnmp-sync-adapter` is a practical and efficient choice. For tightly optimized use cases the raw variants can provide superior throughput. +Optimal performance is achieved using the `cluster_varbinds` utility from `pysnmp-sync-adapter`, which provides possibly the simplest synchronous interface and includes optimized parallel processing which wraps `asyncio` under the hood. + --- -## Code used for the benchmarks +## Code used for the benchmarks and results ### Usage of https://github.com/etingof/pysnmp @@ -104,8 +107,8 @@ if __name__ == '__main__': # Usage of https://github.com/pysnmp/pysnmp # pip uninstall pysnmp -# pip uninstall pysnmplib # pip install pyasn1==0.4.8 +# pip install pysnmplib # Alternative working library: pip install pysnmp==5.1.0 (https://docs.lextudio.com/snmp/) @@ -155,7 +158,7 @@ if __name__ == '__main__': ```python # Usage of https://github.com/lextudio/pysnmp 7.1 -# Simulate sync behaviour in an extremely inefficient and slow mode +# Simulate sync behaviour via asyncio.run() (extremely inefficient and slow mode) # pip uninstall pysnmplib # pip uninstall pyasn1==0.4.8 @@ -217,7 +220,7 @@ if __name__ == '__main__': ```python # Usage of https://github.com/lextudio/pysnmp -# Same as mytest4, but single ObjectType in get_cmd +# Using asyncio.gather() for 100 asynch tasks. Single ObjectType in get_cmd # pip uninstall pysnmplib # pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp @@ -285,7 +288,8 @@ if __name__ == '__main__': ```python # Usage of https://github.com/lextudio/pysnmp -# Max performance (same as mytest3, but multiple ObjectType in get_cmd) +# Multiple ObjectType in get_cmd +# Using asyncio.gather() for 10 asynch tasks, each including a PDU of 10 OIDs. # pip uninstall pysnmplib # pip uninstall pysnmp # git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp @@ -430,6 +434,77 @@ if __name__ == '__main__': # --- 1.234 seconds --- ``` +### https://github.com/Ircama/pysnmp-sync-adapter#cluster_varbinds over PySNMP. + +This simple approach offers the best performances among all tests. + +```python +# pip uninstall pysnmplib +# pip install pysnmp-sync-adapter + +import sys +import time +import asyncio +import platform +from pysnmp.hlapi.v1arch.asyncio import * +from pysnmp_sync_adapter import ( + parallel_get_sync, create_transport, cluster_varbinds +) + +def main(): + if len(sys.argv) < 2: + print("Usage: python script.py ") + sys.exit(1) + + if platform.system()=='Windows': + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + host = sys.argv[1] + oid_str = '1.3.6.1.2.1.25.3.2.1.3.1' + community = 'public' + + # Pre-create the engine once + dispatcher = SnmpDispatcher() + + # Pre-create the transport once + transport = create_transport(UdpTransportTarget, (host, 161), timeout=1) + + # Pre-create CommunityData once + auth_data = CommunityData(community, mpModel=0) + oid_t = ObjectType(ObjectIdentity(oid_str)) + + # Create 100 queries using optimized PDU composition + wrapped_queries = [ObjectType(ObjectIdentity(oid_str)) for _ in range(100)] + wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=10) + + start = time.time() + for error_ind, error_status, error_index, var_binds in parallel_get_sync( + dispatcher, + auth_data, + transport, + queries=wrapped_queries + ): + if error_ind: + print(f"SNMP error: {error_ind}") + quit() + elif error_status: + print(f'{error_status.prettyPrint()} at {error_index and var_binds[int(error_index) - 1][0] or "?"}') + quit() + else: + for oid, val in var_binds: + print(val.prettyPrint()) + + print(f"--- {time.time() - start:.3f} seconds ---") + +if __name__ == '__main__': + main() + +# --- 0.410 seconds --- +# --- 0.424 seconds --- +# --- 0.360 seconds --- +# --- 0.423 seconds --- +``` + ### Usage of the oneliner package, being deprecated in newer versions of pysnmp ```python diff --git a/epson_print_conf.py b/epson_print_conf.py index 80706e9..139882b 100644 --- a/epson_print_conf.py +++ b/epson_print_conf.py @@ -6,8 +6,9 @@ Epson Printer Configuration via SNMP (TCP/IP) """ import itertools +from itertools import chain import re -from typing import Any, List +from typing import Any, List, Tuple, Union import datetime import time import textwrap @@ -17,19 +18,19 @@ import os import yaml from pathlib import Path import pickle +import abc +import hashlib +import struct -# The pysnmp module uses functionality from importlib.util and -# importlib.machinery, which were seperated from the importlib module -# in python>=3.11 -try: - import importlib.util - import importlib.machinery -except ImportError: - pass -from pysnmp.hlapi.v1arch import * # this imports UdpTransportTarget - +from pysnmp.hlapi.v1arch.asyncio import * from pyasn1.type.univ import OctetString as OctetStringType -from itertools import chain +from pysnmp_sync_adapter import ( + get_cmd_sync, + parallel_get_sync, + create_transport, + cluster_varbinds +) +from pysnmp.proto.errind import RequestTimedOut class EpsonPrinter: @@ -526,8 +527,8 @@ class EpsonPrinter: "write_key": b'Wakatobi', "printer_head_id_h": range(122, 126), "printer_head_id_f": [129], - "main_waste": {"oids": [24, 25, 30], "divider": 69}, - "borderless_waste": {"oids": [26, 27, 34], "divider": 32.53}, + "main_waste": {"oids": [24, 25], "divider": 69}, + "borderless_waste": {"oids": [26, 27], "divider": 32.53}, "serial_number": range(192, 202), "stats": { "Manual cleaning counter": [147], @@ -542,10 +543,12 @@ class EpsonPrinter: "Power off timer": [359, 358], }, "raw_waste_reset": { - 24: 0, 25: 0, 30: 0, # Data of 1st counter - 28: 0, 29: 0, # another store of 1st counter + 24: 0, 25: 0, # Data of 1st waste ink level + 30: 0, # First maintenance box reset counter + 28: 0, 29: 0, # another store of 1st waste ink level 46: 94, # Maintenance required level of 1st counter - 26: 0, 27: 0, 34: 0, # Data of 2nd counter + 26: 0, 27: 0, # Data of 2nd waste ink level + 34: 0, # Second maintenance box reset counter 47: 94, # Maintenance required level of 2st counter 49: 0 # ? }, @@ -811,6 +814,36 @@ class EpsonPrinter: "serial_number": range(1604, 1614), "alias": ["XP-2100", "XP-2151", "XP-2155"], }, + "XP-2200": { # 06.51.IU19M506.58.IU05P2 + "read_key": [75, 54], + "write_key": b"Kenjeran", + "main_waste": {"oids": [337, 338, 336], "divider": 69.0}, + "borderless_waste": {"oids": [339, 340, 336], "divider": 30.49}, + "raw_waste_reset": { + 336: 0, 337: 0, 338: 0, 339: 0, 340: 0, 341: 0, 343: 94, + 342: 0, 344: 94, 28: 0 + }, + "stats": { + "First TI received time": [9, 8], + "Manual cleaning counter": [203], + "Timer cleaning counter": [205], + "Total print pass counter": [133, 132, 131, 130], + "Total scan counter": [1843, 1842, 1841, 1840], + "Total print page counter": [792, 791, 790, 789], + "Ink replacement counter - Black": [554], + "Ink replacement counter - Cyan": [555], + "Ink replacement counter - Magenta": [556], + "Ink replacement counter - Yellow": [557], + "Maintenance required level of 1st waste ink counter": [343], + "Maintenance required level of 2nd waste ink counter": [344], + "Power off timer 1": [230, 229], + "Power off timer 2": [231, 230], + "Power off timer 3": [262, 261], + }, + "serial_number": range(1604, 1614), + "wifi_mac_address": range(1920, 1926), + "alias": ["XP-2205"], + }, "ET-2500": { "read_key": [68, 1], "write_key": b"Gerbera*", @@ -899,7 +932,7 @@ class EpsonPrinter: MIB_OID_ENTERPRISE = "1.3.6.1.4.1" MIB_EPSON = MIB_OID_ENTERPRISE + ".1248" OID_PRV_CTRL = "1.2.2.44.1.1.2" - EEPROM_LINK = f'{MIB_EPSON}.{OID_PRV_CTRL}.1' + D4_TO_OID = f'{MIB_EPSON}.{OID_PRV_CTRL}.1' MIB_INFO = { "Model": f"{MIB_MGMT}.1.25.3.2.1.3.1", @@ -929,11 +962,12 @@ class EpsonPrinter: "IP Address": f"{MIB_EPSON}.1.1.3.1.4.19.1.3.1", "IPP_URL_path": f"{MIB_EPSON}.1.1.3.1.4.19.1.4.1", "IPP_URL": f"{MIB_EPSON}.1.1.3.1.4.46.1.2.1", + "LPR_URL": "1.3.6.1.4.1.2699.1.2.1.3.1.1.4.1.1", + "Driver": "1.3.6.1.4.1.1248.1.1.3.1.29.3.1.27.0", "WiFi": f"{MIB_EPSON}.1.1.3.1.29.2.1.9.0", "MAC Addr": f"{MIB_EPSON}.1.1.3.1.1.5.0", "device_id": f"{MIB_OID_ENTERPRISE}.11.2.3.9.1.1.7.0", "Epson device id": f"{MIB_EPSON}.1.2.2.1.1.1.1.1", - "Power Off Timer": f"{EEPROM_LINK}.111.116.2.0.1.1" } MIB_INFO_ADVANCED = { @@ -1000,6 +1034,9 @@ class EpsonPrinter: ] if not values['alias']: del values['alias'] + self.MIB_INFO["Power Off Timer"] = self.epctrl_snmp_oid( + "ot", b"\x01\x01" + ) # ".111.116.2.0.1.1" (off timer) self.model = model self.hostname = hostname self.port = port @@ -1106,8 +1143,10 @@ class EpsonPrinter: logging.info(f"No value for method '{method}'.") return stat_set - def caesar(self, key, hex=False): + def caesar(self, key, hex=False, list=False): """Convert the string write key to a sequence of numbers""" + if list: + return [ 0 if b == 0 else b + 1 for b in key ] if hex: return " ".join( '00' if b == 0 else '{0:02x}'.format(b + 1) for b in key @@ -1146,14 +1185,14 @@ class EpsonPrinter: return None if 'read_key' not in self.parm: return None - return ( - f"{self.EEPROM_LINK}" - ".124.124" # || (7C 7C) - ".7.0" # read (07 00) - f".{self.parm['read_key'][0]}" - f".{self.parm['read_key'][1]}" - ".65.190.160" - f".{oid}.{msb}" + return self.epctrl_snmp_oid( + "||", # (7C 7C); "||" stands for EEPROM + [ + self.parm['read_key'][0], + self.parm['read_key'][1], + 65, 190, 160, # (read) + oid, msb + ] ) def eeprom_oid_write_address( @@ -1180,15 +1219,14 @@ class EpsonPrinter: 'write_key' not in self.parm or 'read_key' not in self.parm): return None - write_op = ( - f"{self.EEPROM_LINK}" - ".124.124" # || 7C 7C - ".16.0" # write (10 00) - f".{self.parm['read_key'][0]}" - f".{self.parm['read_key'][1]}" - ".66.189.33" # 42 BD 21 - f".{oid}.{msb}.{value}" - f".{self.caesar(self.parm['write_key'])}" + write_op = self.epctrl_snmp_oid( + "||", # (7C 7C); "||" stands for EEPROM + [ + self.parm['read_key'][0], + self.parm['read_key'][1], + 66, 189, 33, # 42 BD 21 (write) + oid, msb, value + ] + self.caesar(self.parm['write_key'], list=True) ) if self.dry_run: logging.warning("WRITE_DRY_RUN: %s", write_op) @@ -1196,84 +1234,172 @@ class EpsonPrinter: else: return write_op - def snmp_mib(self, mib: str, label: str = "unknown") -> (str, Any): - """Generic SNMP query, returning value of a MIB.""" + def fetch_oid_values( + self, + oid: Union[str, List[Union[str, List[str]]]], + label: str = "unknown" + ) -> Union[ + Tuple[str, Any], + List[Tuple[str, Any]] + ]: + """ + Query one or more OIDs and return their values. + + - If oid is a single string, returns [(type_name, value)]. + - If oid is a list of strings or list-of-lists, returns a list of + (type_name, value) in the same order. + + Lists of strings are grouped into a single PDU; top-level list runs + in parallel using parallel_get_sync. + """ + # Config‐file overrides if self.mib_dict: - if mib not in self.mib_dict: - logging.error( - "MIB '%s' not valued in the configuration file. " - "Operation: %s", - mib, - label - ) - return None, False - return self.mib_dict[mib] + # single‐OID case only + if isinstance(oid, str): + if oid not in self.mib_dict: + logging.error( + "MIB '%s' not in config. Operation: %s", oid, label + ) + return None, False + return self.mib_dict[oid] + else: + # list case: map through dict + results = [] + for element in oid: + if isinstance(element, str): + if element not in self.mib_dict: + logging.error( + "MIB '%s' missing in config. Operation: %s", + element, label + ) + results.append((None, False)) + else: + results.append(self.mib_dict[element]) + else: + # inner list grouping not supported by config + results.append((None, False)) + return results + + # Build or reuse SNMP network config if not self.hostname: return None, False - if ( - self.hostname, self.port, self.timeout, self.retries - ) != self.used_net_val: + + net_val = (self.hostname, self.port, self.timeout, self.retries) + if net_val != self.used_net_val: try: self.snmp_conf = ( SnmpDispatcher(), - CommunityData('public', mpModel=0), - UdpTransportTarget( - (self.hostname, self.port, self.timeout, self.retries) + CommunityData("public", mpModel=0), + create_transport( + UdpTransportTarget, + (self.hostname, self.port), + timeout=self.timeout, retries=self.retries ) ) except Exception as e: - logging.critical("snmp_mib invalid address: %s", e) + logging.critical("fetch_oid_values invalid address: %s", e) self.used_net_val = () return None, False - self.used_net_val = ( - self.hostname, self.port, self.timeout, self.retries - ) + + self.used_net_val = net_val + if not self.snmp_conf: return None, False - iterator = getCmd(*self.snmp_conf, (mib, None)) - for response in iterator: - errorIndication, errorStatus, errorIndex, varBinds = response - if errorIndication: - logging.info( - "snmp_mib error: %s. MIB: %s. Operation: %s", - errorIndication, mib, label - ) - if " timed out" in errorIndication: - raise TimeoutError(errorIndication) - return None, False - elif errorStatus: - logging.info( - 'snmp_mib PDU error: %s at %s. MIB: %s. Operation: %s', - errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex) - 1][0] or '?', - mib, - label - ) - return None, False - else: - for varBind in varBinds: - if isinstance(varBind[1], OctetStringType): - return( - varBind[1].__class__.__name__, - varBind[1].asOctets() - ) - else: - return( - varBind[1].__class__.__name__, - varBind[1].prettyPrint() - ) - logging.info( - "snmp_mib value error: invalid multiple data. " - "MIB: %s. Operation: %s", - mib, - label + + # SNMP lookup + def _single_lookup(single_oid: str) -> Tuple[str, Any]: + """ + Internal helper to perform one get_cmd_sync. + """ + engine, auth, transport = self.snmp_conf + errorInd, errorStat, errorIdx, varBinds = get_cmd_sync( + engine, auth, transport, + ObjectType(ObjectIdentity(single_oid)), + timeout=self.timeout ) - return None, False - logging.info( - "snmp_mib value error: invalid data. MIB: %s. Operation: %s", - label + # transport-level timeout? + if isinstance(errorInd, RequestTimedOut): + raise TimeoutError(errorInd) + elif errorInd is not None: + logging.info("fetch_oid_values error: %s. OID: %s. Label: %s", + errorInd, single_oid, label) + return None, False + + # SNMP-level errorStatus + if int(errorStat) != 0: + # find offending OID + bad_oid = varBinds[int(errorIdx) - 1][0] if errorIdx else "?" + logging.info( + "fetch_oid_values PDU error: %s at %s. OID: %s. Label: %s", + errorStat.prettyPrint(), bad_oid, single_oid, label + ) + return None, False + + # unpack the varBinds + final = [] + for oid_name, val in varBinds: + if isinstance(val, OctetStringType): + final.append((val.__class__.__name__, val.asOctets())) + else: + final.append((val.__class__.__name__, val.prettyPrint())) + + return final + + # Dispatch single vs batch + if isinstance(oid, str): + return _single_lookup(oid) + + # list of queries + # normalize list elements → either str or [str,...] + queries = [] + for elt in oid: + if isinstance(elt, str): + queries.append([elt]) # single‐OID PDU + elif isinstance(elt, (list, tuple)): + queries.append(list(elt)) # grouped‐OID PDU + else: + queries.append([]) + + # run parallel_get_sync: each inner list packs into one PDU, all run in parallel + engine, auth, transport = self.snmp_conf + # build ObjectType lists + wrapped_queries = [ + [ ObjectType(ObjectIdentity(x)) for x in group ] + for group in queries + ] + wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=3) + raw_results = parallel_get_sync( + engine, + auth, + transport, + queries=wrapped_queries, + max_parallel=5 ) - return None, False + + # raw_results is a list of SNMP tuples; map them through the same extraction logic + final = [] + for (errI, errS, errX, vbs) in raw_results: + # transport-level timeout? + if isinstance(errI, RequestTimedOut): + raise TimeoutError(errI) + + # SNMP errorStatus? + if errI is not None or int(errS) != 0: + # on error we don’t know how many OIDs were in this PDU, + # but we do know len(vbs), so record a failure for each + final.extend([(None, False)] * len(vbs)) + continue + + # unpack each var-bind in this PDU, in order + for obj in vbs: + # obj is an ObjectType; obj[1] is the value + val = obj[1] + if isinstance(val, OctetStringType): + final.append((val.__class__.__name__, val.asOctets())) + else: + final.append((val.__class__.__name__, val.prettyPrint())) + + return final def invalid_response(self, response): if response is False: @@ -1281,55 +1407,99 @@ class EpsonPrinter: return len(response) < 2 or response[0] != 0 or response[-1] != 12 def read_eeprom( - self, - oid: int, - label: str = "unknown method") -> str: - """Read a single byte from the Epson EEPROM address 'oid'.""" - logging.debug( - f"EEPROM_DUMP {label}:\n" - f" ADDRESS: " - f"{self.eeprom_oid_read_address(oid, label=label)}\n" - f" OID: {oid}={hex(oid)}" + self, + oid: Union[int, str, List[Union[int,str]]], + label: str = "unknown method" + ) -> Union[str, List[Union[str,None]]]: + """ + Read one or more EEPROM bytes at the given OID(s). + + - Single int/str → returns the two-hex-digit string or None. + - List of int/str → returns a list of those strings/None, in order. + """ + def _process_response( + tag: Any, response: Any, oid_val: int + ) -> Union[str, None]: + """Extract and validate the 'EE:xxxxxx' payload for one response.""" + if not response or self.invalid_response(response): + logging.error("Invalid response for OID %s (%s): %r", oid_val, label, response) + return None + + # find the EE:xxxxxx substring + try: + txt = response.decode() if isinstance( + response, (bytes, bytearray) + ) else response + match = re.search(r"EE:([0-9A-Fa-f]{6})", txt) + payload = match.group(1) + except Exception: + logging.info( + "Invalid read key for OID %s (%s)", oid_val, label + ) + return None + + # split into address + value + addr_hex, val_hex = payload[:4], payload[4:] + if int(addr_hex, 16) != oid_val: + logging.critical( + "EEPROM address mismatch: expected %04x != returned %s; %s", + oid_val, addr_hex, label + ) + return None + + return val_hex.upper() + + # Build the address for SNMP + def _addr(o): + return self.eeprom_oid_read_address(o, label=label) + + # Call fetch_oid_values (single or batch) + resp = self.fetch_oid_values( + _addr(oid) if not isinstance(oid, list) else [ + _addr(o) for o in oid + ], + label=label ) - tag, response = self.snmp_mib( - self.eeprom_oid_read_address(oid, label=label), label=label - ) - if not response: - return None - if self.invalid_response(response): - logging.error( - f"Invalid response: '%s' for oid %s (%s)", - repr(response), oid, label - ) - return None - logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response)) - try: - response = re.findall( - r"EE:[0-9a-fA-F]{6}", response.decode())[0][3:] - except (TypeError, IndexError): - logging.info(f"Invalid read key.") - return None - chk_addr = response[0:4] - value = response[4:6] - if int(chk_addr, 16) != oid: - raise ValueError( - f"Address and response address are" - f" not equal: {oid} != {chk_addr}" - ) - return value + # resp is a list of (tag, response) + if isinstance(oid, int): + tag, response = resp[0] + return _process_response(tag, response, oid) + results: List[Union[str,None]] = [] + for o, entry in zip(oid, resp): + tag, response = entry + results.append(_process_response(tag, response, int(o))) + + return results def read_eeprom_many( - self, - oids: list, - label: str = "unknown method") -> list: + self, + oids: Union[range, List[Union[int,str]]], + label: str = "unknown method" + ) -> List[Union[str,None]]: """ - Read a list of bytes from the list of Epson EEPROM addresses 'oids'. + Read a list of bytes from the Epson EEPROM at addresses in `oids`, + using a single parallel batch SNMP query. + + Accepts a list of ints/strs or a range() of ints. + + Returns a list of two-hex-digit strings (e.g. "A3") or None, + for each OID, preserving order. + + If any element is None, returns [None]. """ - response = [self.read_eeprom(oid, label=label) for oid in oids] - for i in response: - if i is None: - return [None] - return response + # Normalize a range into a list of ints + if isinstance(oids, range): + oids = list(oids) + + # Delegate to read_eeprom (which handles both single and lists) + results = self.read_eeprom(oids, label=label) + if not isinstance(results, list): + results = [results] + + if any(r is None for r in results): + return [None] + + return results def write_eeprom( self, @@ -1354,7 +1524,7 @@ class EpsonPrinter: f" OID: {oid}={hex(oid)}\n" f" VALUE: {value} = {hex(int(value))}" ) - tag, response = self.snmp_mib(oid_string, label=label) + tag, response = self.fetch_oid_values(oid_string, label=label)[0] if response: logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response)) if not self.dry_run and response and not ":OK;" in repr(response): @@ -1756,7 +1926,9 @@ class EpsonPrinter: f"SNMP_DUMP {name}:\n" f" ADDRESS: {oid}" ) - tag, result = self.snmp_mib(oid, label="get_snmp_info " + name) + tag, result = self.fetch_oid_values( + oid, label="get_snmp_info " + name + )[0] logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(result)) if name == "Power Off Timer" and result and result.find( @@ -1807,11 +1979,14 @@ class EpsonPrinter: left_val = val return left_val else: - return "".join( - chr(int(value or "0x3f", 16)) # "0x3f" --> "?" - for value in self.read_eeprom_many( - self.parm["serial_number"], label="serial_number") - ) + try: + return "".join( + chr(int(value or "0x3f", 16)) # "0x3f" --> "?" + for value in self.read_eeprom_many( + self.parm["serial_number"], label="serial_number") + ) + except Exception: + return None def get_printer_brand(self) -> str: """Return the producer name of the printer ("EPSON").""" @@ -1820,12 +1995,15 @@ class EpsonPrinter: return None if "brand_name" not in self.parm: return None - return ''.join( - [chr(int(i or "0x3f", 16)) - for i in self.read_eeprom_many( - self.parm["brand_name"], label="get_brand_name" - ) if i != '00'] - ) + try: + return ''.join( + [chr(int(i or "0x3f", 16)) + for i in self.read_eeprom_many( + self.parm["brand_name"], label="get_brand_name" + ) if i != '00'] + ) + except Exception: + return None def get_printer_model(self) -> str: """Return the model name of the printer.""" @@ -1834,12 +2012,15 @@ class EpsonPrinter: return None if "model_name" not in self.parm: return None - return ''.join( - [chr(int(i or "0x3f", 16)) - for i in self.read_eeprom_many( - self.parm["model_name"], label="get_model_name" - ) if i != '00'] - ) + try: + return ''.join( + [chr(int(i or "0x3f", 16)) + for i in self.read_eeprom_many( + self.parm["model_name"], label="get_model_name" + ) if i != '00'] + ) + except Exception: + return None def get_wifi_mac_address(self) -> str: """Return the WiFi MAC address of the printer.""" @@ -1855,7 +2036,7 @@ class EpsonPrinter: ) ) except Exception: - return False + return None def get_stats(self, stat_name: str = None) -> str: """Return printer statistics.""" @@ -1916,13 +2097,16 @@ class EpsonPrinter: Return firmware version. Query firmware version: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.118.105.1.0.0 """ - oid = f"{self.EEPROM_LINK}.118.105.1.0.0" # 76 69 01 00 00 + oid = self.epctrl_snmp_oid( + "vi", # This command stands for Version Information. + 0 + ) label = "get_firmware_version" logging.debug( f"SNMP_DUMP {label}:\n" f" ADDRESS: {oid}" ) - tag, firmware_string = self.snmp_mib(oid, label=label) + tag, firmware_string = self.fetch_oid_values(oid, label=label)[0] if not firmware_string: return None if self.invalid_response(firmware_string): @@ -1939,15 +2123,36 @@ class EpsonPrinter: return firmware + " " + datetime.datetime( year, month, day).strftime('%d %b %Y') + def get_device_identification(self) -> str: + oid = self.epctrl_snmp_oid("di", 1) # di = device identification + label = "get_device_identification" + logging.debug( + f"SNMP_DUMP {label}:\n" + f" ADDRESS: {oid}" + ) + tag, device_id = self.fetch_oid_values(oid, label=label)[0] + key_map = { + "MFG": "Manufacturer", + "CMD": "Commands", + "MDL": "Model", + "CLS": "Class", + "DES": "Description" + } + return { + key_map.get(k, k): [v for v in vals if v] + for i in device_id.decode()[10:].split(";") if i + for k, *vals in [i.split(":")] + } + def get_cartridges(self) -> str: """Return list of cartridge types.""" - oid = f"{self.EEPROM_LINK}.105.97.1.0.0" # 69 61 01 00 00 + oid = self.epctrl_snmp_oid("ia", 0) # ".105.97.1.0.0" # 69 61 01 00 00 (ink accessories) label = "get_cartridges" logging.debug( f"SNMP_DUMP {label}:\n" f" ADDRESS: {oid}" ) - tag, cartridges_string = self.snmp_mib(oid, label=label) + tag, cartridges_string = self.fetch_oid_values(oid, label=label)[0] if self.invalid_response(cartridges_string): logging.error( f"Invalid response for %s: '%s'", @@ -1992,9 +2197,11 @@ class EpsonPrinter: Query printer status: 1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.115.116.1.0.1 or 1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1 """ - address = f"{self.EEPROM_LINK}.115.116.1.0.1" # 73 74 01 00 01 + address = self.epctrl_snmp_oid("st", 1) # ".115.116.1.0.1" # 73 74 01 00 01 (status) logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}") - tag, result = self.snmp_mib(address, label="get_printer_status") + tag, result = self.fetch_oid_values( + address, label="get_printer_status" + )[0] if not result: return None logging.debug(" TAG: %s\n RESPONSE: %s...\n%s", @@ -2039,23 +2246,26 @@ class EpsonPrinter: return None if "last_printer_fatal_errors" not in self.parm: return None - return self.read_eeprom_many( - self.parm["last_printer_fatal_errors"], - label="last_printer_fatal_errors" - ) + try: + return self.read_eeprom_many( + self.parm["last_printer_fatal_errors"], + label="last_printer_fatal_errors" + ) + except Exception: + return None def get_cartridge_information(self) -> str: """Return list of cartridge properties.""" response = [] for i in range(1, 9): - mib = f"{self.EEPROM_LINK}.105.105.2.0.1." + str(i) # 69 69 02 00 01 + mib = self.epctrl_snmp_oid("ii", b"\x01" + bytes([i])) # ".105.105.2.0.1." + str(i) # 69 69 02 00 01 (ink information) logging.debug( f"Cartridge {i}:\n" f" ADDRESS: {mib}" ) - tag, cartridge = self.snmp_mib( + tag, cartridge = self.fetch_oid_values( mib, label="get_cartridge_information" - ) + )[0] logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(cartridge)) if not cartridge: continue @@ -2153,16 +2363,34 @@ class EpsonPrinter: logging.error("Cartridge value error: %s.\n%s", e, cartridges) return None - def dump_eeprom(self, start: int = 0, end: int = 0xFF): + def dump_eeprom(self, start: int = 0, end: int = 0xFF) -> dict[int, int]: """ - Dump EEPROM data from start to end (less significant byte). + Dump EEPROM data from `start` to `end` (inclusive) in a single + parallel SNMP batch read. + + Returns a dict mapping each address → int value. If any read fails, + that address maps to None. """ - d = {} - for oid in range(start, end + 1): - d[oid] = int( - self.read_eeprom(oid, label="dump_eeprom") or "-0x1", - 16 - ) + # Build the list of OIDs + oids = list(range(start, end + 1)) + + # Fire one parallel batch read + # read_eeprom(list) now returns List[str|None] + hex_results = self.read_eeprom(oids, label="dump_eeprom") + + # If the batch call itself errored out (None), fall back or return empty + if hex_results is None: + # All failed; return empty or map everything to None + return {oid: None for oid in oids} + + # Map each hex‐string (or None) to an int (or None) + d: dict[int, int] = {} + for oid, hx in zip(oids, hex_results): + if hx is None: + d[oid] = None + else: + # hx is like "5A" → int("5A",16) + d[oid] = int(hx, 16) return d def update_parameter( @@ -2214,6 +2442,38 @@ class EpsonPrinter: return True return False + def epctrl_snmp_oid(self, command, payload): + """ + Build the full OID based on EPSON-CTRL D4 (END4) encapsulation + (IEEE1284.4 or Dot4 by Epson encapsulated into an SNMP OID) + http://osr507doc.xinuos.com/en/OSAdminG/OSAdminG_gimp/manual-html/gimpprint_37.html + """ + assert len(command) == 2 + if isinstance(payload, int): + payload = bytearray([payload]) + elif isinstance(payload, list): + payload = bytes(payload) + cmd = command.encode() + struct.pack(' bool: + """ + Thanks to https://codeberg.org/atufi/reinkpy/issues/12#issuecomment-1661250 + """ + serial = self.get_serial_number() + sha1 = hashlib.sha1(serial.encode()) + oid = self.epctrl_snmp_oid( + "rw", # This command stands for "reset waste". + b'\x01\x00' + # Unknown \x01\x00 (2 bytes) + sha1.digest() # Serial SHA1 hash. Always 20 bytes. + ) + if dry_run: + return True + answer = self.fetch_oid_values(oid, label="temp_reset_waste")[0] + return b"rw:01:OK;" in answer[1] + def reset_waste_ink_levels(self, dry_run=False) -> bool: """ Set waste ink levels to the values specified in the configuration. @@ -2329,6 +2589,8 @@ class EpsonPrinter: hex_bytes = self.read_eeprom_many( eeprom_range, label="detect_serial_number" ) + if hex_bytes is [None]: + return hex_bytes, None # Convert the hex bytes to characters sequence = ''.join(chr(int(byte, 16)) for byte in hex_bytes) # Serial number pattern (10 consecutive uppercase letters or digits) @@ -2898,15 +3160,14 @@ if __name__ == "__main__": ) if args.dump_eeprom: print_opt = True - for addr, val in printer.dump_eeprom( - int(ast.literal_eval(args.dump_eeprom[0])), - int(ast.literal_eval(args.dump_eeprom[1])) - ).items(): - print( - f"EEPROM_ADDR {hex(addr).rjust(4)} = " - f"{str(addr).rjust(3)}: " - f"{val:#04x} = {str(val).rjust(3)}" - ) + start = int(ast.literal_eval(args.dump_eeprom[0])) + end = int(ast.literal_eval(args.dump_eeprom[1])) + for addr, val in printer.dump_eeprom(start, end).items(): + if val is None: + disp_val = " --" + else: + disp_val = f"{val:#04x}" # 0x00 … 0xFF + print(f"EEPROM_ADDR 0x{addr:02X} = {addr:3d}: {disp_val}") if args.query: print_opt = True if ("stats" in printer.parm and diff --git a/requirements.txt b/requirements.txt index 0da1ba4..24c5435 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ pyyaml -pyasn1==0.4.8 -git+https://github.com/etingof/pysnmp.git@master#egg=pysnmp -pyasyncore;python_version>="3.12" +pysnmp +pysnmp_sync_adapter tkcalendar pyperclip black diff --git a/ui.py b/ui.py index 75dfcf3..d9c6cbf 100644 --- a/ui.py +++ b/ui.py @@ -37,7 +37,7 @@ from find_printers import PrinterScanner from text_console import TextConsole -VERSION = "5.3.6" +VERSION = "6.0.0" NO_CONF_ERROR = ( " Please select a printer model and a valid IP address," @@ -126,12 +126,12 @@ class EpcTextConsole(TextConsole): "self.printer.model\n" "self.printer.reverse_caesar(b'Hpttzqjv')\n" 'self.printer.reverse_caesar(bytes.fromhex("48 62 7B 62 6F 6A 62 2B"))\n' - 'import pprint;pprint.pprint(self.printer.status_parser(self.printer.snmp_mib("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1]))\n' + 'import pprint;pprint.pprint(self.printer.status_parser(self.printer.fetch_snmp_values("1.3.6.1.4.1.1248.1.2.2.1.1.1.4.1")[1]))\n' "self.printer.read_eeprom_many([0])\n" "self.printer.read_eeprom(0)\n" "self.printer.reset_waste_ink_levels()\n" - "self.printer.snmp_mib(self.printer.eeprom_oid_read_address(0))\n" - "self.printer.snmp_mib('1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.124.124.7.0.25.7.65.190.160.0.0')\n" + "self.printer.fetch_snmp_values(self.printer.eeprom_oid_read_address(0))\n" + "self.printer.fetch_snmp_values('1.3.6.1.4.1.1248.1.2.2.44.1.1.2.1.124.124.7.0.25.7.65.190.160.0.0')\n" "self.get_ti_date(cursor=True)" ) ) @@ -715,11 +715,11 @@ class EpsonPrinterUI(tk.Tk): row_n += 1 button_frame = ttk.Frame(main_frame, padding=PAD) button_frame.grid(row=row_n, column=0, pady=PADY, sticky=(tk.W, tk.E)) - button_frame.columnconfigure((0, 1, 2), weight=1) # expand columns + button_frame.columnconfigure((0, 1, 2, 3), weight=1) # expand columns # Query Printer Status self.status_button = ttk.Button( - button_frame, text="Printer Status", + button_frame, text="Printer\nStatus", command=self.printer_status, style="Centered.TButton" ) @@ -730,7 +730,7 @@ class EpsonPrinterUI(tk.Tk): # Query list of cartridge types self.web_interface_button = ttk.Button( button_frame, - text="Printer Web interface", + text="Printer\nWeb interface", command=self.web_interface, style="Centered.TButton" ) @@ -741,7 +741,7 @@ class EpsonPrinterUI(tk.Tk): # Detect configuration values self.detect_configuration_button = ttk.Button( button_frame, - text="Detect Configuration", + text="Detect\nConfiguration", command=self.detect_configuration, style="Centered.TButton" ) @@ -749,6 +749,17 @@ class EpsonPrinterUI(tk.Tk): row=0, column=2, padx=PADX, pady=PADX, sticky=(tk.W, tk.E) ) + # Temporary Reset Waste Ink Levels + self.detect_configuration_button = ttk.Button( + button_frame, + text="Temporary Reset\nWaste Ink Levels", + command=self.temp_reset_waste_ink, + style="Centered.TButton" + ) + self.detect_configuration_button.grid( + row=0, column=3, padx=PADX, pady=PADX, sticky=(tk.W, tk.E) + ) + # [row 4] Tweak Buttons row_n += 1 tweak_frame = ttk.Frame(main_frame, padding=PAD) @@ -1523,6 +1534,7 @@ Web site: https://github.com/Ircama/epson_print_conf self.update_idletasks() def get_current_eeprom_values(self, values, label): + values = list(values) try: org_values = ', '.join( "" if v is None else f"{k}: {int(v, 16)}" for k, v in zip( @@ -1759,7 +1771,9 @@ Web site: https://github.com/Ircama/epson_print_conf list_ser_num = [pr_ser_num] for i in list_ser_num: try: - if not self.get_current_eeprom_values(i, "Printer Serial Number"): + if not self.get_current_eeprom_values( + i, "Printer Serial Number" + ): self.config(cursor="") self.update_idletasks() return @@ -3010,6 +3024,63 @@ Web site: https://github.com/Ircama/epson_print_conf self.config(cursor="") self.update_idletasks() + def temp_reset_waste_ink(self, cursor=True): + if cursor: + self.config(cursor="watch") + self.update() + current_function_name = inspect.stack()[0][3] + method_to_call = getattr(self, current_function_name) + self.after(100, lambda: method_to_call(cursor=False)) + return + self.show_status_text_view() + ip_address = self.ip_var.get() + if ( + not self._is_valid_ip(ip_address) + or not self.printer + or not self.printer.parm + or "read_key" not in self.printer.parm + or "write_key" not in self.printer.parm + ): + self.status_text.insert(tk.END, '[ERROR]', "error") + self.status_text.insert(tk.END, NO_CONF_ERROR) + self.config(cursor="") + self.update_idletasks() + return + if not self.printer: + return + msg = ( + "Confirm Action", + "This feature temporarily bypasses the ink waste tank full warning," + " which would otherwise disable printing. " + "\n\nThis setting does not persist a reboot. " + "\n\nAre you sure you want to proceed?" + ) + response = messagebox.askyesno(*msg, default='no') + if response: + try: + if self.printer.temporary_reset_waste(): + self.status_text.insert(tk.END, '[INFO]', "info") + self.status_text.insert( + tk.END, + " Waste ink levels have been temporarily reset." + " You can now print.\n" + ) + else: + self.status_text.insert(tk.END, '[ERROR]', "error") + self.status_text.insert( + tk.END, + " Failed to perform the temporary reset of the " + "waste ink levels." + ) + except Exception as e: + self.handle_printer_error(e) + else: + self.status_text.insert( + tk.END, f"[WARNING] Waste ink levels reset aborted.\n" + ) + self.config(cursor="") + self.update_idletasks() + def start_detect_printers(self): self.show_status_text_view() self.status_text.insert(tk.END, '[INFO]', "info")