Last active
March 15, 2026 15:42
-
-
Save cemizm/037f02827f5818baa77749876205943c to your computer and use it in GitHub Desktop.
Aqara FP300 Presence Sensor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Docs / install guide (custom ZHA quirks + FP300 example): | |
| # https://meshstack.de/post/home-assistant/zha-custom-quirks/ | |
| # | |
| # Upstream PR: https://github.com/zigpy/zha-device-handlers/pull/4504 | |
| # Tracking issue: https://github.com/zigpy/zha-device-handlers/issues/4487 | |
| """Quirk for Aqara lumi.sensor_occupy.agl8.""" | |
| import asyncio | |
| from contextlib import suppress | |
| from typing import Any, Final | |
| from zigpy import types as t | |
| from zigpy.quirks.v2 import QuirkBuilder, ReportingConfig | |
| from zigpy.quirks.v2.homeassistant import ( | |
| PERCENTAGE, | |
| EntityType, | |
| UnitOfLength, | |
| UnitOfTemperature, | |
| UnitOfTime, | |
| ) | |
| from zigpy.quirks.v2.homeassistant.binary_sensor import BinarySensorDeviceClass | |
| from zigpy.quirks.v2.homeassistant.number import NumberDeviceClass | |
| from zigpy.quirks.v2.homeassistant.sensor import SensorDeviceClass, SensorStateClass | |
| from zigpy.typing import UNDEFINED, UndefinedType | |
| from zigpy.zcl import foundation | |
| from zigpy.zcl.foundation import BaseAttributeDefs, DataTypeId, ZCLAttributeDef | |
| from zhaquirks import LocalDataCluster | |
| from zhaquirks.xiaomi import ( | |
| BATTERY_PERCENTAGE_REMAINING_ATTRIBUTE, | |
| BATTERY_VOLTAGE_MV, | |
| XiaomiAqaraE1Cluster, | |
| XiaomiPowerConfiguration, | |
| ) | |
# Aqara manufacturer code attached to every manufacturer-specific attribute
# definition in this quirk.
AQARA_MANUFACTURER_CODE: Final = 0x115F

# Keys under which the battery fields appear in the parsed, non-standard Aqara
# payload (see AqaraFP300ManuCluster._parse_aqara_attributes).
MANU_ATTR_BATTERY_VOLTAGE: Final = "0xff01-23"
MANU_ATTR_BATTERY_PERCENT: Final = "0xff01-24"
| # | |
| # Enums matching Zigbee2MQTT converter semantics | |
| # | |
class MotionSensitivity(t.enum8):
    """Presence / motion sensitivity levels (type of ``motion_sensitivity``)."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3

class PresenceDetectionMode(t.enum8):
    """Selects which sensors contribute to the presence decision."""

    BOTH = 0
    MMWAVE_ONLY = 1
    PIR_ONLY = 2

class TempHumiditySampling(t.enum8):
    """Sampling-frequency presets for temperature and humidity."""

    OFF = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CUSTOM = 4

class ReportMode(t.enum8):
    """How temp/humidity/illuminance values are reported in custom mode."""

    THRESHOLD = 1
    INTERVAL = 2
    THRESHOLD_AND_INTERVAL = 3

class LightSampling(t.enum8):
    """Sampling-frequency presets for illuminance."""

    OFF = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CUSTOM = 4

class FP300PowerConfigurationVoltage(XiaomiPowerConfiguration):
    """Battery handling for the FP300 with a voltage-derived percentage.

    The FP300 may report a battery percentage directly, but that value can sit
    at 100 for long periods. Following Z2M behavior, the percentage is derived
    from the reported voltage instead.

    Trade-off: the device-reported percentage can be smoother, whereas voltage
    can fluctuate with load/temperature but progresses more usefully in
    practice.
    """

    # Voltage window (in mV) used for the voltage -> percentage conversion.
    MIN_VOLTS_MV = 2850
    MAX_VOLTS_MV = 3000

    def battery_reported(self, voltage_mv: int) -> None:
        """Update battery voltage and the derived percentage from one report.

        Despite the inherited parameter name, the FP300 delivers key
        0xff01-23 in 0.01 V units (e.g. 306 -> 3.06 V). The displayed voltage
        follows the raw report; the percentage is computed from the value
        converted to mV so that it lines up with the configured voltage curve.
        """
        # NOTE(review): if BATTERY_VOLTAGE_ATTR is the standard ZCL battery
        # voltage attribute (100 mV units), raw / 100 stores volts rather than
        # 100 mV steps -- confirm this display scale is intended.
        displayed_voltage = round(voltage_mv / 100, 1)
        self._update_attribute(self.BATTERY_VOLTAGE_ATTR, displayed_voltage)

        # 0.01 V units -> mV.
        millivolts = voltage_mv * 10
        self._update_battery_percentage(millivolts)

    def battery_percent_reported(self, battery_percent: int) -> None:
        """Deliberately ignore the directly reported battery percentage.

        The matching TLV key is still parsed and mapped by the FP300 quirk, but
        it is not the active battery source today. Keeping the hook means that
        switching back to the direct percentage later is a small policy change
        here, without touching TLV parsing/mapping.
        """
        return None

| # | |
| # Manufacturer specific cluster (0xFCC0) | |
| # | |
class AqaraFP300ManuCluster(XiaomiAqaraE1Cluster):
    """Aqara FP300 manufacturer-specific cluster (0xFCC0).

    Declares the FP300 attribute set, remaps the battery TLV keys after the
    shared parser has run, and mirrors raw detection-range updates into
    ``FP300DetectionRangeCluster``.
    """

    class AttributeDefs(BaseAttributeDefs):
        """Attribute definitions for the Aqara FP300 manufacturer cluster."""

        # ------------------------------------------------------------------
        # Presence / motion
        # ------------------------------------------------------------------
        presence: Final = ZCLAttributeDef(
            id=0x0142,
            type=t.Bool,
            zcl_type=DataTypeId.uint8,
            access="rp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        pir_detection: Final = ZCLAttributeDef(
            id=0x014D,
            type=t.Bool,
            zcl_type=DataTypeId.uint8,
            access="rp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        motion_sensitivity: Final = ZCLAttributeDef(
            id=0x010C,
            type=MotionSensitivity,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        absence_delay_timer: Final = ZCLAttributeDef(
            id=0x0197,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        pir_detection_interval: Final = ZCLAttributeDef(
            id=0x014F,
            type=t.uint16_t,
            zcl_type=DataTypeId.uint16,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        presence_detection_options: Final = ZCLAttributeDef(
            id=0x0199,
            type=PresenceDetectionMode,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        # Raw detection-range payload; decoded by FP300DetectionRangeCluster.
        detection_range_raw: Final = ZCLAttributeDef(
            id=0x019A,
            type=t.LVBytes,
            zcl_type=DataTypeId.octstr,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # AI helpers
        # ------------------------------------------------------------------
        ai_interference_source_selfidentification: Final = ZCLAttributeDef(
            id=0x015E,
            type=t.uint8_t,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        ai_sensitivity_adaptive: Final = ZCLAttributeDef(
            id=0x015D,
            type=t.uint8_t,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # Target distance / tracking
        # ------------------------------------------------------------------
        target_distance: Final = ZCLAttributeDef(
            id=0x015F,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        track_target_distance: Final = ZCLAttributeDef(
            id=0x0198,
            type=t.uint8_t,
            zcl_type=DataTypeId.uint8,
            access="w",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # Temperature / humidity sampling and reporting
        # ------------------------------------------------------------------
        temp_humidity_sampling: Final = ZCLAttributeDef(
            id=0x0170,
            type=TempHumiditySampling,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        temp_humidity_sampling_period: Final = ZCLAttributeDef(
            id=0x0162,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        temp_reporting_interval: Final = ZCLAttributeDef(
            id=0x0163,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        temp_reporting_threshold: Final = ZCLAttributeDef(
            id=0x0164,
            type=t.uint16_t,
            zcl_type=DataTypeId.uint16,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        temp_reporting_mode: Final = ZCLAttributeDef(
            id=0x0165,
            type=ReportMode,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        humidity_reporting_interval: Final = ZCLAttributeDef(
            id=0x016A,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        humidity_reporting_threshold: Final = ZCLAttributeDef(
            id=0x016B,
            type=t.uint16_t,
            zcl_type=DataTypeId.uint16,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        humidity_reporting_mode: Final = ZCLAttributeDef(
            id=0x016C,
            type=ReportMode,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # Illuminance sampling and reporting
        # ------------------------------------------------------------------
        light_sampling: Final = ZCLAttributeDef(
            id=0x0192,
            type=LightSampling,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        light_sampling_period: Final = ZCLAttributeDef(
            id=0x0193,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        light_reporting_interval: Final = ZCLAttributeDef(
            id=0x0194,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        light_reporting_threshold: Final = ZCLAttributeDef(
            id=0x0195,
            type=t.uint16_t,
            zcl_type=DataTypeId.uint16,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        light_reporting_mode: Final = ZCLAttributeDef(
            id=0x0196,
            type=ReportMode,
            zcl_type=DataTypeId.uint8,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # LED behavior
        # ------------------------------------------------------------------
        led_disabled_night: Final = ZCLAttributeDef(
            id=0x0203,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

        # ------------------------------------------------------------------
        # Spatial learning / restart (FP1E-style maintenance actions)
        # ------------------------------------------------------------------
        spatial_learning: Final = ZCLAttributeDef(
            id=0x0157,
            type=t.uint8_t,
            zcl_type=DataTypeId.uint8,
            access="w",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        restart_device: Final = ZCLAttributeDef(
            id=0x00E8,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="w",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

    def _parse_aqara_attributes(self, value: Any) -> dict[str, Any]:
        """Parse the Aqara TLV payload and rename the FP300 battery keys.

        Design decision:
        - The shared Xiaomi base parser stays decode-only (generic TLV ->
          key/value extraction).
        - Device-specific interpretation lives in derived classes.

        FP300 logic is intentionally not a switch/case branch in the base
        class: that would couple unrelated Xiaomi devices, raise regression
        risk, and require central changes whenever one device key's semantics
        or firmware behavior changes. Base class = transport/parsing, derived
        class = semantic mapping for that device.

        Note: both battery-related TLV keys (voltage + percentage) are mapped.
        The current battery policy consumes the voltage-derived percentage, but
        keeping the direct-percent key mapped makes a future policy switch
        simple.
        """
        parsed = super()._parse_aqara_attributes(value)

        # FP300 TLV key -> generic Xiaomi attribute key.
        key_renames = (
            (MANU_ATTR_BATTERY_VOLTAGE, BATTERY_VOLTAGE_MV),
            (MANU_ATTR_BATTERY_PERCENT, BATTERY_PERCENTAGE_REMAINING_ATTRIBUTE),
        )
        for tlv_key, generic_key in key_renames:
            if tlv_key in parsed:
                parsed[generic_key] = parsed.pop(tlv_key)
        return parsed

    def _update_attribute(self, attrid: int, value: Any) -> Any:
        """Mirror 0x019A into the local range attrs, then update normally.

        Design decision:
        - Forwarding to `FP300DetectionRangeCluster` happens directly here so
          decode/derive behavior is deterministic for report and write paths.
        - `super()._update_attribute()` does not return a status, so there is
          nothing meaningful to check before forwarding.
        - Attribute*Event listeners are intentionally not the primary bridge,
          because runtime updates are split across different event types
          (written vs. reported/updated). Doing the raw->derived transform
          inline keeps the behavior explicit and avoids hidden listener
          coupling.
        """
        is_range_update = attrid == self.AttributeDefs.detection_range_raw.id
        if is_range_update:
            range_cluster = self.endpoint.in_clusters.get(
                FP300DetectionRangeCluster.cluster_id
            )
            # The local cluster may be absent; in that case only the normal
            # update below runs.
            if range_cluster is not None:
                range_cluster._update_from_raw(value)
        return super()._update_attribute(attrid, value)

class FP300DetectionRangeCluster(LocalDataCluster):
    """Local cluster for FP300 detection-range handling.

    The device exposes detection range as one manufacturer-specific raw payload
    (0x019A, octet string with prefix + 24-bit mask). ZHA/HA benefits from a
    clearer configuration surface (six 1 m switches + numeric mask), while all
    writes still have to end up as that one raw attribute on the device.

    Design decision:
    - Keep device transport details in the manufacturer cluster.
    - Keep UI-oriented virtual attributes in a dedicated LocalDataCluster.
    - Local virtual attributes use explicit AQARA manufacturer_code to keep
      cache keys stable on this manufacturer-specific cluster during reload.

    The synthetic switch/mask attributes are intentionally neither added to the
    manufacturer cluster nor handled purely via builder converters: either
    option mixes unrelated responsibilities, makes the read-modify-write
    behavior less explicit, and is harder to reason about for cache/state
    synchronization.
    """

    cluster_id = 0xFC30

    # Fixed 16-bit prefix that precedes the mask in the raw payload.
    _PREFIX_VALUE: Final = 0x0300
    _PREFIX_BYTES: Final = _PREFIX_VALUE.to_bytes(2, "little")
    # 24 mask bits in total; each 1 m switch covers one 4-bit segment.
    _FULL_MASK: Final = (1 << 24) - 1
    _SEGMENT_MASK: Final = (1 << 4) - 1
    _RAW_ATTR_ID: Final = AqaraFP300ManuCluster.AttributeDefs.detection_range_raw.id

    class AttributeDefs(BaseAttributeDefs):
        """Attribute definitions for FP300 detection range cluster."""

        prefix: Final = ZCLAttributeDef(
            id=0x0000,
            type=t.uint16_t,
            zcl_type=DataTypeId.uint16,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_0_1m: Final = ZCLAttributeDef(
            id=0x0001,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_1_2m: Final = ZCLAttributeDef(
            id=0x0002,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_2_3m: Final = ZCLAttributeDef(
            id=0x0003,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_3_4m: Final = ZCLAttributeDef(
            id=0x0004,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_4_5m: Final = ZCLAttributeDef(
            id=0x0005,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        range_5_6m: Final = ZCLAttributeDef(
            id=0x0006,
            type=t.Bool,
            zcl_type=DataTypeId.bool_,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )
        detection_range_mask: Final = ZCLAttributeDef(
            id=0x0007,
            type=t.uint32_t,
            zcl_type=DataTypeId.uint32,
            access="rwp",
            manufacturer_code=AQARA_MANUFACTURER_CODE,
        )

    # (switch attribute id, first bit of its 4-bit segment inside the mask)
    _SEGMENTS: Final[tuple[tuple[int, int], ...]] = (
        (AttributeDefs.range_0_1m.id, 0),
        (AttributeDefs.range_1_2m.id, 4),
        (AttributeDefs.range_2_3m.id, 8),
        (AttributeDefs.range_3_4m.id, 12),
        (AttributeDefs.range_4_5m.id, 16),
        (AttributeDefs.range_5_6m.id, 20),
    )
    _MASK_ATTR_ID: Final = AttributeDefs.detection_range_mask.id

    def __init__(self, *args, **kwargs):
        """Initialize the cluster and the lock guarding mask writes."""
        super().__init__(*args, **kwargs)
        # Serialize read-modify-write updates to avoid lost updates when
        # multiple switch writes are issued concurrently.
        self._write_mutex = asyncio.Lock()

    def _update_from_raw(self, raw: t.LVBytes | bytes | bytearray | None) -> None:
        """Update the local detection-range attributes from a raw 0x019A buffer.

        The payload is interpreted as:
        - bytes 0..1: fixed prefix, little-endian uint16
        - bytes 2..4: detection mask, little-endian 24-bit integer

        LE parsing is used to match observed device behavior and Z2M semantics.
        A missing, non-bytes or too-short payload falls back to the fixed
        prefix and a fully enabled mask.
        """
        payload = bytes(raw) if isinstance(raw, (t.LVBytes, bytes, bytearray)) else b""

        if len(payload) >= 5:
            prefix = int.from_bytes(payload[0:2], "little")
            mask = int.from_bytes(payload[2:5], "little") & self._FULL_MASK
        else:
            prefix = self._PREFIX_VALUE
            mask = self._FULL_MASK

        super()._update_attribute(self.AttributeDefs.prefix.id, prefix)
        super()._update_attribute(self._MASK_ATTR_ID, mask)
        # A segment switch is on as soon as any bit of its nibble is set.
        for attr_id, enabled in self._segment_attrs_from_mask(mask).items():
            super()._update_attribute(attr_id, enabled)

    def _current_mask(self) -> int:
        """Return the cached 24-bit mask, defaulting to fully enabled."""
        cached = self._attr_cache.get(self._MASK_ATTR_ID, self._FULL_MASK)
        return int(cached) & self._FULL_MASK

    def _resolve_mask(self, new_attrs: dict[int, Any]) -> int:
        """Resolve the effective mask for a write request.

        A directly supplied, integer-convertible mask wins. Otherwise the
        cached mask is used as the base and only the nibbles of the switches
        present in ``new_attrs`` are set or cleared.
        """
        if self._MASK_ATTR_ID in new_attrs:
            # An unusable mask value falls through to the switch-based path.
            with suppress(TypeError, ValueError):
                return int(new_attrs[self._MASK_ATTR_ID]) & self._FULL_MASK

        resolved = self._current_mask()
        for attr_id, start_bit in self._SEGMENTS:
            if attr_id not in new_attrs:
                continue
            nibble = self._SEGMENT_MASK << start_bit
            if bool(new_attrs[attr_id]):
                resolved |= nibble
            else:
                resolved &= ~nibble
        return resolved

    def _build_raw(self, mask: int) -> t.LVBytes:
        """Build raw 0x019A: fixed 16-bit LE prefix + 24-bit LE mask."""
        return t.LVBytes(self._PREFIX_BYTES + mask.to_bytes(3, "little"))

    def _segment_attrs_from_mask(self, mask: int) -> dict[int, bool]:
        """Map each segment-switch attribute id to its on/off state in ``mask``."""
        return {
            attr_id: bool(mask & (self._SEGMENT_MASK << start_bit))
            for attr_id, start_bit in self._SEGMENTS
        }

    @staticmethod
    def _raw_write_succeeded(raw_result: Any, raw_attr_id: int) -> bool:
        """Return True if the raw write confirms success for the raw attribute.

        zigpy may return either:
        - global success (one record with attrid=None), or
        - per-attribute status records.

        The response container is indexed rather than type-checked as ``list``
        because a write response is not guaranteed to be a plain list (it may
        be a tuple-like response object). Anything whose first element is not
        a non-empty sequence of records counts as "not confirmed".
        """
        try:
            records = raw_result[0]
        except (TypeError, IndexError, KeyError):
            # None, empty container, or something that is not indexable.
            return False

        # A bare status value (or any other non-sequence) is not a record list.
        if not isinstance(records, (list, tuple)) or not records:
            return False

        if len(records) == 1 and records[0].attrid is None:
            return records[0].status == foundation.Status.SUCCESS

        return any(
            record.attrid == raw_attr_id and record.status == foundation.Status.SUCCESS
            for record in records
        )

    async def write_attributes(
        self,
        attributes: dict[str | int | foundation.ZCLAttributeDef, Any],
        manufacturer: int | UndefinedType | None = UNDEFINED,
        **kwargs,
    ) -> list[list[foundation.WriteAttributesStatusRecord]]:
        """Write detection-range attrs: raw device write first, then local cache.

        Writes are intentionally serialized with a mutex because the operation
        is a read-modify-write sequence on one shared 24-bit mask.

        Alternatives considered:
        - Rejecting concurrent writes with ACTION_DENIED/INCONSISTENT avoids
          waiting but forces callers to retry and can drop rapid UI toggles.
        - Cancelling an in-flight write is unsafe because the Zigbee request
          may already be in flight or queued for a sleepy device.

        Unknown attributes are filtered instead of hard-failing. This mirrors
        LocalDataCluster behavior and keeps mixed/partial writes robust.

        Local virtual attrs are updated only after the raw device write
        succeeds. This avoids optimistic cache drift when the sleepy-device
        write times out.

        Returns the status records of the local write on success, the raw
        write's result when that write is not confirmed, or FAILURE records
        for the requested attributes when the manufacturer cluster is missing.
        """
        resolved_attrs: dict[int, Any] = {}
        for attr, value in attributes.items():
            try:
                attrid = self.find_attribute(attr).id
            except KeyError:
                # Unknown attribute: drop it, keep the rest of the request.
                continue
            resolved_attrs[attrid] = value

        if not resolved_attrs:
            # Nothing known to write; no mask change, so no lock is needed.
            return await super().write_attributes(
                resolved_attrs, manufacturer=manufacturer, **kwargs
            )

        async with self._write_mutex:
            new_mask = self._resolve_mask(resolved_attrs)
            raw = self._build_raw(new_mask)
            target_attrs = {
                self.AttributeDefs.prefix.id: self._PREFIX_VALUE,
                self._MASK_ATTR_ID: new_mask,
            }

            manu = self.endpoint.in_clusters.get(AqaraFP300ManuCluster.cluster_id)
            if manu is None:
                # Without the manufacturer cluster nothing can reach the device.
                return [
                    [
                        foundation.WriteAttributesStatusRecord(
                            foundation.Status.FAILURE, attrid=attr_id
                        )
                        for attr_id in resolved_attrs
                    ]
                ]

            raw_result = await manu.write_attributes(
                {AqaraFP300ManuCluster.AttributeDefs.detection_range_raw.id: raw},
                manufacturer=manufacturer,
            )
            if not self._raw_write_succeeded(raw_result, self._RAW_ATTR_ID):
                return raw_result

            # Device accepted the mask: bring every switch in line with it.
            target_attrs.update(self._segment_attrs_from_mask(new_mask))
            return await super().write_attributes(
                target_attrs, manufacturer=manufacturer, **kwargs
            )

| # | |
| # QuirkBuilder definition | |
| # | |
# Registers the FP300 quirk: swaps in the manufacturer cluster, adds the battery
# and detection-range clusters, and declares every entity exposed on endpoint 1.
FP300_QUIRK = (
    QuirkBuilder("Aqara", "lumi.sensor_occupy.agl8")
    .friendly_name(manufacturer="Aqara", model="Presence Sensor FP300")
    .replaces(AqaraFP300ManuCluster)
    .adds(FP300PowerConfigurationVoltage)
    .adds(FP300DetectionRangeCluster)
    # --- Sensors -----------------------------------------------------------
    # Main occupancy entity (mmWave)
    .binary_sensor(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.presence.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=BinarySensorDeviceClass.OCCUPANCY,
        entity_type=EntityType.STANDARD,
        reporting_config=ReportingConfig(
            min_interval=1,
            max_interval=300,
            reportable_change=1,
        ),
        translation_key="occupancy",
        fallback_name="Occupancy",
    )
    # PIR detection (diagnostic, disabled by default)
    .binary_sensor(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.pir_detection.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=BinarySensorDeviceClass.MOTION,
        entity_type=EntityType.DIAGNOSTIC,
        reporting_config=ReportingConfig(
            min_interval=1,
            max_interval=300,
            reportable_change=1,
        ),
        initially_disabled=True,
        translation_key="pir_detection",
        fallback_name="PIR detection",
    )
    # Target distance (from fp1eTargetDistance)
    .sensor(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.target_distance.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=SensorDeviceClass.DISTANCE,
        state_class=SensorStateClass.MEASUREMENT,
        unit=UnitOfLength.METERS,
        multiplier=0.01,  # raw = meters * 100
        entity_type=EntityType.DIAGNOSTIC,
        translation_key="target_distance",
        fallback_name="Target distance",
    )
    # Button: start tracking the current target distance
    .write_attr_button(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.track_target_distance.name,
        attribute_value=1,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="track_target_distance",
        fallback_name="Start target distance tracking",
    )
    # --- Motion / presence configuration -----------------------------------
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.motion_sensitivity.name,
        enum_class=MotionSensitivity,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="motion_sensitivity",
        fallback_name="Motion sensitivity",
    )
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.presence_detection_options.name,
        enum_class=PresenceDetectionMode,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="presence_detection_options",
        fallback_name="Presence detection options",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.absence_delay_timer.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=10,
        max_value=300,
        step=5,
        unit=UnitOfTime.SECONDS,
        translation_key="absence_delay_timer",
        fallback_name="Absence delay timer",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.pir_detection_interval.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=2,
        max_value=300,
        step=1,
        unit=UnitOfTime.SECONDS,
        translation_key="pir_detection_interval",
        fallback_name="PIR detection interval",
    )
    # --- AI helper switches ------------------------------------------------
    .switch(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.ai_interference_source_selfidentification.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="ai_interference_source_selfidentification",
        fallback_name="AI interference source self-identification",
    )
    .switch(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.ai_sensitivity_adaptive.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="ai_sensitivity_adaptive",
        fallback_name="AI adaptive sensitivity",
    )
    # --- Temperature / humidity sampling & reporting -----------------------
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.temp_humidity_sampling.name,
        enum_class=TempHumiditySampling,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="temp_humidity_sampling",
        fallback_name="Temp & humidity sampling",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.temp_humidity_sampling_period.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=0.5,
        max_value=3600.0,
        step=0.5,
        multiplier=0.001,  # ms -> s
        unit=UnitOfTime.SECONDS,
        translation_key="temp_humidity_sampling_period",
        fallback_name="Temp & humidity sampling period",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.temp_reporting_interval.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=600,
        max_value=3600,
        step=600,
        multiplier=0.001,
        unit=UnitOfTime.SECONDS,
        translation_key="temp_reporting_interval",
        fallback_name="Temperature reporting interval",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.temp_reporting_threshold.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.TEMPERATURE,
        entity_type=EntityType.CONFIG,
        min_value=0.2,
        max_value=3.0,
        step=0.1,
        multiplier=0.01,
        unit=UnitOfTemperature.CELSIUS,
        translation_key="temp_reporting_threshold",
        fallback_name="Temperature reporting threshold",
    )
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.temp_reporting_mode.name,
        enum_class=ReportMode,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="temp_reporting_mode",
        fallback_name="Temperature reporting mode",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.humidity_reporting_interval.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=600,
        max_value=3600,
        step=600,
        multiplier=0.001,
        unit=UnitOfTime.SECONDS,
        translation_key="humidity_reporting_interval",
        fallback_name="Humidity reporting interval",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.humidity_reporting_threshold.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.HUMIDITY,
        entity_type=EntityType.CONFIG,
        min_value=2.0,
        max_value=20.0,
        step=0.5,
        multiplier=0.01,
        unit=PERCENTAGE,
        translation_key="humidity_reporting_threshold",
        fallback_name="Humidity reporting threshold",
    )
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.humidity_reporting_mode.name,
        enum_class=ReportMode,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="humidity_reporting_mode",
        fallback_name="Humidity reporting mode",
    )
    # --- Illuminance sampling & reporting ----------------------------------
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.light_sampling.name,
        enum_class=LightSampling,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="light_sampling",
        fallback_name="Light sampling",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.light_sampling_period.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=0.5,
        max_value=3600.0,
        step=0.5,
        multiplier=0.001,
        unit=UnitOfTime.SECONDS,
        translation_key="light_sampling_period",
        fallback_name="Light sampling period",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.light_reporting_interval.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        device_class=NumberDeviceClass.DURATION,
        entity_type=EntityType.CONFIG,
        min_value=20,
        max_value=3600,
        step=20,
        multiplier=0.001,
        unit=UnitOfTime.SECONDS,
        translation_key="light_reporting_interval",
        fallback_name="Light reporting interval",
    )
    .number(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.light_reporting_threshold.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        # Percentage change; omit device_class.
        entity_type=EntityType.CONFIG,
        min_value=3.0,
        max_value=20.0,
        step=0.5,
        multiplier=0.01,
        unit=PERCENTAGE,
        translation_key="light_reporting_threshold",
        fallback_name="Light reporting threshold",
    )
    .enum(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.light_reporting_mode.name,
        enum_class=ReportMode,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="light_reporting_mode",
        fallback_name="Light reporting mode",
    )
    # --- LED ---------------------------------------------------------------
    .switch(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.led_disabled_night.name,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="led_disabled_night",
        fallback_name="LED disabled at night",
    )
    # --- Maintenance buttons -----------------------------------------------
    .write_attr_button(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.spatial_learning.name,
        attribute_value=1,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="spatial_learning",
        fallback_name="Start spatial learning",
    )
    .write_attr_button(
        attribute_name=AqaraFP300ManuCluster.AttributeDefs.restart_device.name,
        attribute_value=1,
        cluster_id=AqaraFP300ManuCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="restart_device",
        fallback_name="Restart device",
    )
    # --- Detection range (virtual attrs on the local cluster) ---------------
    # Numeric 24-bit mask
    .number(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.detection_range_mask.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        min_value=0,
        max_value=0xFFFFFF,
        step=1,
        mode="box",
        translation_key="detection_range_mask",
        fallback_name="Detection range mask",
    )
    # One switch per 1 m segment
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_0_1m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_0_1m",
        fallback_name="Detection range 0-1 m",
    )
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_1_2m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_1_2m",
        fallback_name="Detection range 1-2 m",
    )
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_2_3m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_2_3m",
        fallback_name="Detection range 2-3 m",
    )
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_3_4m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_3_4m",
        fallback_name="Detection range 3-4 m",
    )
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_4_5m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_4_5m",
        fallback_name="Detection range 4-5 m",
    )
    .switch(
        attribute_name=FP300DetectionRangeCluster.AttributeDefs.range_5_6m.name,
        cluster_id=FP300DetectionRangeCluster.cluster_id,
        endpoint_id=1,
        entity_type=EntityType.CONFIG,
        translation_key="detection_range_5_6m",
        fallback_name="Detection range 5-6 m",
    )
    .add_to_registry()
)
Author
Thanks for sharing, works great.
You're welcome, glad it works great.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing, works great.