Skip to content

Instantly share code, notes, and snippets.

@alicilin
Created August 17, 2025 18:41
Show Gist options
  • Select an option

  • Save alicilin/0d51c46620bb8206e2f2b1de95b66266 to your computer and use it in GitHub Desktop.

Select an option

Save alicilin/0d51c46620bb8206e2f2b1de95b66266 to your computer and use it in GitHub Desktop.
Elasticsearch Connection Class for use in Bun.js (uses Bun fetch API)
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
import type { ConnectionRequestResponseAsStream } from '@elastic/transport/connection/BaseConnection'
import type { ConnectionRequestOptionsAsStream } from '@elastic/transport/connection/BaseConnection'
import type { ConnectionRequestResponse } from '@elastic/transport/connection/BaseConnection'
import type { ConnectionRequestOptions } from '@elastic/transport/connection/BaseConnection'
import type { ConnectionRequestParams } from '@elastic/transport/connection/BaseConnection'
import type { ConnectionOptions } from '@elastic/transport/connection/BaseConnection'
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
import { isBinary } from '@elastic/transport/connection/BaseConnection'
import { RequestAbortedError } from '@elastic/transport/errors'
import { ConnectionError } from '@elastic/transport/errors'
import { TimeoutError } from '@elastic/transport/errors'
import { BaseConnection } from '@elastic/transport'
import { merge, isNil } from 'lodash'
import type { BodyInit } from 'bun'
import { constants } from 'buffer'
import { Buffer } from 'buffer'
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
 * `RequestInit` extended with the extra `decompress` field that this module
 * passes to `fetch`.
 */
interface BunRequestInit extends RequestInit {
// NOTE(review): presumably Bun's switch for automatic response decompression — confirm against the Bun fetch docs.
decompress: boolean
}
//------------------------------------------------------------------------------------------------------------------------------------------------------------------------
//------------------------------------------------------------------------------------------------------------------------------------------------------------------------
// Default upper bound for text responses, taken from `buffer.constants.MAX_STRING_LENGTH`.
const MAX_STRING_LENGTH = constants.MAX_STRING_LENGTH
// Default upper bound for binary/compressed responses, taken from `buffer.constants.MAX_LENGTH`.
const MAX_BUFFER_LENGTH = constants.MAX_LENGTH
// Matches any character outside U+0021..U+00FF; request paths containing one are rejected.
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
// Matches a leading "?" so it can be stripped from a query string before the URL is assembled.
const REPRGX = /^\?/ig
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
 * Elasticsearch transport connection for the Bun runtime, implemented on top
 * of the global `fetch` API.
 *
 * Pass the class as the `Connection` option of the Elasticsearch client.
 */
export class BunConnection extends BaseConnection {
  constructor (opts: ConnectionOptions) {
    super(opts)
  }

  /**
   * Sends one HTTP request to the node this connection points at.
   *
   * With `options.asStream === true` the response body is returned as the raw
   * stream. Otherwise the body is read to the end and returned as a `Buffer`
   * (compressed or binary content) or as a UTF-8 string, enforcing
   * `options.maxCompressedResponseSize` / `options.maxResponseSize`.
   *
   * @param params - Method, path, query string, headers and body of the request.
   * @param options - Per-request options (`timeout`, `signal`, `asStream`, size limits).
   * @returns Status code, response headers and body.
   * @throws TypeError when `params.path` contains unescaped characters.
   * @throws RequestAbortedError when the caller's signal aborts the request or a size limit is exceeded.
   * @throws TimeoutError when the request does not complete within the timeout.
   * @throws ConnectionError for any other failure while sending the request or reading the body.
   */
  override async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
  override async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
  override async request (params: ConnectionRequestParams, options: any): Promise<any> {
    // Validate first so that no URL, signal or timer is created for a request that is rejected anyway.
    if (INVALID_PATH_REGEX.test(params.path)) {
      throw new TypeError(`ERR_UNESCAPED_CHARACTERS: ${params.path}`)
    }

    const maxCompressedResponseSize: number = options.maxCompressedResponseSize ?? MAX_BUFFER_LENGTH
    const maxResponseSize: number = options.maxResponseSize ?? MAX_STRING_LENGTH

    // Keep a path prefix configured on the node URL (e.g. http://host/es/): an absolute
    // request path resolved directly against the base URL would discard it.
    const prefix = this.url.pathname.replace(/\/+$/, '')
    const path = params.path.startsWith('/') ? params.path : `/${params.path}`
    const query = (params.querystring || '').replace(REPRGX, '')
    const url = new URL(`${prefix}${path}${query === '' ? '' : `?${query}`}`, this.url.href)

    // The timeout always applies; a caller-supplied signal is combined with it instead of replacing it.
    const timeoutSignal = AbortSignal.timeout(options.timeout || this.timeout || 1000000)
    const callerSignal: AbortSignal | undefined = options.signal ?? undefined
    const signal = callerSignal === undefined
      ? timeoutSignal
      : BunConnection.linkSignals(callerSignal, timeoutSignal)

    const requestInit: BunRequestInit = {
      signal,
      headers: merge({}, this.headers, params.headers) as Record<string, string>,
      method: params.method,
      body: params.body as BodyInit,
      keepalive: true,
      decompress: false
    }

    let response: Response
    try {
      response = await fetch(url.href, requestInit)
    } catch (err: unknown) {
      throw BunConnection.toTransportError(err, timeoutSignal, callerSignal)
    }

    if (options.asStream === true) {
      return {
        statusCode: response.status,
        headers: response.headers.toJSON(),
        body: response.body
      }
    }

    const contentEncoding = (response.headers.get('content-encoding') ?? '').toLowerCase()
    const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate')
    const bodyIsBinary = isBinary(response.headers.get('content-type') ?? '')
    const body = response.body

    // Fail fast when the server already announces a body larger than the configured limit.
    const declaredLength = response.headers.get('content-length')
    if (declaredLength != null) {
      const contentLength = Number(declaredLength)
      if (isCompressed && contentLength > maxCompressedResponseSize) {
        // Best-effort cancel: a failure here must not mask the size-limit error.
        await body?.cancel().catch(() => {})
        throw new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed buffer (${maxCompressedResponseSize})`)
      }
      if (!isCompressed && contentLength > maxResponseSize) {
        await body?.cancel().catch(() => {})
        throw new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed string (${maxResponseSize})`)
      }
    }

    // NOTE(review): the request options object is emitted as the event payload — confirm
    // that listeners of 'deserialization' expect that shape.
    this.diagnostic.emit('deserialization', null, options)

    try {
      const statusCode = response.status
      const headers = response.headers.toJSON()

      if (isCompressed || bodyIsBinary) {
        const chunks: Buffer[] = []
        let receivedBytes = 0
        // A null body (e.g. HEAD or 204 responses) is treated as an empty payload.
        if (body !== null) {
          await BunConnection.drain(body, maxCompressedResponseSize, 'buffer', chunk => {
            chunks.push(Buffer.from(chunk))
            receivedBytes += chunk.byteLength
            return receivedBytes
          })
        }
        return { statusCode, headers, body: Buffer.concat(chunks) }
      }

      // Streaming decode keeps multi-byte characters that straddle two chunks intact.
      // `ignoreBOM: true` leaves a leading BOM in place, as `Buffer#toString('utf-8')` does.
      const decoder = new TextDecoder('utf-8', { ignoreBOM: true })
      let text = ''
      if (body !== null) {
        await BunConnection.drain(body, maxResponseSize, 'string', chunk => {
          text += decoder.decode(chunk, { stream: true })
          return text.length
        })
        // Flush whatever the decoder is still holding back.
        text += decoder.decode()
      }
      return { statusCode, headers, body: text }
    } catch (err: unknown) {
      throw BunConnection.toTransportError(err, timeoutSignal, callerSignal)
    }
  }

  /**
   * Nothing to release: no connection state is held by this class.
   */
  override async close (): Promise<void> {
  }

  /**
   * Returns a signal that aborts as soon as any of `sources` aborts, carrying
   * over that source's abort reason.
   */
  private static linkSignals (...sources: AbortSignal[]): AbortSignal {
    const controller = new AbortController()
    for (const source of sources) {
      if (source.aborted) {
        controller.abort(source.reason)
        break
      }
      source.addEventListener('abort', () => controller.abort(source.reason), { once: true })
    }
    return controller.signal
  }

  /**
   * Reads `body` to the end, handing each chunk to `onChunk`, which returns the
   * running payload size. Once that size exceeds `limit` the stream is
   * cancelled through its reader and a `RequestAbortedError` is thrown.
   */
  private static async drain (
    body: ReadableStream<Uint8Array>,
    limit: number,
    unit: 'buffer' | 'string',
    onChunk: (chunk: Uint8Array) => number
  ): Promise<void> {
    // Cancel via the reader: `ReadableStream#cancel()` rejects while the stream is locked.
    const reader = body.getReader()
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      const size = onChunk(value)
      if (size > limit) {
        await reader.cancel().catch(() => {})
        throw new RequestAbortedError(`The content length (${size}) is bigger than the maximum allowed ${unit} (${limit})`)
      }
    }
  }

  /**
   * Maps a failure raised while sending the request or reading the response
   * onto the transport's error classes.
   */
  private static toTransportError (err: unknown, timeoutSignal: AbortSignal, callerSignal?: AbortSignal): Error {
    if (err instanceof RequestAbortedError) return err
    if (callerSignal?.aborted === true) return new RequestAbortedError('Request aborted')
    if (timeoutSignal.aborted) return new TimeoutError('Request timed out')
    return new ConnectionError(err instanceof Error ? err.message : String(err))
  }
}
@alicilin
Copy link
Author

Example:

```ts
import { BunConnection } from '@app/libs/es/BunConnection';
import { Client } from '@elastic/elasticsearch';
import { env } from 'bun';

export const es = new Client({
node: env.ELASTIC_HOST,
Connection: BunConnection
})
```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment