Skip to content

Instantly share code, notes, and snippets.

@taschik
Created January 21, 2026 16:30
Show Gist options
  • Select an option

  • Save taschik/ffebd3c685b4392a57bfa8d79a0d2ac1 to your computer and use it in GitHub Desktop.

Select an option

Save taschik/ffebd3c685b4392a57bfa8d79a0d2ac1 to your computer and use it in GitHub Desktop.
ts0601 TZE284_rlytpmij ZHA Quirk
# Moes/Tuya TS0601 (_TZE284_rlytpmij) — ZHA quirk
# Based on zigbee2mqtt converter with all sensors and improved reliability
# Exposes: Climate, Floor Temperature, Valve State, and other sensors
import logging
from enum import Enum
from typing import Any, Optional
from zigpy.quirks import CustomDevice
from zigpy.profiles import zha
from zigpy.zcl.clusters.general import Basic, Identify, PowerConfiguration, Ota, Groups, Scenes, OnOff
from zigpy.zcl.clusters.hvac import Thermostat, RunningState
from zigpy.zcl.clusters.measurement import TemperatureMeasurement
_LOGGER = logging.getLogger(__name__)
# ---------- Try builder API (quirks v2) ----------
_builder_ok = False
try:
from zigpy.quirks.v2 import BinarySensorDeviceClass, EntityType
from zigpy.quirks.v2.homeassistant.sensor import SensorDeviceClass, SensorStateClass
from zigpy.quirks.v2.homeassistant import UnitOfTemperature
from zigpy.types import t
from zigpy.zcl import foundation
from zhaquirks.tuya import TUYA_SET_TIME, TuyaTimePayload
from zhaquirks.tuya.builder import TuyaQuirkBuilder
from zhaquirks.tuya.mcu import TuyaAttributesCluster, TuyaMCUCluster
_builder_ok = True
except Exception as e:
_LOGGER.warning("Builder API not available: %s", e)
_builder_ok = False
# ---------- Builder-based implementation ----------
if _builder_ok:
# Preset enum for DP 2 (auto=0, manual=1, eco=2)
class ThermostatPreset(t.enum8):
"""Thermostat preset modes."""
Auto = 0x00
Manual = 0x01
Eco = 0x02
class TuyaThermostatCluster(Thermostat, TuyaAttributesCluster):
"""Tuya Thermostat cluster - Heating only."""
_CONSTANT_ATTRIBUTES = {
Thermostat.AttributeDefs.ctrl_sequence_of_oper.id:
Thermostat.ControlSequenceOfOperation.Heating_Only
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.add_unsupported_attribute(Thermostat.AttributeDefs.setpoint_change_source.id)
self.add_unsupported_attribute(Thermostat.AttributeDefs.setpoint_change_source_timestamp.id)
self.add_unsupported_attribute(Thermostat.AttributeDefs.pi_heating_demand.id)
class NoManufTimeNoVersionRespTuyaMCUCluster(TuyaMCUCluster):
"""Tuya MCU Cluster with fixed time handling."""
class ServerCommandDefs(TuyaMCUCluster.ServerCommandDefs):
set_time = foundation.ZCLCommandDef(
id=TUYA_SET_TIME,
schema={"time": TuyaTimePayload},
is_manufacturer_specific=False,
)
def handle_mcu_version_response(self, payload: TuyaMCUCluster.MCUVersion) -> foundation.Status:
return foundation.Status.SUCCESS
# Converter for running state - handles various value types
# zigbee2mqtt: 0=heat, 1=idle
def convert_running_state(value):
"""Convert DP 47 value to RunningState."""
_LOGGER.debug("DP47 running_state raw value: %s (type: %s)", value, type(value))
# Handle various representations of "heating" (value 0)
if value in (0, False, "heat", "Heat", b'\x00'):
_LOGGER.debug("DP47 -> Heat_State_On")
return RunningState.Heat_State_On
# Everything else is idle
_LOGGER.debug("DP47 -> Idle")
return RunningState.Idle
# Build the quirk - core thermostat functionality
(
TuyaQuirkBuilder("_TZE284_rlytpmij", "TS0601")
# DP 1: System Mode (bool -> Heat/Off)
.tuya_dp(
dp_id=1,
ep_attribute=TuyaThermostatCluster.ep_attribute,
attribute_name=TuyaThermostatCluster.AttributeDefs.system_mode.name,
converter=lambda x: Thermostat.SystemMode.Heat if x else Thermostat.SystemMode.Off,
dp_converter=lambda x: x == Thermostat.SystemMode.Heat,
)
# DP 111: Setpoint (0.1°C -> ZCL 0.01°C)
.tuya_dp(
dp_id=111,
ep_attribute=TuyaThermostatCluster.ep_attribute,
attribute_name=TuyaThermostatCluster.AttributeDefs.occupied_heating_setpoint.name,
converter=lambda x: int(x * 10),
dp_converter=lambda x: int(x // 10),
)
# DP 117: Local temperature (0.1°C -> ZCL 0.01°C)
.tuya_dp(
dp_id=117,
ep_attribute=TuyaThermostatCluster.ep_attribute,
attribute_name=TuyaThermostatCluster.AttributeDefs.local_temperature.name,
converter=lambda x: int(x * 10),
)
# DP 47: Running state (0=heat, 1=idle per zigbee2mqtt)
.tuya_dp(
dp_id=47,
ep_attribute=TuyaThermostatCluster.ep_attribute,
attribute_name=TuyaThermostatCluster.AttributeDefs.running_state.name,
converter=convert_running_state,
)
# DP 101: Floor temperature sensor
.tuya_sensor(
dp_id=101,
attribute_name="floor_temperature",
type=t.int16s,
divisor=10,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
unit=UnitOfTemperature.CELSIUS,
translation_key="floor_temperature",
fallback_name="Floor Temperature",
)
# DP 39: Child lock switch
.tuya_switch(
dp_id=39,
attribute_name="child_lock",
translation_key="child_lock",
fallback_name="Child Lock",
entity_type=EntityType.CONFIG,
)
# DP 103: Antifreeze switch
.tuya_switch(
dp_id=103,
attribute_name="antifreeze",
translation_key="antifreeze",
fallback_name="Antifreeze",
entity_type=EntityType.CONFIG,
)
# DP 2: Preset mode as separate select entity (auto=0, manual=1, eco=2)
.tuya_enum(
dp_id=2,
attribute_name="preset_mode",
enum_class=ThermostatPreset,
translation_key="preset_mode",
fallback_name="Preset Mode",
)
.adds(TuyaThermostatCluster)
.skip_configuration()
.add_to_registry(replacement_cluster=NoManufTimeNoVersionRespTuyaMCUCluster)
)
else:
# ---------- EF00 fallback (older environment) ----------
try:
from zhaquirks.tuya import TuyaManufCluster
except Exception as e:
TuyaManufCluster = None
_LOGGER.warning("TuyaManufCluster not available: %s", e)
# DP mappings matching zigbee2mqtt
DP_SYSTEM_MODE = 1
DP_PRESET = 2
DP_BACKLIGHT = 3
DP_CALIBRATION = 19
DP_SENSOR = 32
DP_CHILD_LOCK = 39
DP_TEMP_SCALE = 46
DP_RUNNING_STATE = 47
DP_FLOOR_TEMP = 101
DP_ANTIFREEZE = 103
DP_PROGRAM_MODE = 104
DP_DEADZONE = 106
DP_ECO_TEMP = 107
DP_SCHEDULE = 108
DP_SETPOINT = 111
DP_MAX_TEMP = 114
DP_MIN_TEMP = 116
DP_LOCAL_TEMP = 117
def _tuya_temp_to_zcl(v) -> int:
"""Convert Tuya temp (0.1°C) to ZCL (0.01°C)."""
if isinstance(v, (bytes, bytearray)):
try:
v = int.from_bytes(v, "big")
except Exception:
v = 0
return int(v) * 10
class MoesTuyaCluster(TuyaManufCluster):
"""EF00 cluster with full DP parsing and ZCL mapping."""
cluster_id = 0xEF00
ep_attribute = "tuya_ef00"
# State storage for additional sensors
_floor_temperature: Optional[int] = None
_valve_state: Optional[bool] = None
_preset: Optional[str] = None
_sensor_type: Optional[str] = None
_child_lock: Optional[bool] = None
_antifreeze: Optional[bool] = None
_eco_temperature: Optional[int] = None
_deadzone_temperature: Optional[int] = None
_min_temp_limit: Optional[int] = None
_max_temp_limit: Optional[int] = None
_backlight: Optional[str] = None
_program_mode: Optional[str] = None
_temp_scale: Optional[str] = None
@staticmethod
def set_time(*args, **kwargs):
return None
@staticmethod
def set_time_local_offset(*args, **kwargs):
return None
def handle_cluster_request(self, hdr, args, dst_addressing=None):
if not hasattr(self, "set_time") or self.set_time is None:
self.set_time = lambda *a, **kw: None
if not hasattr(self, "set_time_local_offset") or self.set_time_local_offset is None:
self.set_time_local_offset = lambda *a, **kw: None
return super().handle_cluster_request(hdr, args, dst_addressing=dst_addressing)
def _parse_dp_payload(self, data: bytes):
"""Parse Tuya DP payload."""
idx, ln, out = 0, len(data), []
while idx + 2 <= ln:
dp_id = data[idx]
dp_type = data[idx + 1]
idx += 2
if idx >= ln:
break
length = data[idx]
if (idx + 1 + length) > ln and (idx + 2) <= ln:
length = (length << 8) | data[idx + 1]
idx += 2
else:
idx += 1
if idx + length > ln:
break
raw = data[idx:idx + length]
idx += length
if dp_type == 0x01:
val = bool(raw[0]) if raw else False
elif dp_type == 0x02:
val = int.from_bytes(raw, "big", signed=(len(raw) == 2))
elif dp_type == 0x04:
val = raw[0] if raw else 0
else:
val = raw
out.append((dp_id, dp_type, val, raw))
return out
def _apply(self, dp_id, dp_type, val, raw):
"""Apply DP value to ZCL attribute."""
ep1 = getattr(self.endpoint.device, "endpoints", {}).get(1)
thermo: Thermostat = getattr(ep1, "thermostat", None) if ep1 else None
tmeas: TemperatureMeasurement = getattr(ep1, "temperature", None) if ep1 else None
def up(cluster, attr, value):
if cluster is None:
return
try:
cluster._update_attribute(attr, value)
_LOGGER.debug("Updated attr 0x%04X to %s", attr, value)
except Exception as e:
_LOGGER.debug("attr 0x%04X update failed: %s", attr, e)
# System mode (DP 1)
if dp_id == DP_SYSTEM_MODE and thermo:
sys_mode = Thermostat.SystemMode.Heat if val else Thermostat.SystemMode.Off
up(thermo, 0x001C, sys_mode)
# Running state (DP 47) - zigbee2mqtt: 0=heat, 1=idle
elif dp_id == DP_RUNNING_STATE and thermo:
# Handle various representations of "heating" (value 0)
heat_on = val in (0, False, "heat", "Heat", b'\x00')
running = RunningState.Heat_State_On if heat_on else RunningState.Idle
up(thermo, 0x0029, running)
self._valve_state = heat_on
_LOGGER.debug("DP47: val=%s (type=%s) -> running=%s, valve=%s", val, type(val), running, "OPEN" if heat_on else "CLOSED")
# Floor temperature (DP 101)
elif dp_id == DP_FLOOR_TEMP:
self._floor_temperature = _tuya_temp_to_zcl(val)
# Setpoint (DP 111)
elif dp_id == DP_SETPOINT and thermo:
up(thermo, 0x0012, _tuya_temp_to_zcl(val))
# Local temperature (DP 117)
elif dp_id == DP_LOCAL_TEMP:
temp = _tuya_temp_to_zcl(val)
if thermo:
up(thermo, 0x0000, temp)
if tmeas:
up(tmeas, 0x0000, temp)
# Temperature calibration (DP 19)
elif dp_id == DP_CALIBRATION and thermo:
up(thermo, 0x0010, int(val))
# Preset (DP 2)
elif dp_id == DP_PRESET:
self._preset = {0: "auto", 1: "manual", 2: "eco"}.get(val, "manual")
# Child lock (DP 39)
elif dp_id == DP_CHILD_LOCK:
self._child_lock = bool(val)
# Antifreeze (DP 103)
elif dp_id == DP_ANTIFREEZE:
self._antifreeze = bool(val)
# ECO temp (DP 107)
elif dp_id == DP_ECO_TEMP:
self._eco_temperature = int(val)
# Deadzone (DP 106)
elif dp_id == DP_DEADZONE:
self._deadzone_temperature = _tuya_temp_to_zcl(val)
# Min/Max temp limits (DP 116/114)
elif dp_id == DP_MIN_TEMP:
self._min_temp_limit = _tuya_temp_to_zcl(val)
elif dp_id == DP_MAX_TEMP:
self._max_temp_limit = _tuya_temp_to_zcl(val)
def cluster_command(self, tsn, command_id, args):
try:
raw = bytes(args[0]) if args and isinstance(args[0], (bytes, bytearray)) else bytes(args or b"")
payload = raw
for cut in (0, 2, 4):
test = raw[cut:] if len(raw) > cut else raw
if self._parse_dp_payload(test):
payload = test
break
for dp_id, dp_type, val, raw_dp in self._parse_dp_payload(payload):
_LOGGER.debug("Tuya DP recv: id=%s type=%s val=%s", dp_id, dp_type, val)
self._apply(dp_id, dp_type, val, raw_dp)
except Exception as e:
_LOGGER.exception("EF00 parse error: %s", e)
try:
return super().cluster_command(tsn, command_id, args)
except Exception:
return
class MoesThermostat(CustomDevice):
"""Fallback device class for older ZHA versions."""
signature = {
"models_info": [("_TZE284_rlytpmij", "TS0601")],
"endpoints": {
1: {
"profile_id": 0x0104,
"device_type": 0x0051,
"input_clusters": [0x0000, 0x0004, 0x0005, 0xED00, 0xEF00],
"output_clusters": [0x000A, 0x0019],
},
242: {
"profile_id": 0xA1E0,
"device_type": 0x0061,
"input_clusters": [],
"output_clusters": [0x0021],
},
},
}
replacement = {
"endpoints": {
1: {
"profile_id": 0x0104,
"device_type": 0x0301,
"input_clusters": [
0x0000, 0x0003, 0x0001, 0x0004, 0x0005,
0x0201, 0x0402,
MoesTuyaCluster,
],
"in_clusters": [
0x0000, 0x0003, 0x0001, 0x0004, 0x0005,
0x0201, 0x0402,
MoesTuyaCluster,
],
"output_clusters": [0x0019],
"out_clusters": [0x0019],
},
242: {
"profile_id": 0xA1E0,
"device_type": 0x0061,
"input_clusters": [],
"in_clusters": [],
"output_clusters": [0x0021],
"out_clusters": [0x0021],
},
}
}
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._patch_ef00()
def setup(self):
self._patch_ef00()
def _patch_ef00(self):
try:
ep1 = self.endpoints.get(1)
ef00 = None
for attr in ("tuya_ef00", "tuya_cluster", "tuya_manufacturer", "manufacturer_specific"):
ef00 = getattr(ep1, attr, None)
if ef00 is not None:
break
if ef00 is None:
return
ef00.set_time = lambda *a, **kw: None
ef00.set_time_local_offset = lambda *a, **kw: None
except Exception as e:
_LOGGER.debug("EF00 patch failed: %s", e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment