This gist is used for the Pigeon embodiment project to get a video stream from a Misty robot and present it to pigeons.
Tasks:
- Get Misty camera feed
- Display the Misty camera feed on a tkinter button
- Translate the X/Y coordinates of clicks on that button into Misty drive commands
| import requests | |
| import websockets | |
| from tkinter import Tk | |
| from tkinter import ttk | |
| from PIL import ImageTk, Image | |
| import io | |
| import threading | |
| import asyncio | |
class MistyHelper:
    """Small HTTP client for the Misty REST endpoints this script uses.

    Attributes:
        ip: Address of the robot, used as the host of every request URL.
        timeout: Seconds to wait for each HTTP request (``None`` waits forever).
        stream: ``ws://`` URL of the running video stream, or ``None`` when no
            stream has been started through this helper.
    """

    def __init__(self, ip: str, timeout: float = 10.0) -> None:
        """Remember the robot address; no network traffic happens here.

        Args:
            ip: Address of the robot.
            timeout: Per-request timeout in seconds. Without one, ``requests``
                blocks indefinitely when the robot is unreachable.
        """
        self.ip = ip
        self.timeout = timeout
        self.stream = None

    @staticmethod
    def _as_bool(value) -> bool:
        """Interpret a service flag that may arrive as a bool or as text.

        The flag is read from a decoded JSON payload, so it can be a real
        boolean or a string such as ``"True"``; both forms are accepted.
        """
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    @staticmethod
    def _describe(response) -> str:
        """Format a response as ``"<status> <body>"`` for log output.

        Falls back to the raw text when the body is not valid JSON so that
        logging can never fail a request that already completed.
        """
        try:
            payload = response.json()
        except ValueError:
            payload = response.text
        return f"{response.status_code} {payload}"

    def get_camera(self):
        """Get Misty's camera service setting.

        Returns:
            The ``result`` field of the JSON response, unmodified.
        """
        camera = requests.get(
            f"http://{self.ip}/api/services/camera", timeout=self.timeout
        )
        return camera.json()["result"]

    def get_av(self):
        """Get Misty's AV streaming service setting.

        Returns:
            The ``result`` field of the JSON response, unmodified.
        """
        av = requests.get(
            f"http://{self.ip}/api/services/avstreaming", timeout=self.timeout
        )
        return av.json()["result"]

    def start_stream(self, port: int):
        """Start Misty's video stream on ``port``.

        The camera service is enabled first when it reports being disabled.
        On an HTTP 200 reply ``self.stream`` is set to the websocket URL.

        Args:
            port (int): The port to use for Misty's video stream.

        Returns:
            requests.Response: The response from Misty starting the stream.
        """
        if not self._as_bool(self.get_camera()):
            # Enable streaming first
            print("Enabling streaming...")
            enable_path = f"http://{self.ip}/api/services/camera/enable"
            enable_response = requests.post(enable_path, timeout=self.timeout)
            print(f"Enabling: {self._describe(enable_response)}")

        start_path = f"http://{self.ip}/api/videostreaming/start"
        started_stream = requests.post(
            start_path,
            params={
                "Port": port,
                "Rotation": 90,
                "Width": 1280,
                "Height": 960,
                "Quality": 10,
            },
            timeout=self.timeout,
        )
        print(f"Starting stream: {self._describe(started_stream)}")
        if started_stream.status_code == 200:
            print("Set stream")
            self.stream = f"ws://{self.ip}:{port}"
        return started_stream

    def stop_stream(self):
        """Stop Misty's video stream.

        ``self.stream`` is cleared when the robot answers with HTTP 200.

        Returns:
            requests.Response: The response from Misty stopping the stream.
        """
        path = f"http://{self.ip}/api/videostreaming/stop"
        stopped_stream = requests.post(path, timeout=self.timeout)
        if stopped_stream.status_code == 200:
            self.stream = None
        print(f"Stopped stream: {self._describe(stopped_stream)}")
        return stopped_stream

    def drive(self, x, y):
        """Drive Misty for one second towards an x, y coordinate.

        Args:
            x: The horizontal destination, sent as ``AngularVelocity``.
            y: The vertical destination, sent as ``LinearVelocity``.

        Returns:
            requests.Response: The response from Misty's drive endpoint.
        """
        # TODO: Update calculations for x,y to velocities
        # TODO: Figure out which coordinates we would want to make the robot go backwards (since backwards is never shown)
        drive_path = f"http://{self.ip}/api/drive/time"
        drive_response = requests.post(
            drive_path,
            params={
                "LinearVelocity": y,
                "AngularVelocity": x,
                "timems": 1000,
            },
            timeout=self.timeout,
        )
        print(f"Drive response: {self._describe(drive_response)}")
        return drive_response
class MistyPigeonApp:
    """Tk window that shows Misty's camera feed on a button and drives on click.

    A daemon thread receives frames over a websocket and only decodes them.
    All Tk calls (creating the ``PhotoImage``, reconfiguring the button) run
    on the main-loop thread via a periodic ``root.after`` callback, because
    Tk widgets should not be touched from other threads.
    """

    # Nominal size of the streamed frame; also used to scale click positions.
    FRAME_WIDTH = 1280
    FRAME_HEIGHT = 960
    # Velocities are percentages; clicks are clamped into this symmetric range.
    MAX_VELOCITY = 100.0
    # How often the main thread checks for a newly received frame.
    POLL_INTERVAL_MS = 15

    def __init__(self, root, misty_ip, ws_port):
        """Build the UI, start Misty's stream and begin receiving frames.

        Args:
            root: The Tk root window that hosts the button.
            misty_ip: Address of the Misty robot.
            ws_port: Port on which Misty should serve the video stream.
        """
        self.root = root
        self.root.title("Misty Piegon Control")
        self.root.bind("<Button 1>", self.button_click)

        # Create a button with a placeholder image
        self.placeholder_image = Image.new(
            "RGB", (self.FRAME_WIDTH, self.FRAME_HEIGHT), color="gray"
        )
        # Keep a reference on self so the image is not garbage collected.
        self.tk_image = ImageTk.PhotoImage(self.placeholder_image)
        self.button = ttk.Button(root, image=self.tk_image)
        self.button.pack()

        self.misty_ip = misty_ip
        self.ws_port = ws_port

        # Latest decoded frame handed from the websocket thread to the UI thread.
        self._frame_lock = threading.Lock()
        self._pending_frame = None

        self.connectedMisty = MistyHelper(misty_ip)
        self.started = self.connectedMisty.start_stream(ws_port)

        self.websocket_thread = threading.Thread(target=self.run_websocket_client)
        self.websocket_thread.daemon = True
        self.websocket_thread.start()

        self.root.after(self.POLL_INTERVAL_MS, self._show_pending_frame)

    async def receive_image(self):
        """Receive frames from Misty's websocket forever and queue the newest.

        Runs on the websocket thread, so it must not call into Tk; it only
        decodes the bytes and stores the resulting PIL image for the UI thread.
        """
        uri = f"ws://{self.misty_ip}:{self.ws_port}"
        async with websockets.connect(uri) as websocket:
            while True:
                # Receive image data from WebSocket
                image_data = await websocket.recv()
                # Convert the received data to an image
                image = Image.open(io.BytesIO(image_data))
                # Image.open is lazy; decode here rather than on the UI thread.
                image.load()
                # Only the newest frame matters, so older ones are overwritten.
                with self._frame_lock:
                    self._pending_frame = image

    def _show_pending_frame(self):
        """Show the newest received frame on the button (main thread only)."""
        with self._frame_lock:
            frame, self._pending_frame = self._pending_frame, None
        if frame is not None:
            self.tk_image = ImageTk.PhotoImage(frame)
            self.button.config(image=self.tk_image)
        # Re-arm the poll so frames keep flowing for the life of the window.
        self.root.after(self.POLL_INTERVAL_MS, self._show_pending_frame)

    def run_websocket_client(self):
        """Thread target: run the frame receiver in its own asyncio loop."""
        asyncio.run(self.receive_image())

    @classmethod
    def _click_to_velocities(cls, x, y):
        """Map a click position to ``(angular, linear)`` velocity percentages.

        The frame centre maps to ``(0, 0)``; the right edge and the top edge
        map to ``+100``. Results are clamped to ``[-100, 100]`` so a click
        outside the nominal frame cannot produce an out-of-range command.
        """
        half_width = cls.FRAME_WIDTH / 2
        half_height = cls.FRAME_HEIGHT / 2
        angular = (x - half_width) / half_width * 100
        linear = (half_height - y) / half_height * 100
        limit = cls.MAX_VELOCITY
        return (
            max(-limit, min(limit, angular)),
            max(-limit, min(limit, linear)),
        )

    def button_click(self, eventorigin):
        """Drive Misty according to where the click landed.

        Args:
            eventorigin: The Tk event; its ``x`` and ``y`` attributes are used.
        """
        angular, linear = self._click_to_velocities(eventorigin.x, eventorigin.y)
        self.connectedMisty.drive(angular, linear)
if __name__ == "__main__":
    # Address of the robot and the port its video stream is served on.
    misty_ip, ws_port = "192.168.0.50", "5678"

    # Create the window, attach the app to it, then hand control to Tk.
    root = Tk()
    app = MistyPigeonApp(root, misty_ip, ws_port)
    root.mainloop()