Created
October 14, 2025 09:07
-
-
Save dreamer1048576/ccf1aa46bce4a22dd9d2f07a2b27948b to your computer and use it in GitHub Desktop.
Person Sensor - @micropython Module SourceCode - for OpenMV H7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # PersonSensor.py | |
| #### Person Sensor *** v2 *** !! Not ```v1``` !! | |
| ##### Have to change I2C address to 0x44 on v1, 0x62 on v2 | |
| ## https://www.taiwansensor.com.tw/product/person-sensor-by-useful-sensors-%e5%be%ae%e5%9e%8b-ai-%e4%ba%ba%e8%87%89%e8%be%a8%e8%ad%98-%e4%ba%ba%e8%87%89%e8%bf%bd%e8%b9%a4-%e6%a8%a1%e7%b5%84-ai-%e5%85%8d%e8%a8%93%e7%b7%b4/ | |
| ## exam on OpenMV H7+ ; Pure I2C, No Interrupt Pin | |
| # ## https://github.com/moonshine-ai/person_sensor_circuit_python/blob/main/code.py | |
| ## ERR: `I2C read error: 'I2C' object has no attribute 'readfrom_into'` on OpenMV H7+ w/ pyb.I2C, not machine.I2C | |
| # Hardware Connect via OpenMV H7+ & Person Sensor v2 | |
| ## Person Sensor ; OpenMV H7+ | |
| ## VCC ; VDD(3.3V) | |
| ## GND ; GND | |
| ## SCL ; SCL2(P4/PB10) | |
| ## SDA ; SDA2(P5/PB11) | |
| ## INT ; NC | |
| ## ----------------------------------------------- | |
| import time | |
| import struct | |
| # Constants | |
| PERSON_SENSOR_I2C_ADDR = 0x62 ## v1.0 0x44 | |
| LED_REG = 0x07 | |
| TRIGGER_REG = 0x00 | |
| # Formats taken from CircuitPython example | |
| _PERSON_HEADER_FMT = "BBH" # pad1, pad2, payload_bytes | |
| _PERSON_HEADER_SIZE = struct.calcsize(_PERSON_HEADER_FMT) | |
| _PERSON_FACE_FMT = "BBBBBBbB" # box_conf, left, top, right, bottom, id_conf, id, is_facing | |
| _PERSON_FACE_SIZE = struct.calcsize(_PERSON_FACE_FMT) | |
| _PERSON_FACE_MAX = 4 | |
| # total packet: header + num_faces(1) + up to 4 faces + checksum(2) | |
| _PERSON_RESULT_FMT = _PERSON_HEADER_FMT + "B" + (_PERSON_FACE_FMT * _PERSON_FACE_MAX) + "H" | |
| _PERSON_RESULT_SIZE = struct.calcsize(_PERSON_RESULT_FMT) | |
| class PersonSensor: | |
| def __init__(self, i2c, addr=PERSON_SENSOR_I2C_ADDR, blink_times=2, on_val=1, off_val=0, blink_ms=120): | |
| self.i2c = i2c | |
| self.addr = addr | |
| # detect available I2C methods | |
| self._has_mem = hasattr(i2c, "mem_read") and hasattr(i2c, "mem_write") | |
| self._has_writeto_mem = hasattr(i2c, "writeto_mem") | |
| self._has_writeto = hasattr(i2c, "writeto") | |
| self._has_readinto = hasattr(i2c, "readinto") | |
| self._has_readfrom_into = hasattr(i2c, "readfrom_into") | |
| # buffer sized to expected full packet | |
| self._buf = bytearray(_PERSON_RESULT_SIZE) | |
| # blink LED on init; failures are printed but not fatal | |
| for _ in range(blink_times): | |
| try: | |
| self._write_reg(LED_REG, on_val) | |
| except Exception as e: | |
| print("LED ON failed:", e) | |
| time.sleep_ms(blink_ms) | |
| try: | |
| self._write_reg(LED_REG, off_val) | |
| except Exception as e: | |
| print("LED OFF failed:", e) | |
| time.sleep_ms(blink_ms) | |
| # low-level single-byte write to register | |
| def _write_reg(self, reg, value): | |
| if self._has_mem: | |
| return self.i2c.mem_write(value, self.addr, reg) | |
| if self._has_writeto_mem: | |
| return self.i2c.writeto_mem(self.addr, reg, bytes([value])) | |
| if self._has_writeto: | |
| return self.i2c.writeto(self.addr, bytes([reg, value])) | |
| raise RuntimeError("No supported I2C write method on this platform") | |
| # low-level read nbytes from register into a returned bytearray | |
| def _read_reg_into(self, reg, nbytes): | |
| if self._has_mem: | |
| return self.i2c.mem_read(nbytes, self.addr, reg) | |
| # fallback using writeto(reg) then readinto(buf) | |
| buf = bytearray(nbytes) | |
| if self._has_writeto and self._has_readinto: | |
| self.i2c.writeto(self.addr, bytes([reg])) | |
| self.i2c.readinto(buf) | |
| return buf | |
| if self._has_readfrom_into: | |
| self.i2c.readfrom_into(self.addr, buf) | |
| return buf | |
| raise RuntimeError("No supported I2C read method on this platform") | |
| # Original API: read() returns (num_faces, faces_list, checksum) | |
| def read(self): | |
| # trigger payload preparation | |
| try: | |
| self._write_reg(TRIGGER_REG, 1) | |
| except Exception as e: | |
| raise RuntimeError("trigger write failed: {}".format(e)) | |
| # small delay for sensor to prepare payload (matches CircuitPython behaviour) | |
| time.sleep_ms(20) | |
| # read full expected result into buffer | |
| try: | |
| data = self._read_reg_into(TRIGGER_REG, _PERSON_RESULT_SIZE) | |
| except Exception as e: | |
| raise RuntimeError("read failed: {}".format(e)) | |
| # ensure we have a mutable buffer for struct unpack_from | |
| if not isinstance(data, (bytearray, bytes)): | |
| data = bytearray(data) | |
| offset = 0 | |
| try: | |
| pad1, pad2, payload_bytes = struct.unpack_from(_PERSON_HEADER_FMT, data, offset) | |
| except struct.error: | |
| raise RuntimeError("header unpack failed") | |
| offset += _PERSON_HEADER_SIZE | |
| # single byte: num_faces | |
| try: | |
| (num_faces_raw,) = struct.unpack_from("B", data, offset) | |
| except struct.error: | |
| raise RuntimeError("num_faces unpack failed") | |
| num_faces = int(num_faces_raw) | |
| offset += 1 | |
| faces = [] | |
| for i in range(min(num_faces, _PERSON_FACE_MAX)): | |
| try: | |
| unpacked = struct.unpack_from(_PERSON_FACE_FMT, data, offset) | |
| except struct.error: | |
| break | |
| offset += _PERSON_FACE_SIZE | |
| (box_conf, left, top, right, bottom, id_conf, id_val, is_facing) = unpacked | |
| face = { | |
| "box_confidence": box_conf, | |
| "box_left": left, | |
| "box_top": top, | |
| "box_right": right, | |
| "box_bottom": bottom, | |
| "id_confidence": id_conf, | |
| "id": id_val, | |
| "is_facing": is_facing, | |
| } | |
| faces.append(face) | |
| # checksum (unsigned short) — parse if available | |
| checksum = None | |
| try: | |
| (checksum,) = struct.unpack_from("H", data, offset) | |
| except struct.error: | |
| checksum = None | |
| return num_faces, faces, checksum | |
| #################### | |
| # # Example usage (MicroPython / OpenMV M7/H7 style) | |
| # if __name__ == "__main__": | |
| # # For machine.I2C (OpenMV H7/H7+) | |
| # # from machine import I2C, Pin | |
| # # i2c = I2C(1, scl=Pin(22), sda=Pin(21), freq=100000) ## or freq=400000 | |
| # i2c = I2C(1, scl=Pin(22), sda=Pin(21), freq=100000) | |
| # | |
| # # For pyb.I2C (OpenMV M7) | |
| # # from pyb import I2C | |
| # # i2c = I2C(2, I2C.MASTER, baudrate=100000) ## or baudrate=400000 | |
| # | |
| # # Replace above with your platform's I2C init and pass the i2c instance below. | |
| # try: | |
| # i2c # assume i2c exists in caller context | |
| # except NameError: | |
| # raise SystemExit("Instantiate 'i2c' for your board before running this example") | |
| # | |
| # person = PersonSensor(i2c) | |
| # | |
| # while True: | |
| # try: | |
| # num, faces, checksum = person.read() | |
| # if (num > 0): | |
| # print("[person] faces:", num , ":", faces) | |
| # else: | |
| # print("[person] num_faces:", num) | |
| # except Exception as e: | |
| # print("I2C read error:", e) | |
| # time.sleep(PERSON_SENSOR_DELAY) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment