Skip to content

Instantly share code, notes, and snippets.

@jakubriedl
Created July 19, 2025 02:56
Show Gist options
  • Select an option

  • Save jakubriedl/713fdbd68123208ee4d57cafa8d0decb to your computer and use it in GitHub Desktop.

Select an option

Save jakubriedl/713fdbd68123208ee4d57cafa8d0decb to your computer and use it in GitHub Desktop.
TinyBase ws sync message partitioning
import type { Message as MessageType } from "tinybase/synchronizers"
import { z } from "zod"
// Sentinel string written in place of `undefined` values when a payload is
// serialized (JSON has no representation for `undefined`); deserialization
// maps it back to `undefined`. U+FFFC is the Unicode OBJECT REPLACEMENT CHARACTER.
const UNDEFINED = "\uFFFC"
/**
 * Breaks a raw wire string into at most five fields: the first four
 * newline-separated header lines, followed by everything after them re-joined
 * with "\n" as the chunk. Non-string input is passed through untouched so the
 * tuple schema can report it.
 */
const splitEnvelope = (raw: unknown): unknown => {
  if (typeof raw !== "string") return raw
  const lines = raw.split("\n")
  const header = lines.slice(0, 4)
  const chunk = lines.slice(4).join("\n")
  return [...header, chunk]
}

/** requestId field: an empty header line is read as `null`. */
const requestIdField = z.preprocess(
  (raw) => (raw === "" ? null : raw),
  z.string().nullable(),
)

/** "<chunkPosition>/<totalChunks>" field, coerced to a pair of numbers. */
const chunkCounterField = z.preprocess(
  (raw) => (typeof raw === "string" ? raw.split("/") : raw),
  z.tuple([z.coerce.number(), z.coerce.number()]),
)

/**
 * Schema for one wire message. Parses a string into
 * [actorId, requestId, [chunkPosition, totalChunks], messageType, chunk].
 */
export const messageEnvelopeSchema = z.preprocess(
  splitEnvelope,
  z.tuple([
    z.string(), // actorId
    requestIdField, // requestId
    chunkCounterField, // chunk position and count
    z.coerce.number(), // messageType
    z.string(), // chunk
  ]),
)
/**
 * Parsed wire envelope as inferred from `messageEnvelopeSchema`:
 * [actorId, requestId, [chunkPosition, totalChunks], messageType, chunk].
 */
export type MessageEnvelope = z.infer<typeof messageEnvelopeSchema>
/**
 * Same first four fields as `MessageEnvelope`, but the fifth position is an
 * `unknown` payload instead of the raw chunk string.
 */
export type DecodedMessage = [
MessageEnvelope[0],
MessageEnvelope[1],
MessageEnvelope[2],
MessageEnvelope[3],
unknown,
]
/**
 * Converts a value to JSON text, writing the `UNDEFINED` sentinel wherever an
 * `undefined` value is encountered so that it is not silently dropped.
 *
 * @param obj Value to serialize.
 * @returns The JSON text.
 */
export const serializePayload = (obj: unknown): string => {
  const keepUndefined = (_key: string, value: unknown): unknown => {
    if (value === undefined) return UNDEFINED
    return value
  }
  return JSON.stringify(obj, keepUndefined)
}
/**
 * Parses JSON text produced by `serializePayload`, turning every occurrence of
 * the `UNDEFINED` sentinel back into `undefined`.
 *
 * Note: per `JSON.parse` reviver semantics, returning `undefined` removes the
 * property (or leaves a hole in an array), so such entries read back as
 * `undefined` rather than being present with an explicit value.
 *
 * @param str JSON text to parse.
 * @returns The parsed value.
 */
export const deserializePayload = (str: string): any => {
  const restoreUndefined = (_key: string, value: unknown): unknown =>
    value !== UNDEFINED ? value : undefined
  return JSON.parse(str, restoreUndefined)
}
// message format
// <actorId>\n<requestId>\n<chunkPosition>/<totalChunks>\n<messageType>\n<chunk>
/**
 * Serializes `body` and splits it into one or more wire messages of the form
 * `<actorId>\n<requestId>\n<chunkPosition>/<totalChunks>\n<messageType>\n<chunk>`.
 *
 * Chunk boundaries never fall between the two halves of a UTF-16 surrogate
 * pair: a lone surrogate cannot be encoded as UTF-8, so a pair split across
 * two text frames would arrive as two replacement characters and corrupt the
 * reassembled payload.
 *
 * @param actorId Actor id written to the header; `null` is written as "".
 * @param requestId Request id written to the header; `null` is written as "".
 * @param messageType Message type written to the header.
 * @param body Value to serialize with `serializePayload`.
 * @param maxChunkLength Maximum chunk length in UTF-16 code units (not bytes:
 *   the UTF-8 encoding of a chunk can be larger). Defaults to 1,000,000, a
 *   conservative value that leaves room for the header and some overhead. A
 *   chunk may exceed this by one code unit only when `maxChunkLength` is 1 and
 *   a surrogate pair has to be kept together.
 * @returns The messages, in chunk order.
 * @throws RangeError if `maxChunkLength` is not a positive integer.
 */
export const createMessages = (
  actorId: string | null,
  requestId: string | null,
  messageType: MessageType,
  body: any,
  maxChunkLength: number = 1000 * 1000,
): string[] => {
  if (!Number.isInteger(maxChunkLength) || maxChunkLength < 1) {
    throw new RangeError(
      `maxChunkLength must be a positive integer, got ${maxChunkLength}`,
    )
  }
  const payload = serializePayload(body)
  const isHighSurrogate = (code: number): boolean =>
    code >= 0xd800 && code <= 0xdbff
  const isLowSurrogate = (code: number): boolean =>
    code >= 0xdc00 && code <= 0xdfff

  const chunks: string[] = []
  let start = 0
  while (start < payload.length) {
    let end = Math.min(start + maxChunkLength, payload.length)
    if (
      end < payload.length &&
      isHighSurrogate(payload.charCodeAt(end - 1)) &&
      isLowSurrogate(payload.charCodeAt(end))
    ) {
      // The cut would separate a surrogate pair: move the pair to the next
      // chunk, or keep it whole here if this chunk would otherwise be empty.
      end = end - 1 > start ? end - 1 : end + 1
    }
    chunks.push(payload.slice(start, end))
    start = end
  }

  const header = `${actorId ?? ""}\n${requestId ?? ""}`
  return chunks.map(
    (chunk, index) =>
      `${header}\n${index + 1}/${chunks.length}\n${messageType}\n${chunk}`,
  )
}
/**
 * Reassembly state for one partitioned message; `unpartitionMessage` keeps
 * one of these per requestId in the map it is given.
 */
export type MessageBuffer = {
// chunks received so far, stored at index (chunkPosition - 1)
chunks: string[]
// pending expiry timer for this buffer, or null when none is scheduled
timeout: NodeJS.Timeout | null
// callbacks invoked once every chunk has arrived (one per received chunk)
resolves: ((value: any) => void)[]
// callbacks invoked with the error when reassembly fails (one per received chunk)
rejects: ((reason: any) => void)[]
}
/**
 * Decodes one wire message and, for partitioned messages, buffers its chunk
 * until every chunk of the same requestId has arrived.
 *
 * Single-chunk messages resolve immediately. For partitioned messages every
 * call returns a promise that settles when the whole message is complete;
 * each promise resolves with the chunk position of the chunk that call
 * received, so a caller can act exactly once by checking
 * `chunkPosition === totalChunks`. All waiters of one message share the same
 * decoded payload object.
 *
 * @param message Raw wire message
 *   (`<actorId>\n<requestId>\n<chunkPosition>/<totalChunks>\n<messageType>\n<chunk>`).
 * @param messagesBuffer Map holding in-progress messages keyed by requestId;
 *   it is mutated by this function.
 * @returns [actorId, requestId, [chunkPosition, totalChunks], messageType, payload].
 * @throws (as a rejection) if the envelope does not match the schema, a
 *   partitioned message has no requestId or an out-of-range chunk position,
 *   the assembled payload cannot be parsed, or 10 seconds pass without a new
 *   chunk for the message.
 */
export const unpartitionMessage = async (
  message: string,
  messagesBuffer: Map<string, MessageBuffer>,
): Promise<[string, string | null, [number, number], number, any]> => {
  type Decoded = [string, string | null, [number, number], number, any]

  const [
    targetActorId,
    requestId,
    [chunkPosition, totalChunks],
    messageType,
    chunk,
  ] = messageEnvelopeSchema.parse(message)

  // Unpartitioned message: nothing to buffer.
  if (totalChunks === 1)
    return [
      targetActorId,
      requestId,
      [chunkPosition, totalChunks],
      messageType,
      deserializePayload(chunk),
    ]

  if (!requestId) {
    console.error("partitioned messages require requestId", message)
    throw new Error("partitioned messages require requestId")
  }

  // A position outside 1..totalChunks could never complete the message and
  // would only sit in the buffer until the timeout fires.
  if (
    !Number.isInteger(chunkPosition) ||
    !Number.isInteger(totalChunks) ||
    chunkPosition < 1 ||
    chunkPosition > totalChunks
  ) {
    throw new Error(
      `Invalid chunk position ${chunkPosition}/${totalChunks}. requestId:${requestId}`,
    )
  }

  const messageBuffer: MessageBuffer = messagesBuffer.get(requestId) ?? {
    chunks: [],
    timeout: null,
    resolves: [],
    rejects: [],
  }

  // Register a waiter that is settled when all chunks are received. The
  // waiter captures THIS call's chunk position, so each caller gets its own
  // position back rather than the position of whichever chunk arrived last.
  const promise = new Promise<Decoded>((resolve, reject) => {
    messageBuffer.resolves.push((payload) =>
      resolve([
        targetActorId,
        requestId,
        [chunkPosition, totalChunks],
        messageType,
        payload,
      ]),
    )
    messageBuffer.rejects.push(reject)
  })

  // reset timeout on every chunk
  if (messageBuffer.timeout) clearTimeout(messageBuffer.timeout)
  messageBuffer.timeout = null

  messageBuffer.chunks[chunkPosition - 1] = chunk
  messagesBuffer.set(requestId, messageBuffer)

  // `filter` skips the holes of chunks that have not arrived yet.
  const receivedCount = messageBuffer.chunks.filter(
    (received) => typeof received === "string",
  ).length

  if (receivedCount === totalChunks) {
    messagesBuffer.delete(requestId)
    let payload: any
    try {
      // Parse once and share the result between all waiters.
      payload = deserializePayload(messageBuffer.chunks.join(""))
    } catch (error) {
      // Settle every waiter; otherwise the earlier ones would hang forever,
      // since the buffer is gone and no timeout is pending.
      messageBuffer.rejects.forEach((reject) => reject(error))
      return promise
    }
    messageBuffer.resolves.forEach((resolve) => resolve(payload))
  } else {
    // we don't have all chunks yet, so wait for more, but give up after 10 seconds
    messageBuffer.timeout = setTimeout(() => {
      messagesBuffer.delete(requestId)
      const err = new Error(
        `Websocket message buffer timeout. requestId:${requestId}`,
      )
      messageBuffer.rejects.forEach((reject) => reject(err))
    }, 10000)
  }
  return promise
}
// This is only the sync-relevant snippet from the DO integration; the rest of it is heavily customised.
// Custom synchronizer for the server-side store. Outgoing messages are
// partitioned with `createMessages`; incoming ones are reassembled with
// `this.unpartitionMessage` before being handed to TinyBase's `receive`.
const synchronizer = createCustomSynchronizer(
  this.store,
  // send: split the message into chunks and pass each one to handleMessage
  (targetActorId, requestId, messageType, body) =>
    createMessages(targetActorId, requestId, messageType, body).map(
      (chunkMessage) => this.handleMessage(SERVER_ACTOR_ID, chunkMessage),
    ),
  // registerReceive: expose a forwarding function that feeds raw messages in
  (receive: Receive) => {
    this.forwardToServerActor = async (message: string) => {
      try {
        const [
          targetActorId,
          requestId,
          [chunkPosition, totalChunks],
          messageType,
          payload,
        ] = await this.unpartitionMessage(message)
        // we want to run receive only once when we have all chunks,
        // but the buffer resolves the promise of every chunk
        if (chunkPosition !== totalChunks) return
        return receive(targetActorId, requestId, messageType, payload)
      } catch (e) {
        // Still best-effort (never throws to the caller), but malformed
        // messages and reassembly timeouts are no longer dropped silently.
        console.error("failed to process sync message", e)
        return
      }
    }
  },
  // destroy: nothing to clean up here
  () => {},
  // request timeout in seconds
  1,
)
// In-progress partitioned messages for this instance; passed to the shared
// `unpartitionMessage` helper, which reads and mutates it.
private messagesBuffer = new Map<string, MessageBuffer>()
/**
 * Decodes one raw wire message by delegating to the module-level
 * `unpartitionMessage` helper with this instance's buffer.
 *
 * @param message Raw wire message string.
 * @returns Whatever the helper resolves with, typed as `DecodedMessage`.
 */
private async unpartitionMessage(message: string): Promise<DecodedMessage> {
return unpartitionMessage(message, this.messagesBuffer)
}
import { MergeableStore } from "tinybase"
import {
createCustomSynchronizer,
Message,
Receive,
Send,
} from "tinybase/synchronizers"
import {
type createWsSynchronizer as createWsSynchronizerDecl,
WebSocketTypes,
WsSynchronizer,
} from "tinybase/synchronizers/synchronizer-ws-client"
import { createMessages, MessageBuffer, unpartitionMessage } from "./helpers"
// WebSocket event names used when registering listeners in createWsSynchronizer.
const MESSAGE = "message"
const OPEN = "open"
const ERROR = "error"
// Encoding argument passed to `data.toString()` for incoming message data.
const UTF8 = "utf8"
// Identifier type used for the `toClientId` and `requestId` parameters of `send`.
type Id = string
// Nullable form of `Id`.
type IdOrNull = Id | null
/**
 * Creates a TinyBase synchronizer over a WebSocket that splits outgoing
 * messages into chunks with `createMessages` and reassembles incoming ones
 * with `unpartitionMessage`.
 *
 * @param store Store to synchronize.
 * @param webSocket WebSocket used as the transport.
 * @param requestTimeoutSeconds Passed through to `createCustomSynchronizer`.
 * @param onSend Passed through to `createCustomSynchronizer`.
 * @param onReceive Passed through to `createCustomSynchronizer`.
 * @param onIgnoredError Called with connection errors and with errors raised
 *   while decoding or reassembling an incoming message.
 * @returns A promise that resolves with the synchronizer once the socket is
 *   open, or after the first `open`/`error` event if it was not open yet.
 */
export const createWsSynchronizer = (async <
  WebSocketType extends WebSocketTypes,
>(
  store: MergeableStore,
  webSocket: WebSocketType,
  requestTimeoutSeconds: number = 1,
  onSend?: Send,
  onReceive?: Receive,
  onIgnoredError?: (error: any) => void,
) => {
  // Reassembly state for partitioned messages received on this socket.
  const messagesBuffer = new Map<string, MessageBuffer>()

  // Registers a listener and returns a function that removes it again.
  const addEventListener = (
    event: keyof WebSocketEventMap,
    handler: (...args: any[]) => void,
  ) => {
    webSocket.addEventListener(event, handler)
    return () => webSocket.removeEventListener(event, handler)
  }

  const registerReceive = (receive: Receive) =>
    addEventListener(MESSAGE, async ({ data }) => {
      let decoded: Awaited<ReturnType<typeof unpartitionMessage>>
      try {
        decoded = await unpartitionMessage(data.toString(UTF8), messagesBuffer)
      } catch (error) {
        // Malformed envelopes and reassembly timeouts reject; report them
        // instead of leaving an unhandled rejection in the event listener.
        onIgnoredError?.(error)
        return
      }
      const [
        sourceActorId,
        requestId,
        [chunkPosition, totalChunks],
        messageType,
        payload,
      ] = decoded
      // we want to trigger receive only once when we have all chunks
      if (chunkPosition !== totalChunks) return
      return receive(sourceActorId, requestId, messageType, payload)
    })

  const send = (
    toClientId: IdOrNull,
    requestId: IdOrNull,
    messageType: Message,
    body: any,
  ): void => {
    const messages = createMessages(toClientId, requestId, messageType, body)
    for (const message of messages) webSocket.send(message)
  }

  const destroy = (): void => {
    webSocket.close()
  }

  const synchronizer = createCustomSynchronizer(
    store,
    send,
    registerReceive,
    destroy,
    requestTimeoutSeconds,
    onSend,
    onReceive,
    onIgnoredError,
    // @ts-ignore
    { getWebSocket: () => webSocket },
  ) as WsSynchronizer<any>

  return new Promise((resolve) => {
    if (webSocket.readyState !== webSocket.OPEN) {
      // Resolve after the first open or error event, whichever comes first;
      // an error is reported but does not reject the returned promise.
      const onAttempt = (error?: any) => {
        if (error) {
          onIgnoredError?.(error)
        }
        removeOpenListener()
        removeErrorListener()
        resolve(synchronizer)
      }
      const removeOpenListener = addEventListener(OPEN, () => onAttempt())
      const removeErrorListener = addEventListener(ERROR, onAttempt)
    } else {
      resolve(synchronizer)
    }
  })
}) as typeof createWsSynchronizerDecl
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment