Created
November 24, 2025 21:22
-
-
Save deniska/7ca987351e17540d91028055a637ac42 to your computer and use it in GitHub Desktop.
Simple, untested, slow VNC server in python, displaying a static image
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import dataclasses | |
| from dataclasses import dataclass | |
| import asyncio | |
| import struct | |
| from PIL import Image | |
| VERSION = b'RFB 003.008\n' | |
| WIDTH = 800 | |
| HEIGHT = 600 | |
| PIXEL_FORMAT_STRUCT = '>BB??HHHBBBxxx' | |
| img = Image.open('image.png') | |
| @dataclass | |
| class PixelFormat: | |
| bits_per_pixel: int | |
| depth: int | |
| big_endian_flag: bool | |
| true_color_flag: bool | |
| red_max: int | |
| green_max: int | |
| blue_max: int | |
| red_shift: int | |
| green_shift: int | |
| blue_shift: int | |
| def to_bytes(self): | |
| return struct.pack(PIXEL_FORMAT_STRUCT, *dataclasses.astuple(self)) | |
| @classmethod | |
| def from_bytes(cls, b): | |
| return cls(*struct.unpack(PIXEL_FORMAT_STRUCT, b)) | |
| async def vncserver(reader, writer): | |
| # 7.1.1 ProtocolVersion | |
| writer.write(VERSION) | |
| await writer.drain() | |
| msg = await reader.readuntil(b'\n') | |
| if msg != VERSION: | |
| print(f'Disconnecting client, it sent us unknown message {msg}') | |
| writer.close() | |
| await writer.wait_closed() | |
| return | |
| print('Client gave us the correct version') | |
| # 7.1.2 Security Handshake | |
| writer.write(bytes([1, 1])) # one security method, no auth | |
| await writer.drain() | |
| print('Sent security methods') | |
| msg = await reader.readexactly(1) | |
| if msg != b'\x01': | |
| writer.close() | |
| await writer.wait_closed() | |
| print('Disconnecting client, he did not send us the supported method back') | |
| return | |
| print('Client gave us the expected security method') | |
| # 7.1.3 SecurityResult | |
| writer.write(b'\x00\x00\x00\x00') # SecurityResult OK | |
| await writer.drain() | |
| print('Sent SecurityResult ok') | |
| # 7.3.1 ClientInit | |
| msg = await reader.readexactly(1) | |
| print(f'Client gave us shared-flag = {msg} for ClientInit, we do not care either way') | |
| # 7.3.2 ServerInit | |
| msg = bytearray() | |
| msg += WIDTH.to_bytes(2, 'big') | |
| msg += HEIGHT.to_bytes(2, 'big') | |
| # PIXEL_FORMAT | |
| pixel_format = PixelFormat( | |
| bits_per_pixel=32, | |
| depth=24, | |
| big_endian_flag=False, | |
| true_color_flag=True, | |
| red_max=255, green_max=255, blue_max=255, | |
| red_shift=16, green_shift=8, blue_shift=0, | |
| ) | |
| msg += pixel_format.to_bytes() | |
| msg += u32str(b"simple vnc server") | |
| writer.write(msg) | |
| await writer.drain() | |
| print('Sent server init') | |
| while True: | |
| client_msg_type = await reader.readexactly(1) | |
| if client_msg_type == b'\x00': # SetPixelFormat | |
| print('Client wants to set format') | |
| msg = await reader.readexactly(3 + 16) | |
| pixel_format = PixelFormat.from_bytes(msg[3:]) | |
| print(f'{pixel_format = }') | |
| if (pixel_format.bits_per_pixel != 32 | |
| or pixel_format.depth != 24 | |
| or pixel_format.red_max != 255 | |
| or pixel_format.green_max != 255 | |
| or pixel_format.blue_max != 255 | |
| ): | |
| print('We have a skill issue with the desired format') | |
| writer.close() | |
| await writer.wait_closed() | |
| return | |
| elif client_msg_type == b'\x02': # SetEncodings | |
| msg = await reader.readexactly(3) | |
| num_of_encodings = int.from_bytes(msg[1:], 'big') | |
| print(f'Client sent list of {num_of_encodings}') | |
| msg = await reader.readexactly(4 * num_of_encodings) | |
| encodings = [int.from_bytes(msg[i:i+4], 'big', signed=True) for i in range(0, len(msg), 4)] | |
| print(f'Encodings: {encodings}') | |
| if 0 not in encodings: | |
| print('No 0 encoding (raw), we have a skill issue') | |
| writer.close() | |
| await writer.wait_closed() | |
| return | |
| elif client_msg_type == b'\x03': # FramebufferUpdateRequest | |
| # We will just slurp the whole request, ignore its specifics, | |
| # and send back the whole screen | |
| msg = await reader.readexactly(1 + 2+2 + 2+2) | |
| # 7.6.1 FramebufferUpdate | |
| msg = bytearray([0, 0]) # message type, padding | |
| msg += b'\x00\x01' # number of rectangles | |
| msg += b'\x00\x00' # x | |
| msg += b'\x00\x00' # y | |
| msg += WIDTH.to_bytes(2, 'big') | |
| msg += HEIGHT.to_bytes(2, 'big') | |
| msg += b'\x00\x00\x00\x00' # encoding type, raw | |
| endianness = 'big' if pixel_format.big_endian_flag else 'little' | |
| # in python, this loop is ouch | |
| for y in range(HEIGHT): | |
| for x in range(WIDTH): | |
| px = 0 | |
| r, g, b = img.getpixel((x, y)) | |
| px |= r << pixel_format.red_shift | |
| px |= g << pixel_format.green_shift | |
| px |= b << pixel_format.blue_shift | |
| msg += px.to_bytes(4, endianness) | |
| writer.write(msg) | |
| await writer.drain() | |
| elif client_msg_type == b'\x04': # KeyEvent | |
| print('TODO: KeyEvent') | |
| msg = await reader.readexactly(1 + 2 + 4) | |
| elif client_msg_type == b'\x05': # PointerEvent | |
| print('TODO: PointerEvent') | |
| msg = await reader.readexactly(1 + 2 + 2) | |
| elif client_msg_type == b'\x06': # ClientCutText | |
| msg = await reader.readexactly(3 + 4) | |
| length = int.from_bytes(msg[3:], 'big') | |
| msg = await reader.readexactly(length) | |
| print(f'ClientCutText: {msg}') | |
| writer.close() | |
| await writer.wait_closed() | |
| print('Disconnected client') | |
| async def main(): | |
| print('Listening') | |
| server = await asyncio.start_server(vncserver, host='127.0.0.1', port=5900) | |
| await server.serve_forever() | |
| def u32str(b): | |
| return len(b).to_bytes(4, 'big') + b | |
| if __name__ == '__main__': | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment