Created
January 20, 2026 14:24
-
-
Save Akkiesoft/14a459c9097b507c08e06d424266e4f4 to your computer and use it in GitHub Desktop.
Pico2WでHTLをストリーミングしてよだれさんのノート数をMicro Dot pHATで表示するやつ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # HTLをストリーミングしてよだれさんのノート数を表示するやつ | |
| # Required hardware: | |
| # RPi Pico2W | |
| # uHAT Porter Pico (https://ssci.to/9364) | |
| # Pimoroni Micro Dot pHAT | |
| # Required software: | |
| # CircuitPython == 9.2.6 | |
| # akkie_wifi (https://github.com/Akkiesoft/akkiesoft-pico/blob/main/CircuitPython/cp_networks/networks/akkie_wifi.py) | |
| import board | |
| from busio import I2C | |
| from networks.akkie_wifi import akkie_wifi as network | |
| from akkie_wifi_config import ap_list | |
| from adafruit_requests import Session | |
| import websockets | |
| import json | |
| from microdotphat import MicroDotpHAT | |
| domain = 'misskey.io' | |
| token = '' | |
| # よだれさんのID | |
| target_user_id = '9h74ihid5q' | |
| # Akkiesoft | |
| target_user_id = '9b9xehmu2b' | |
| streaming_url = "wss://%s/streaming?i=%s" % (domain, token) | |
| bus = I2C(board.GP5, board.GP4) | |
| mdp = MicroDotpHAT(bus) | |
| network = network(ap_list) | |
| network.connect() | |
| requests = Session(network.pool, network.ssl_context) | |
| def show_notes_count(user_id): | |
| response = requests.post( | |
| 'https://%s/api/users/show' % domain, | |
| json={ "userId": user_id }) | |
| j = response.json() | |
| mdp.write_string("%6i" % j['notesCount'], kerning=False) | |
| if user_id == '9h74ihid5q': | |
| mdp.set_decimal(2, 1) | |
| mdp.show() | |
| message = json.dumps({ | |
| 'type': 'connect', | |
| 'body': { | |
| 'channel': 'homeTimeline', | |
| 'id': 'htl' | |
| } | |
| }) | |
| mdp.clear() | |
| show_notes_count(target_user_id) | |
| while True: | |
| try: | |
| show_notes_count(target_user_id) | |
| wsession = websockets.Session(network.pool, ssl=network.ssl_context) | |
| with wsession.client(streaming_url) as ws: | |
| ws.send(message) | |
| while True: | |
| data = json.loads(ws.recv()) | |
| user_id = data['body']['body']['user']['id'] | |
| print(user_id, target_user_id) | |
| if user_id == target_user_id: | |
| show_notes_count(target_user_id) | |
| except Exception as e: | |
| print("Error:", e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment