Created
July 19, 2025 02:56
-
-
Save jakubriedl/713fdbd68123208ee4d57cafa8d0decb to your computer and use it in GitHub Desktop.
TinyBase ws sync message partitioning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { Message as MessageType } from "tinybase/synchronizers" | |
| import { z } from "zod" | |
| const UNDEFINED = "\uFFFC" | |
| export const messageEnvelopeSchema = z.preprocess( | |
| (val) => { | |
| if (typeof val !== "string") return val | |
| const items = val.split("\n") | |
| return [...items.slice(0, 4), items.slice(4).join("\n")] | |
| }, | |
| z.tuple([ | |
| z.string(), // actorId | |
| z.preprocess((val) => (val === "" ? null : val), z.string().nullable()), // requestId | |
| z.preprocess( | |
| // chunk position and count | |
| (val) => (typeof val === "string" ? val.split("/") : val), | |
| z.tuple([z.coerce.number(), z.coerce.number()]), | |
| ), | |
| z.coerce.number(), // messageType | |
| z.string(), // chunk | |
| ]), | |
| ) | |
| export type MessageEnvelope = z.infer<typeof messageEnvelopeSchema> | |
| export type DecodedMessage = [ | |
| MessageEnvelope[0], | |
| MessageEnvelope[1], | |
| MessageEnvelope[2], | |
| MessageEnvelope[3], | |
| unknown, | |
| ] | |
| export const serializePayload = (obj: unknown): string => | |
| JSON.stringify(obj, (_key, value) => | |
| value === undefined ? UNDEFINED : value, | |
| ) | |
| export const deserializePayload = (str: string): any => | |
| JSON.parse(str, (_key, value) => (value === UNDEFINED ? undefined : value)) | |
| // message format | |
| // <actorId>\n<requestId>\n<chunkPosition>/<totalChunks>\n<messageType>\n<chunk> | |
| export const createMessages = ( | |
| actorId: string | null, | |
| requestId: string | null, | |
| messageType: MessageType, | |
| body: any, | |
| ): string[] => { | |
| const payload = serializePayload(body) | |
| // split payload into chunks of 1MB | |
| const chunks = [] | |
| // using more conservative 1MB = 1000 * 1000 bytes to allow for header and some overhead | |
| for (let i = 0; i < payload.length; i += 1000 * 1000) { | |
| chunks.push(payload.slice(i, i + 1000 * 1000)) | |
| } | |
| return chunks.map( | |
| (chunk, index) => | |
| `${actorId ?? ""}\n${requestId ?? ""}\n${index + 1}/${chunks.length}\n${messageType}\n${chunk}`, | |
| ) | |
| } | |
| export type MessageBuffer = { | |
| chunks: string[] | |
| timeout: NodeJS.Timeout | null | |
| resolves: ((value: any) => void)[] | |
| rejects: ((reason: any) => void)[] | |
| } | |
| export const unpartitionMessage = async ( | |
| message: string, | |
| messagesBuffer: Map<string, MessageBuffer>, | |
| ): Promise<[string, string | null, [number, number], number, any]> => { | |
| const [ | |
| targetActorId, | |
| requestId, | |
| [chunkPosition, totalChunks], | |
| messageType, | |
| chunk, | |
| ] = messageEnvelopeSchema.parse(message) | |
| if (totalChunks === 1) | |
| return [ | |
| targetActorId, | |
| requestId, | |
| [chunkPosition, totalChunks], | |
| messageType, | |
| deserializePayload(chunk), | |
| ] | |
| if (!requestId) { | |
| console.error("partitioned messages require requestId", message) | |
| return Promise.reject(new Error("partitioned messages require requestId")) | |
| } | |
| const messageBuffer = messagesBuffer.get(requestId) ?? { | |
| chunks: [], | |
| timeout: null, | |
| resolves: [] as ((value: any) => void)[], | |
| rejects: [] as ((reason: any) => void)[], | |
| } | |
| // add promise to the be resolved when all chunks are received | |
| const promise = new Promise<ReturnType<typeof unpartitionMessage>>( | |
| (resolve, reject) => { | |
| messageBuffer.resolves.push(resolve) | |
| messageBuffer.rejects.push(reject) | |
| }, | |
| ) | |
| // reset timeout on every chunk | |
| if (messageBuffer.timeout) clearTimeout(messageBuffer.timeout) | |
| // aggregate chunks and resolve if that's all | |
| messageBuffer.chunks[chunkPosition - 1] = chunk | |
| messagesBuffer.set(requestId, messageBuffer) | |
| if (messageBuffer.chunks.filter(Boolean).length === totalChunks) { | |
| messagesBuffer.delete(requestId) | |
| messageBuffer.resolves.forEach((resolve) => | |
| resolve([ | |
| targetActorId, | |
| requestId, | |
| [chunkPosition, totalChunks], | |
| messageType, | |
| deserializePayload(messageBuffer.chunks.join("")), | |
| ]), | |
| ) | |
| } else { | |
| // we don't have all chunks yet, so we need to wait for more, but give it 10 seconds or GC | |
| messageBuffer.timeout = setTimeout(() => { | |
| messagesBuffer.delete(requestId) | |
| const err = new Error( | |
| `Websocket message buffer timeout. requestId:${requestId}`, | |
| ) | |
| messageBuffer.rejects.forEach((reject) => reject(err)) | |
| }, 10000) | |
| } | |
| return promise | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // this is just sync relevant snippet from the DO integration as it's heavily customised otherwise | |
| const synchronizer = createCustomSynchronizer( | |
| this.store, | |
| (targetActorId, requestId, message, body) => | |
| createMessages(targetActorId, requestId, message, body).map((message) => | |
| this.handleMessage(SERVER_ACTOR_ID, message), | |
| ), | |
| (receive: Receive) => { | |
| this.forwardToServerActor = async (message: string) => { | |
| try { | |
| const [ | |
| targetActorId, | |
| requestId, | |
| [chunkPosition, totalChunks], | |
| messageType, | |
| payload, | |
| ] = await this.unpartitionMessage(message) | |
| // we want to run receive only once when we have all chunks | |
| // but the bufferMessage will resolve all messages | |
| if (chunkPosition !== totalChunks) return | |
| return receive(targetActorId, requestId, messageType, payload) | |
| } catch (e) { | |
| return | |
| } | |
| } | |
| }, | |
| () => {}, | |
| 1, | |
| ) | |
| private messagesBuffer = new Map<string, MessageBuffer>() | |
| private async unpartitionMessage(message: string): Promise<DecodedMessage> { | |
| return unpartitionMessage(message, this.messagesBuffer) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { MergeableStore } from "tinybase" | |
| import { | |
| createCustomSynchronizer, | |
| Message, | |
| Receive, | |
| Send, | |
| } from "tinybase/synchronizers" | |
| import { | |
| type createWsSynchronizer as createWsSynchronizerDecl, | |
| WebSocketTypes, | |
| WsSynchronizer, | |
| } from "tinybase/synchronizers/synchronizer-ws-client" | |
| import { createMessages, MessageBuffer, unpartitionMessage } from "./helpers" | |
| const MESSAGE = "message" | |
| const OPEN = "open" | |
| const ERROR = "error" | |
| const UTF8 = "utf8" | |
| type Id = string | |
| type IdOrNull = Id | null | |
| export const createWsSynchronizer = (async < | |
| WebSocketType extends WebSocketTypes, | |
| >( | |
| store: MergeableStore, | |
| webSocket: WebSocketType, | |
| requestTimeoutSeconds: number = 1, | |
| onSend?: Send, | |
| onReceive?: Receive, | |
| onIgnoredError?: (error: any) => void, | |
| ) => { | |
| const messagesBuffer = new Map<string, MessageBuffer>() | |
| const addEventListener = ( | |
| event: keyof WebSocketEventMap, | |
| handler: (...args: any[]) => void, | |
| ) => { | |
| webSocket.addEventListener(event, handler) | |
| return () => webSocket.removeEventListener(event, handler) | |
| } | |
| const registerReceive = (receive: Receive) => | |
| addEventListener(MESSAGE, async ({ data }) => { | |
| const [ | |
| sourceActorId, | |
| requestId, | |
| [chunkPosition, totalChunks], | |
| messageType, | |
| payload, | |
| ] = await unpartitionMessage(data.toString(UTF8), messagesBuffer) | |
| // we want to trigger receive only once when we have all chunks | |
| if (chunkPosition !== totalChunks) return | |
| return receive(sourceActorId, requestId, messageType, payload) | |
| }) | |
| const send = ( | |
| toClientId: IdOrNull, | |
| requestId: IdOrNull, | |
| messageType: Message, | |
| body: any, | |
| ): void => { | |
| const messages = createMessages(toClientId, requestId, messageType, body) | |
| for (const message of messages) webSocket.send(message) | |
| } | |
| const destroy = (): void => { | |
| webSocket.close() | |
| } | |
| const synchronizer = createCustomSynchronizer( | |
| store, | |
| send, | |
| registerReceive, | |
| destroy, | |
| requestTimeoutSeconds, | |
| onSend, | |
| onReceive, | |
| onIgnoredError, | |
| // @ts-ignore | |
| { getWebSocket: () => webSocket }, | |
| ) as WsSynchronizer<any> | |
| return new Promise((resolve) => { | |
| if (webSocket.readyState != webSocket.OPEN) { | |
| const onAttempt = (error?: any) => { | |
| if (error) { | |
| onIgnoredError?.(error) | |
| } | |
| removeOpenListener() | |
| removeErrorListener() | |
| resolve(synchronizer) | |
| } | |
| const removeOpenListener = addEventListener(OPEN, () => onAttempt()) | |
| const removeErrorListener = addEventListener(ERROR, onAttempt) | |
| } else { | |
| resolve(synchronizer) | |
| } | |
| }) | |
| }) as typeof createWsSynchronizerDecl |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment