Created
August 17, 2025 18:41
-
-
Save alicilin/0d51c46620bb8206e2f2b1de95b66266 to your computer and use it in GitHub Desktop.
Elasticsearch Connection Class for use in Bun.js (uses Bun fetch API)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| import type { ConnectionRequestResponseAsStream } from '@elastic/transport/connection/BaseConnection' | |
| import type { ConnectionRequestOptionsAsStream } from '@elastic/transport/connection/BaseConnection' | |
| import type { ConnectionRequestResponse } from '@elastic/transport/connection/BaseConnection' | |
| import type { ConnectionRequestOptions } from '@elastic/transport/connection/BaseConnection' | |
| import type { ConnectionRequestParams } from '@elastic/transport/connection/BaseConnection' | |
| import type { ConnectionOptions } from '@elastic/transport/connection/BaseConnection' | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| import { isBinary } from '@elastic/transport/connection/BaseConnection' | |
| import { RequestAbortedError } from '@elastic/transport/errors' | |
| import { ConnectionError } from '@elastic/transport/errors' | |
| import { BaseConnection } from '@elastic/transport' | |
| import { merge, isNil } from 'lodash' | |
| import type { BodyInit } from 'bun' | |
| import { constants } from 'buffer' | |
| import { Buffer } from 'buffer' | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
/**
 * `RequestInit` extended with the extra `decompress` flag that this file passes to `fetch`.
 *
 * NOTE(review): presumably this is Bun's fetch option for automatic response
 * decompression — confirm against the Bun fetch documentation.
 */
interface BunRequestInit extends RequestInit {
    /** Forwarded verbatim to `fetch` as part of the request init object. */
    decompress: boolean
}
| //------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | |
| //------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | |
/** Runtime limit on string length; used as the default cap for text response bodies. */
const MAX_STRING_LENGTH = constants.MAX_STRING_LENGTH
/** Runtime limit on Buffer size; used as the default cap for compressed/binary response bodies. */
const MAX_BUFFER_LENGTH = constants.MAX_LENGTH
/** Matches any character outside U+0021..U+00FF (spaces, control characters, non-Latin-1 text). */
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
/**
 * Matches one leading `?` so it can be stripped from a query string.
 * Deliberately flag-free: the pattern is anchored (so `g` adds nothing but
 * `lastIndex` state) and contains no letters (so `i` is a no-op).
 */
const REPRGX = /^\?/
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| //----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
/**
 * Joins the node URL's own path prefix with the request path and appends the
 * query string, so a node configured as `https://host/prefix/` keeps `/prefix`.
 */
function buildRequestUrl (base: URL, path: string, querystring: string | undefined): URL {
    const basePath = base.pathname
    const baseEndsWithSlash = basePath.endsWith('/')
    const pathStartsWithSlash = path.startsWith('/')
    let fullPath: string
    if (baseEndsWithSlash && pathStartsWithSlash) {
        fullPath = basePath + path.slice(1)
    } else if (baseEndsWithSlash !== pathStartsWithSlash) {
        fullPath = basePath + path
    } else {
        fullPath = `${basePath}/${path}`
    }
    // A caller may hand over the query string with or without its leading `?`.
    const query = (querystring || '').replace(REPRGX, '')
    return new URL(query === '' ? fullPath : `${fullPath}?${query}`, base.href)
}

/**
 * Normalises anything thrown by `fetch` or by reading the response stream.
 * - `RequestAbortedError` is returned untouched.
 * - An `AbortError` becomes `RequestAbortedError`.
 * - An error named `TimeoutError` is returned as-is so its name survives.
 *   NOTE(review): presumably the transport tells timeouts apart by error name — confirm.
 * - Everything else becomes a `ConnectionError` carrying the original message.
 */
function toTransportError (err: unknown): unknown {
    if (err instanceof RequestAbortedError) {
        return err
    }
    const details = (typeof err === 'object' && err !== null ? err : {}) as { name?: unknown, message?: unknown }
    if (details.name === 'RequestAbortedError' || details.name === 'TimeoutError') {
        return err
    }
    if (details.name === 'AbortError') {
        return new RequestAbortedError('Request aborted')
    }
    return new ConnectionError(typeof details.message === 'string' ? details.message : String(err))
}

/**
 * Drains a response stream into a Buffer (binary) or a string (text) while enforcing `limit`.
 * Binary size is counted in bytes, text size in decoded UTF-16 code units.
 */
async function readResponseBody (body: ReadableStream<Uint8Array> | null, asBinary: boolean, limit: number): Promise<Buffer | string> {
    // Some responses carry no body stream at all; treat that as an empty payload.
    if (body === null) {
        return asBinary ? Buffer.alloc(0) : ''
    }
    const reader = body.getReader()
    // `ignoreBOM: true` keeps a leading BOM in the output, matching `Buffer.toString('utf-8')`.
    const decoder = new TextDecoder('utf-8', { ignoreBOM: true })
    const binaryParts: Uint8Array[] = []
    const textParts: string[] = []
    let currentLength = 0
    try {
        while (true) {
            const { done, value } = await reader.read()
            if (done) {
                break
            }
            if (asBinary) {
                currentLength += value.byteLength
                binaryParts.push(value)
            } else {
                // Streaming decode: a multi-byte character split across two chunks is
                // carried over instead of being counted as replacement characters.
                const piece = decoder.decode(value, { stream: true })
                currentLength += piece.length
                textParts.push(piece)
            }
            if (currentLength > limit) {
                throw new RequestAbortedError(`The content length (${currentLength}) is bigger than the maximum allowed ${asBinary ? 'buffer' : 'string'} (${limit})`)
            }
        }
    } catch (err: unknown) {
        // The stream is locked by `reader`, so it must be cancelled through the reader;
        // a failing cancel must not mask the error that brought us here.
        await reader.cancel().catch(() => {})
        throw err
    }
    if (asBinary) {
        return Buffer.concat(binaryParts)
    }
    // The final argument-less decode flushes any bytes still buffered by the decoder.
    return textParts.join('') + decoder.decode()
}

/**
 * A connection to an Elasticsearch node that sends requests with the global `fetch`.
 */
export class BunConnection extends BaseConnection {
    constructor (opts: ConnectionOptions) {
        super(opts)
    }

    /**
     * Performs one HTTP request against this connection's node.
     *
     * @param params - Method, path, query string, headers and body of the request.
     * @param options - Per-request options read here: `signal`, `timeout`,
     *   `asStream`, `maxResponseSize` and `maxCompressedResponseSize`.
     *   NOTE(review): when `options.signal` is given, no timeout signal is created,
     *   so `timeout` is not enforced for that request.
     * @returns Status code, headers and body. The body is the raw stream when
     *   `options.asStream === true`, a Buffer for compressed or binary content,
     *   and a UTF-8 decoded string otherwise.
     * @throws TypeError when `params.path` contains characters outside U+0021..U+00FF.
     * @throws RequestAbortedError when the request is aborted or a size limit is exceeded.
     * @throws ConnectionError for any other failure while fetching or reading the body.
     */
    override async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
    override async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
    override async request (params: ConnectionRequestParams, options: any): Promise<any> {
        // Validate first, before any request state is built.
        if (INVALID_PATH_REGEX.test(params.path)) {
            throw new TypeError(`ERR_UNESCAPED_CHARACTERS: ${params.path}`)
        }

        const maxCompressedResponseSize: number = options.maxCompressedResponseSize ?? MAX_BUFFER_LENGTH
        const maxResponseSize: number = options.maxResponseSize ?? MAX_STRING_LENGTH
        const requestInit: BunRequestInit = {
            // `||` on purpose: a timeout of 0 falls through to the next candidate.
            signal: options.signal ?? AbortSignal.timeout(options.timeout || this.timeout || 1000000),
            headers: merge({}, this.headers, params.headers) as Record<string, string>,
            method: params.method,
            body: params.body as BodyInit,
            keepalive: true,
            // Compressed payloads are returned below as raw Buffers rather than decoded here.
            decompress: false
        }

        let response: Response
        try {
            response = await fetch(buildRequestUrl(this.url, params.path, params.querystring).href, requestInit)
        } catch (err: unknown) {
            throw toTransportError(err)
        }

        if (options.asStream === true) {
            return {
                statusCode: response.status,
                headers: response.headers.toJSON(),
                body: response.body
            }
        }

        const contentEncoding = (response.headers.get('content-encoding') ?? '').toLowerCase()
        const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate')
        const bodyIsBinary = isBinary(response.headers.get('content-type') ?? '')

        // Fast rejection based on the advertised size, before reading any of the body.
        const contentLengthHeader = response.headers.get('content-length')
        if (isNil(contentLengthHeader) === false) {
            const contentLength = Number(contentLengthHeader)
            const advertisedLimit = isCompressed ? maxCompressedResponseSize : maxResponseSize
            if (contentLength > advertisedLimit) {
                // The body may be absent, and a failing cancel must not hide the limit error.
                await response.body?.cancel().catch(() => {})
                throw new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed ${isCompressed ? 'buffer' : 'string'} (${advertisedLimit})`)
            }
        }

        this.diagnostic.emit('deserialization', null, options)

        const asBinary = isCompressed || bodyIsBinary
        try {
            const body = await readResponseBody(
                response.body as ReadableStream<Uint8Array> | null,
                asBinary,
                asBinary ? maxCompressedResponseSize : maxResponseSize
            )
            return {
                statusCode: response.status,
                headers: response.headers.toJSON(),
                body
            }
        } catch (err: unknown) {
            throw toTransportError(err)
        }
    }

    /** Nothing to release: this class holds no sockets or pools of its own. */
    override async close (): Promise<void> {
    }
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example:
`import { BunConnection } from '@app/libs/es/BunConnection';
import { Client } from '@elastic/elasticsearch';
import { env } from 'bun';
export const es = new Client({
node: env.ELASTIC_HOST,
Connection: BunConnection
})`