Created
January 21, 2026 16:30
-
-
Save taschik/ffebd3c685b4392a57bfa8d79a0d2ac1 to your computer and use it in GitHub Desktop.
ts0601 TZE284_rlytpmij ZHA Quirk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Moes/Tuya TS0601 (_TZE284_rlytpmij) — ZHA quirk | |
| # Based on zigbee2mqtt converter with all sensors and improved reliability | |
| # Exposes: Climate, Floor Temperature, Valve State, and other sensors | |
| import logging | |
| from enum import Enum | |
| from typing import Any, Optional | |
| from zigpy.quirks import CustomDevice | |
| from zigpy.profiles import zha | |
| from zigpy.zcl.clusters.general import Basic, Identify, PowerConfiguration, Ota, Groups, Scenes, OnOff | |
| from zigpy.zcl.clusters.hvac import Thermostat, RunningState | |
| from zigpy.zcl.clusters.measurement import TemperatureMeasurement | |
| _LOGGER = logging.getLogger(__name__) | |
| # ---------- Try builder API (quirks v2) ---------- | |
| _builder_ok = False | |
| try: | |
| from zigpy.quirks.v2 import BinarySensorDeviceClass, EntityType | |
| from zigpy.quirks.v2.homeassistant.sensor import SensorDeviceClass, SensorStateClass | |
| from zigpy.quirks.v2.homeassistant import UnitOfTemperature | |
| from zigpy.types import t | |
| from zigpy.zcl import foundation | |
| from zhaquirks.tuya import TUYA_SET_TIME, TuyaTimePayload | |
| from zhaquirks.tuya.builder import TuyaQuirkBuilder | |
| from zhaquirks.tuya.mcu import TuyaAttributesCluster, TuyaMCUCluster | |
| _builder_ok = True | |
| except Exception as e: | |
| _LOGGER.warning("Builder API not available: %s", e) | |
| _builder_ok = False | |
| # ---------- Builder-based implementation ---------- | |
| if _builder_ok: | |
| # Preset enum for DP 2 (auto=0, manual=1, eco=2) | |
class ThermostatPreset(t.enum8):
    """Preset modes carried by Tuya DP 2: auto, manual and eco."""

    Auto = 0
    Manual = 1
    Eco = 2
class TuyaThermostatCluster(Thermostat, TuyaAttributesCluster):
    """Thermostat cluster fed by Tuya data points; the device is heating-only."""

    # Constant attribute value: the control sequence is always Heating_Only.
    _CONSTANT_ATTRIBUTES = {
        Thermostat.AttributeDefs.ctrl_sequence_of_oper.id: (
            Thermostat.ControlSequenceOfOperation.Heating_Only
        ),
    }

    def __init__(self, *args, **kwargs):
        """Initialise the cluster, then mark three attributes as unsupported."""
        super().__init__(*args, **kwargs)
        attribute_defs = Thermostat.AttributeDefs
        for unsupported in (
            attribute_defs.setpoint_change_source,
            attribute_defs.setpoint_change_source_timestamp,
            attribute_defs.pi_heating_demand,
        ):
            self.add_unsupported_attribute(unsupported.id)
class NoManufTimeNoVersionRespTuyaMCUCluster(TuyaMCUCluster):
    """Tuya MCU cluster with adjusted set_time and MCU-version handling.

    ``set_time`` is redefined as a non-manufacturer-specific command, and the
    MCU version response handler only returns SUCCESS.
    """

    class ServerCommandDefs(TuyaMCUCluster.ServerCommandDefs):
        # Same command id and payload schema as TUYA_SET_TIME, but declared
        # with is_manufacturer_specific=False.
        set_time = foundation.ZCLCommandDef(
            schema={"time": TuyaTimePayload},
            id=TUYA_SET_TIME,
            is_manufacturer_specific=False,
        )

    def handle_mcu_version_response(self, payload: TuyaMCUCluster.MCUVersion) -> foundation.Status:
        """Return SUCCESS for an MCU version report; the payload is not used."""
        status = foundation.Status.SUCCESS
        return status
| # Converter for running state - handles various value types | |
| # zigbee2mqtt: 0=heat, 1=idle | |
def convert_running_state(value):
    """Map the raw DP 47 value onto a ZCL ``RunningState``.

    Any representation of "heating" (numeric zero, ``False``, the strings
    ``"heat"``/``"Heat"`` or a single zero byte) yields ``Heat_State_On``;
    every other value yields ``Idle``.
    """
    _LOGGER.debug("DP47 running_state raw value: %s (type: %s)", value, type(value))

    # Kept as a tuple: membership must work for unhashable values too.
    heating_markers = (0, False, "heat", "Heat", b'\x00')
    if value not in heating_markers:
        _LOGGER.debug("DP47 -> Idle")
        return RunningState.Idle

    _LOGGER.debug("DP47 -> Heat_State_On")
    return RunningState.Heat_State_On
| # Build the quirk - core thermostat functionality | |
def _bool_to_system_mode(dp_value):
    """Translate the DP 1 on/off flag into a ZCL system mode (Heat or Off)."""
    return Thermostat.SystemMode.Heat if dp_value else Thermostat.SystemMode.Off


def _system_mode_to_bool(mode):
    """Translate a ZCL system mode into the DP 1 flag (True only for Heat)."""
    return mode == Thermostat.SystemMode.Heat


def _tenths_to_hundredths(dp_value):
    """Scale a Tuya temperature (0.1°C steps) to ZCL units (0.01°C steps)."""
    return int(dp_value * 10)


def _hundredths_to_tenths(zcl_value):
    """Scale a ZCL temperature (0.01°C steps) to Tuya units (0.1°C steps)."""
    return int(zcl_value // 10)


# Register the quirk: thermostat attributes first, then the extra entities.
(
    TuyaQuirkBuilder("_TZE284_rlytpmij", "TS0601")
    # DP 1 -> system_mode (boolean flag <-> Heat/Off)
    .tuya_dp(
        dp_id=1,
        ep_attribute=TuyaThermostatCluster.ep_attribute,
        attribute_name=TuyaThermostatCluster.AttributeDefs.system_mode.name,
        converter=_bool_to_system_mode,
        dp_converter=_system_mode_to_bool,
    )
    # DP 111 -> occupied_heating_setpoint (0.1°C <-> 0.01°C)
    .tuya_dp(
        dp_id=111,
        ep_attribute=TuyaThermostatCluster.ep_attribute,
        attribute_name=TuyaThermostatCluster.AttributeDefs.occupied_heating_setpoint.name,
        converter=_tenths_to_hundredths,
        dp_converter=_hundredths_to_tenths,
    )
    # DP 117 -> local_temperature (0.1°C -> 0.01°C, report only)
    .tuya_dp(
        dp_id=117,
        ep_attribute=TuyaThermostatCluster.ep_attribute,
        attribute_name=TuyaThermostatCluster.AttributeDefs.local_temperature.name,
        converter=_tenths_to_hundredths,
    )
    # DP 47 -> running_state (0 = heat, 1 = idle per zigbee2mqtt)
    .tuya_dp(
        dp_id=47,
        ep_attribute=TuyaThermostatCluster.ep_attribute,
        attribute_name=TuyaThermostatCluster.AttributeDefs.running_state.name,
        converter=convert_running_state,
    )
    # DP 101 -> floor temperature sensor entity
    .tuya_sensor(
        dp_id=101,
        attribute_name="floor_temperature",
        type=t.int16s,
        divisor=10,
        device_class=SensorDeviceClass.TEMPERATURE,
        state_class=SensorStateClass.MEASUREMENT,
        unit=UnitOfTemperature.CELSIUS,
        translation_key="floor_temperature",
        fallback_name="Floor Temperature",
    )
    # DP 39 -> child lock switch entity
    .tuya_switch(
        dp_id=39,
        attribute_name="child_lock",
        translation_key="child_lock",
        fallback_name="Child Lock",
        entity_type=EntityType.CONFIG,
    )
    # DP 103 -> antifreeze switch entity
    .tuya_switch(
        dp_id=103,
        attribute_name="antifreeze",
        translation_key="antifreeze",
        fallback_name="Antifreeze",
        entity_type=EntityType.CONFIG,
    )
    # DP 2 -> preset mode select entity (auto=0, manual=1, eco=2)
    .tuya_enum(
        dp_id=2,
        attribute_name="preset_mode",
        enum_class=ThermostatPreset,
        translation_key="preset_mode",
        fallback_name="Preset Mode",
    )
    .adds(TuyaThermostatCluster)
    .skip_configuration()
    .add_to_registry(replacement_cluster=NoManufTimeNoVersionRespTuyaMCUCluster)
)
| else: | |
| # ---------- EF00 fallback (older environment) ---------- | |
| try: | |
| from zhaquirks.tuya import TuyaManufCluster | |
| except Exception as e: | |
| TuyaManufCluster = None | |
| _LOGGER.warning("TuyaManufCluster not available: %s", e) | |
| # DP mappings matching zigbee2mqtt | |
# Core thermostat data points
DP_SYSTEM_MODE = 1
DP_PRESET = 2
DP_RUNNING_STATE = 47
DP_FLOOR_TEMP = 101
DP_SETPOINT = 111
DP_LOCAL_TEMP = 117

# Temperature tuning and limits
DP_CALIBRATION = 19
DP_DEADZONE = 106
DP_ECO_TEMP = 107
DP_MAX_TEMP = 114
DP_MIN_TEMP = 116

# Remaining device options
DP_BACKLIGHT = 3
DP_SENSOR = 32
DP_CHILD_LOCK = 39
DP_TEMP_SCALE = 46
DP_ANTIFREEZE = 103
DP_PROGRAM_MODE = 104
DP_SCHEDULE = 108
| def _tuya_temp_to_zcl(v) -> int: | |
| """Convert Tuya temp (0.1°C) to ZCL (0.01°C).""" | |
| if isinstance(v, (bytes, bytearray)): | |
| try: | |
| v = int.from_bytes(v, "big") | |
| except Exception: | |
| v = 0 | |
| return int(v) * 10 | |
class MoesTuyaCluster(TuyaManufCluster):
    """EF00 cluster that decodes Tuya data points (DPs) and mirrors them to ZCL.

    Incoming payloads are split into DP records. Thermostat-related DPs are
    written to the ``thermostat`` / ``temperature`` clusters looked up on
    endpoint 1; the remaining handled DPs are cached on private attributes.
    """

    cluster_id = 0xEF00
    ep_attribute = "tuya_ef00"

    # Cached DP values that have no ZCL attribute in this quirk.
    # _valve_state mirrors the "heating" flag derived from DP 47.
    _floor_temperature: Optional[int] = None
    _valve_state: Optional[bool] = None
    _preset: Optional[str] = None
    _child_lock: Optional[bool] = None
    _antifreeze: Optional[bool] = None
    _eco_temperature: Optional[int] = None
    _deadzone_temperature: Optional[int] = None
    _min_temp_limit: Optional[int] = None
    _max_temp_limit: Optional[int] = None
    # Declared but never assigned by _apply in this class.
    _sensor_type: Optional[str] = None
    _backlight: Optional[str] = None
    _program_mode: Optional[str] = None
    _temp_scale: Optional[str] = None

    @staticmethod
    def set_time(*args, **kwargs):
        """No-op stand-in for the time hook; always returns None."""
        return None

    @staticmethod
    def set_time_local_offset(*args, **kwargs):
        """No-op stand-in for the local-offset hook; always returns None."""
        return None

    def handle_cluster_request(self, hdr, args, dst_addressing=None):
        """Make sure both time hooks are callable, then defer to the base class."""
        if not hasattr(self, "set_time") or self.set_time is None:
            self.set_time = lambda *a, **kw: None
        if not hasattr(self, "set_time_local_offset") or self.set_time_local_offset is None:
            self.set_time_local_offset = lambda *a, **kw: None
        return super().handle_cluster_request(hdr, args, dst_addressing=dst_addressing)

    @staticmethod
    def _decode_dp_value(dp_type, raw):
        """Decode the value bytes of one DP record according to its type."""
        if dp_type == 0x01:  # bool
            return bool(raw[0]) if raw else False
        if dp_type == 0x02:  # value
            # Value DPs are normally 4 bytes; decoding them as signed keeps
            # negative readings negative. 2-byte values stay signed as before.
            return int.from_bytes(raw, "big", signed=len(raw) in (2, 4))
        if dp_type == 0x04:  # enum
            return raw[0] if raw else 0
        # Unknown types are handed on as raw bytes.
        return raw

    def _parse_dp_payload(self, data: bytes):
        """Split a payload into Tuya DP records.

        Each record is laid out as
        ``dp_id (1 byte) | dp_type (1 byte) | length (2 bytes, big-endian) | value``.
        Parsing stops at the first record whose header or value is truncated.

        Returns:
            A list of ``(dp_id, dp_type, value, raw_bytes)`` tuples, possibly empty.
        """
        records = []
        total = len(data)
        idx = 0
        # A record needs its full 4-byte header before the value.
        while idx + 4 <= total:
            dp_id = data[idx]
            dp_type = data[idx + 1]
            length = int.from_bytes(data[idx + 2:idx + 4], "big")
            idx += 4
            if idx + length > total:
                break
            raw = data[idx:idx + length]
            idx += length
            records.append((dp_id, dp_type, self._decode_dp_value(dp_type, raw), raw))
        return records

    def _apply(self, dp_id, dp_type, val, raw):
        """Apply one decoded DP to the matching ZCL attribute or cached field.

        DPs that are not handled below are ignored. Thermostat DPs are skipped
        when endpoint 1 has no ``thermostat`` cluster.
        """
        ep1 = getattr(self.endpoint.device, "endpoints", {}).get(1)
        thermo: Thermostat = getattr(ep1, "thermostat", None) if ep1 else None
        tmeas: TemperatureMeasurement = getattr(ep1, "temperature", None) if ep1 else None

        def up(cluster, attr, value):
            # Best-effort attribute update: failures are logged, never raised.
            if cluster is None:
                return
            try:
                cluster._update_attribute(attr, value)
                _LOGGER.debug("Updated attr 0x%04X to %s", attr, value)
            except Exception as e:
                _LOGGER.debug("attr 0x%04X update failed: %s", attr, e)

        # System mode (DP 1)
        if dp_id == DP_SYSTEM_MODE and thermo:
            sys_mode = Thermostat.SystemMode.Heat if val else Thermostat.SystemMode.Off
            up(thermo, 0x001C, sys_mode)
        # Running state (DP 47) - zigbee2mqtt: 0=heat, 1=idle
        elif dp_id == DP_RUNNING_STATE and thermo:
            # Handle various representations of "heating" (value 0)
            heat_on = val in (0, False, "heat", "Heat", b'\x00')
            running = RunningState.Heat_State_On if heat_on else RunningState.Idle
            up(thermo, 0x0029, running)
            self._valve_state = heat_on
            _LOGGER.debug("DP47: val=%s (type=%s) -> running=%s, valve=%s", val, type(val), running, "OPEN" if heat_on else "CLOSED")
        # Floor temperature (DP 101)
        elif dp_id == DP_FLOOR_TEMP:
            self._floor_temperature = _tuya_temp_to_zcl(val)
        # Setpoint (DP 111)
        elif dp_id == DP_SETPOINT and thermo:
            up(thermo, 0x0012, _tuya_temp_to_zcl(val))
        # Local temperature (DP 117): mirrored to both clusters when present
        elif dp_id == DP_LOCAL_TEMP:
            temp = _tuya_temp_to_zcl(val)
            if thermo:
                up(thermo, 0x0000, temp)
            if tmeas:
                up(tmeas, 0x0000, temp)
        # Temperature calibration (DP 19)
        elif dp_id == DP_CALIBRATION and thermo:
            up(thermo, 0x0010, int(val))
        # Preset (DP 2): unknown values fall back to "manual"
        elif dp_id == DP_PRESET:
            self._preset = {0: "auto", 1: "manual", 2: "eco"}.get(val, "manual")
        # Child lock (DP 39)
        elif dp_id == DP_CHILD_LOCK:
            self._child_lock = bool(val)
        # Antifreeze (DP 103)
        elif dp_id == DP_ANTIFREEZE:
            self._antifreeze = bool(val)
        # ECO temp (DP 107): stored unscaled
        elif dp_id == DP_ECO_TEMP:
            self._eco_temperature = int(val)
        # Deadzone (DP 106)
        elif dp_id == DP_DEADZONE:
            self._deadzone_temperature = _tuya_temp_to_zcl(val)
        # Min/Max temp limits (DP 116/114)
        elif dp_id == DP_MIN_TEMP:
            self._min_temp_limit = _tuya_temp_to_zcl(val)
        elif dp_id == DP_MAX_TEMP:
            self._max_temp_limit = _tuya_temp_to_zcl(val)

    def cluster_command(self, tsn, command_id, args):
        """Decode DPs from an incoming command, then defer to the base class.

        The payload is tried as-is and with 2 or 4 leading bytes removed; the
        first variant that yields at least one DP record is applied. Decoding
        errors are logged and never prevent the base-class call.
        """
        try:
            if args and isinstance(args[0], (bytes, bytearray)):
                raw = bytes(args[0])
            else:
                raw = bytes(args or b"")

            records = []
            for cut in (0, 2, 4):
                candidate = raw[cut:] if len(raw) > cut else raw
                records = self._parse_dp_payload(candidate)
                if records:
                    break

            for dp_id, dp_type, val, raw_dp in records:
                _LOGGER.debug("Tuya DP recv: id=%s type=%s val=%s", dp_id, dp_type, val)
                self._apply(dp_id, dp_type, val, raw_dp)
        except Exception as e:
            _LOGGER.exception("EF00 parse error: %s", e)

        try:
            return super().cluster_command(tsn, command_id, args)
        except Exception as e:
            # Still best-effort, but leave a trace instead of failing silently.
            _LOGGER.debug("EF00 base cluster_command failed: %s", e)
            return None
class MoesThermostat(CustomDevice):
    """Fallback device definition used when the quirks v2 builder is unavailable.

    Matches ``_TZE284_rlytpmij`` / ``TS0601`` and replaces endpoint 1 with a
    thermostat-style endpoint that includes ``MoesTuyaCluster``.
    """

    signature = {
        "models_info": [("_TZE284_rlytpmij", "TS0601")],
        "endpoints": {
            1: {
                "profile_id": 0x0104,
                "device_type": 0x0051,
                "input_clusters": [0x0000, 0x0004, 0x0005, 0xED00, 0xEF00],
                "output_clusters": [0x000A, 0x0019],
            },
            242: {
                "profile_id": 0xA1E0,
                "device_type": 0x0061,
                "input_clusters": [],
                "output_clusters": [0x0021],
            },
        },
    }

    # Cluster lists are given under both key spellings
    # ("input_clusters"/"in_clusters", "output_clusters"/"out_clusters").
    replacement = {
        "endpoints": {
            1: {
                "profile_id": 0x0104,
                "device_type": 0x0301,
                "input_clusters": [
                    0x0000, 0x0003, 0x0001, 0x0004, 0x0005,
                    0x0201, 0x0402,
                    MoesTuyaCluster,
                ],
                "in_clusters": [
                    0x0000, 0x0003, 0x0001, 0x0004, 0x0005,
                    0x0201, 0x0402,
                    MoesTuyaCluster,
                ],
                "output_clusters": [0x0019],
                "out_clusters": [0x0019],
            },
            242: {
                "profile_id": 0xA1E0,
                "device_type": 0x0061,
                "input_clusters": [],
                "in_clusters": [],
                "output_clusters": [0x0021],
                "out_clusters": [0x0021],
            },
        }
    }

    def __init__(self, *a, **kw):
        """Build the device, then neutralise the EF00 time hooks."""
        super().__init__(*a, **kw)
        self._patch_ef00()

    def setup(self):
        """Re-apply the EF00 time-hook patch."""
        self._patch_ef00()

    def _patch_ef00(self):
        """Replace the EF00 cluster's time hooks with no-ops, if it can be found.

        The cluster is looked up on endpoint 1 under several attribute names;
        the first one that is not None is patched. Any error is only logged.
        """
        lookup_names = ("tuya_ef00", "tuya_cluster", "tuya_manufacturer", "manufacturer_specific")

        def _noop(*a, **kw):
            return None

        try:
            ep1 = self.endpoints.get(1)
            # Lazy lookup: stops at the first attribute that is not None.
            found = (getattr(ep1, name, None) for name in lookup_names)
            ef00 = next((cluster for cluster in found if cluster is not None), None)
            if ef00 is None:
                return
            ef00.set_time = _noop
            ef00.set_time_local_offset = _noop
        except Exception as e:
            _LOGGER.debug("EF00 patch failed: %s", e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment