Skip to content

Instantly share code, notes, and snippets.

@manzke
Created May 26, 2025 19:55
Show Gist options
  • Select an option

  • Save manzke/37d69b338086fa50249308b61782b300 to your computer and use it in GitHub Desktop.

Select an option

Save manzke/37d69b338086fa50249308b61782b300 to your computer and use it in GitHub Desktop.
AI-based OpenSearch Enricher
const fs = require('fs');
const path = require('path');
const { OpenAI } = require('openai');
const { Anthropic } = require('@anthropic-ai/sdk');
const { GoogleGenAI } = require('@google/genai');
const yargs = require('yargs/yargs');
const { hideBin } = require('yargs/helpers');
const dotenv = require('dotenv');
const { Client } = require('@opensearch-project/opensearch');
// --- Environment setup ---
// Probe a few likely locations for a .env file and load the first one found.
const envCandidates = [
  path.resolve(__dirname, '.env'),     // alongside this script
  path.resolve(__dirname, '../.env'),  // parent directory
  path.resolve(process.cwd(), '.env')  // current working directory
];

const foundEnvPath = envCandidates.find((candidate) => fs.existsSync(candidate));
if (foundEnvPath) {
  console.log(`Loading environment variables from: ${foundEnvPath}`);
  dotenv.config({ path: foundEnvPath });
} else {
  console.warn('Warning: No .env file found. Make sure to provide environment variables.');
}
// --- Shared Constants ---
// Prompt sent verbatim (with the document content appended) to every AI
// backend below (Google, Azure OpenAI, Anthropic). Each backend is instructed
// to reply with a single JSON object; analyzeOpenSearchDocuments() reads the
// 'title', 'description', 'summary', 'entities', and 'document_type' keys
// named in the final sentence.
const COMMON_ANALYSIS_PROMPT = `Analyze this text content comprehensively. Based on its content, provide the following:
1. A descriptive title (max 10 words).
2. A short description (2-3 sentences).
3. A comprehensive summary.
4. A list of the most important entities (maximum 25 entities). Focus on named entities like people, organizations, locations, products, and key concepts. Only include entities that are clearly important to understanding the content.
5. A possible document type (e.g., article, report, paper, news, blog post, documentation, etc.).
Please format your response as a JSON object with the following keys: 'title', 'description', 'summary', 'entities' (as a list of strings, max 25 items), and 'document_type'.`;
// --- Helper Functions ---
/**
 * Parses an AI service's response text as JSON, tolerating common LLM
 * formatting quirks: Markdown code fences and prose surrounding the object.
 *
 * @param {string} responseText - Raw response text to parse as JSON
 * @param {string} serviceName - Name of the service, used in error reporting
 * @returns {object} Parsed JSON object, or a placeholder error object with
 *   the expected shape ({title, description, summary, entities,
 *   document_type}) when the text cannot be parsed
 */
function parseJsonResponse(responseText, serviceName) {
  let cleaned = responseText.trim();
  // Strip Markdown code fences (```json ... ```) that models often emit.
  if (cleaned.startsWith('```')) {
    cleaned = cleaned.replace(/^```[a-zA-Z]*\n?/, '').replace(/```$/, '').trim();
  }
  try {
    return JSON.parse(cleaned);
  } catch (error) {
    // Fallback: the model may have surrounded the JSON with prose
    // ("Here is the result: {...}"). Try the substring between the first
    // '{' and the last '}' before giving up.
    const start = cleaned.indexOf('{');
    const end = cleaned.lastIndexOf('}');
    if (start !== -1 && end > start) {
      try {
        return JSON.parse(cleaned.slice(start, end + 1));
      } catch (innerError) {
        // Fall through to the placeholder error object below.
      }
    }
    console.error(`Error decoding JSON from ${serviceName} response: ${error.message}`);
    console.error(`Raw response text: ${responseText}`);
    return {
      title: `Error: Could not parse JSON response from ${serviceName}`,
      description: responseText,
      summary: '',
      entities: [],
      document_type: 'Error'
    };
  }
}
/**
 * Build an OpenSearch client from OPENSEARCH_* environment variables,
 * defaulting to http://localhost:9200 with no authentication.
 *
 * @returns {Client} OpenSearch client instance
 */
function createOpenSearchClient() {
  const protocol = process.env.OPENSEARCH_PROTOCOL || 'http';
  const node = `${protocol}://${process.env.OPENSEARCH_HOST || 'localhost'}:${process.env.OPENSEARCH_PORT || 9200}`;
  const username = process.env.OPENSEARCH_USERNAME;
  const password = process.env.OPENSEARCH_PASSWORD;
  return new Client({
    node,
    // Basic auth only when both credentials are provided.
    auth: username && password ? { username, password } : null,
    // Certificate verification is disabled for https here;
    // set rejectUnauthorized: true in production with proper certs.
    ssl: protocol === 'https' ? { rejectUnauthorized: false } : null
  });
}
/**
 * Run a search query against OpenSearch.
 *
 * @param {Client} client - OpenSearch client
 * @param {string} index - Index to search
 * @param {object} query - Query DSL object
 * @param {number} [size=100] - Maximum number of hits to return
 * @returns {Promise<Array>} Array of hit objects; empty array on any failure
 */
async function searchOpenSearch(client, index, query, size = 100) {
  try {
    const response = await client.search({
      index,
      body: { query, size }
    });
    const hits = response.body.hits && response.body.hits.hits;
    if (hits) {
      return hits;
    }
    console.error('Unexpected response structure from OpenSearch');
    return [];
  } catch (error) {
    console.error(`Error searching OpenSearch: ${error.message}`);
    return [];
  }
}
/**
 * Apply a partial update to a single OpenSearch document.
 *
 * @param {Client} client - OpenSearch client
 * @param {string} index - Index containing the document
 * @param {string} id - Document ID
 * @param {object} doc - Fields to merge into the existing document
 * @returns {Promise<boolean>} true on success, false if the update failed
 */
async function updateOpenSearchDocument(client, index, id, doc) {
  try {
    await client.update({ index, id, body: { doc } });
    return true;
  } catch (error) {
    console.error(`Error updating document ${id}: ${error.message}`);
    return false;
  }
}
// --- AI Service Interaction Functions ---
/**
 * Analyzes text content using Google Gemini.
 *
 * Sends COMMON_ANALYSIS_PROMPT plus the content to the Gemini API and parses
 * the JSON reply via parseJsonResponse().
 *
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result, or {} on any failure
 */
async function analyzeWithGoogle(textContent) {
  console.log(`Analyzing text content with Google Gemini Pro...`);
  const apiKey = process.env.GOOGLE_API_KEY;
  // Single validation pass: missing, empty, or whitespace-only keys are
  // rejected (the original code checked the key twice in a row).
  if (!apiKey || apiKey.trim() === '') {
    console.error("Error: GOOGLE_API_KEY not found in environment variables.");
    return {};
  }
  try {
    // Initialize the Google GenAI SDK
    const ai = new GoogleGenAI({ apiKey });
    const modelName = 'gemini-2.5-flash-preview-05-20';
    console.log(`Using Google Gemini model: ${modelName}`);
    const result = await ai.models.generateContent({
      model: modelName,
      contents: [
        {
          role: 'user',
          parts: [
            { text: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent }
          ]
        }
      ]
    });
    const responseText = result.text;
    if (!responseText) {
      console.error("Error: No valid text content in response from Google Gemini.");
      return {};
    }
    // parseJsonResponse never throws — it returns a placeholder object on bad
    // JSON — so no extra try/catch is needed around it.
    return parseJsonResponse(responseText, "Google Gemini");
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithGoogle (Gemini): ${error.message}`);
    return {};
  }
}
/**
 * Analyzes text content using an Azure-hosted OpenAI deployment.
 *
 * Requires AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY and
 * AZURE_OPENAI_DEPLOYMENT_NAME to be set in the environment.
 *
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result, or {} on any failure
 */
async function analyzeWithOpenAiAzure(textContent) {
  console.log(`Analyzing text content with OpenAI on Azure...`);
  const azureEndpoint = process.env.AZURE_OPENAI_ENDPOINT;
  const apiKey = process.env.AZURE_OPENAI_KEY;
  const deploymentName = process.env.AZURE_OPENAI_DEPLOYMENT_NAME;
  if (!azureEndpoint || !apiKey || !deploymentName) {
    console.error("Error: Azure OpenAI environment variables (AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_DEPLOYMENT_NAME) not fully set.");
    return {};
  }
  try {
    // NOTE(review): recent versions of the openai SDK expose a dedicated
    // AzureOpenAI client; verify this `azure: {...}` configuration shape is
    // accepted by the SDK version pinned in package.json.
    const client = new OpenAI({
      azure: {
        apiKey,
        endpoint: azureEndpoint,
        deploymentName,
      },
      apiVersion: '2024-02-01',
    });
    const response = await client.chat.completions.create({
      model: deploymentName,
      messages: [
        {
          role: 'user',
          content: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent
        }
      ],
      max_tokens: 2048
    });
    // Guard-clause extraction of the first choice's message text.
    const firstChoice = response.choices && response.choices[0];
    const responseText = firstChoice && firstChoice.message && firstChoice.message.content;
    if (responseText) {
      return parseJsonResponse(responseText, "OpenAI");
    }
    console.error("Error: Unexpected response structure from OpenAI.");
    console.error(`Raw response: ${JSON.stringify(response)}`);
    return {};
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithOpenAiAzure: ${error.message}`);
    if (error.status) {
      console.error(`Status code: ${error.status}`);
    }
    if (error.response) {
      console.error(`Response: ${JSON.stringify(error.response)}`);
    }
    return {};
  }
}
/**
 * Analyzes text content using the Anthropic Claude API.
 *
 * Requires ANTHROPIC_API_KEY to be set in the environment.
 *
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result, or {} on any failure
 */
async function analyzeWithAnthropic(textContent) {
  console.log(`Analyzing text content with Anthropic Claude...`);
  const apiKey = process.env.ANTHROPIC_API_KEY;
  if (!apiKey) {
    console.error("Error: ANTHROPIC_API_KEY not found in environment variables.");
    return {};
  }
  try {
    const client = new Anthropic({ apiKey });
    const message = await client.messages.create({
      model: "claude-3-5-sonnet-20240620",
      max_tokens: 2048,
      messages: [
        {
          role: "user",
          content: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent
        }
      ]
    });
    // The messages API returns an array of content blocks; use the first one.
    const firstBlock = Array.isArray(message.content) ? message.content[0] : null;
    if (firstBlock && firstBlock.text) {
      return parseJsonResponse(firstBlock.text, "Anthropic");
    }
    console.error("Error: Unexpected response structure from Anthropic.");
    console.error(`Raw response: ${JSON.stringify(message)}`);
    return {};
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithAnthropic: ${error.message}`);
    if (error.status) {
      console.error(`Status code: ${error.status}`);
    }
    return {};
  }
}
/**
 * Analyze OpenSearch documents and update them in place with AI-generated
 * insights (summary_texts, entities_labels, and optionally title,
 * description_texts, documentType).
 *
 * Pages through every document matching `query` in `index`, normalizes the
 * `contentField` value to a string, sends it to the chosen AI backend, and
 * writes the result back onto the same document.
 *
 * @param {object} options - Configuration options
 * @param {string} options.index - Index to read from and write back to
 * @param {string|object} [options.query] - Query string (wrapped in a
 *   query_string clause on contentField) or a ready-made query DSL object;
 *   defaults to match_all
 * @param {string} options.contentField - Source field holding the content
 * @param {string} options.service - AI backend: 'google' | 'openai' | 'anthropic'
 * @param {number} [options.batchSize=10] - Documents fetched per search page
 * @param {boolean} [options.dryRun=false] - Analyze but do not write updates
 * @param {number} [options.maxContentLength=15000] - Truncation threshold (chars)
 * @param {number} [options.minContentLength=50] - Skip threshold (chars)
 * @param {boolean} [options.overwrite=false] - Re-process documents that
 *   already carry a non-empty summary_texts field
 */
async function analyzeOpenSearchDocuments(options) {
  const {
    index,
    query,
    contentField,
    service,
    batchSize = 10,
    dryRun = false,
    maxContentLength = 15000, // Max content length parameter
    minContentLength = 50, // Min content length parameter
    overwrite = false // Add overwrite parameter
  } = options;
  console.log(`\n=== Analyzing OpenSearch documents in index '${index}' ===\n`);
  // Create OpenSearch client
  const client = createOpenSearchClient();
  // Test connection up front so bad host/credentials fail fast.
  try {
    const info = await client.info();
    console.log(`Connected to OpenSearch version ${info.body.version.number}`);
  } catch (error) {
    console.error(`Failed to connect to OpenSearch: ${error.message}`);
    return;
  }
  // Build the query
  let searchQuery = { match_all: {} }; // Default query
  if (query) {
    try {
      if (typeof query === 'string') {
        // Assume it's a query string for the content field
        searchQuery = {
          query_string: {
            query,
            default_field: contentField
          }
        };
      } else if (typeof query === 'object') {
        // Assume it's already a properly formatted query
        searchQuery = query;
      }
    } catch (error) {
      console.error(`Error parsing query: ${error.message}`);
      return;
    }
  }
  // Get total count of matching documents
  const countResponse = await client.count({
    index,
    body: {
      query: searchQuery
    }
  });
  const totalDocuments = countResponse.body.count;
  console.log(`Found ${totalDocuments} matching documents`);
  if (totalDocuments === 0) {
    console.log("No documents to process.");
    return;
  }
  // Process documents in batches.
  // NOTE(review): pagination uses from/size, which is capped by the index's
  // max_result_window (default 10000). Also, when not a dry run, the updates
  // made by this loop can change which documents match `searchQuery`,
  // shifting later pages so documents are skipped or revisited — consider
  // the scroll API or search_after for large or self-modifying runs.
  // TODO confirm whether typical queries here match on the updated fields.
  let processedCount = 0;
  let updatedCount = 0;
  let errorCount = 0;
  let skippedCount = 0;
  let tooShortCount = 0;
  for (let from = 0; from < totalDocuments; from += batchSize) {
    const size = Math.min(batchSize, totalDocuments - from);
    console.log(`\nProcessing batch ${Math.floor(from/batchSize) + 1} (documents ${from+1}-${from+size} of ${totalDocuments})...`);
    // Search for a batch of documents
    const searchResponse = await client.search({
      index,
      body: {
        query: searchQuery,
        from,
        size
      }
    });
    const hits = searchResponse.body.hits.hits;
    // Process each document in the batch
    for (const hit of hits) {
      processedCount++;
      const documentId = hit._id;
      const document = hit._source;
      // Check if document has already been processed and should be skipped
      if (!overwrite && document.summary_texts && Array.isArray(document.summary_texts) && document.summary_texts.length > 0) {
        console.log(`Document ${documentId} already has summary_texts field, skipping. Use --overwrite to force processing.`);
        skippedCount++;
        continue;
      }
      // Skip if content field is missing (counted as an error, not a skip)
      if (!document[contentField]) {
        console.log(`Document ${documentId} has no '${contentField}' field, skipping...`);
        errorCount++;
        continue;
      }
      // Handle different content field types (string, array, object)
      let content = document[contentField];
      // If content is an array, join the elements
      if (Array.isArray(content)) {
        console.log(`Content field is an array with ${content.length} elements`);
        content = content.join("\n");
      }
      // If content is an object, stringify it
      else if (typeof content === 'object' && content !== null) {
        console.log(`Content field is an object`);
        content = JSON.stringify(content, null, 2);
      }
      // If it's already a string, use it directly
      else if (typeof content === 'string') {
        console.log(`Content field is a string`);
      }
      // For any other type (number, boolean, ...), convert to string
      else {
        console.log(`Content field is type: ${typeof content}, converting to string`);
        content = String(content);
      }
      console.log(`\nProcessing document ${documentId} (${processedCount}/${totalDocuments})...`);
      console.log(`Content length: ${content.length} characters`);
      if (content.length < 100) {
        console.log(`Content is "${content}"`);
      } else {
        console.log(`Content preview: "${content.substring(0, 100)}..."`);
      }
      // Skip if content is too short to analyze meaningfully
      if (content.length < minContentLength) {
        console.log(`Content length (${content.length}) is below minimum threshold (${minContentLength}), skipping...`);
        tooShortCount++;
        continue;
      }
      // Truncate content if it's too long to avoid token limits
      if (content.length > maxContentLength) {
        console.log(`Content exceeds maximum length (${content.length} > ${maxContentLength}), truncating...`);
        // Keep the first 70% and last 30% of the budget to preserve context
        // from both ends of the document.
        const firstPart = content.substring(0, maxContentLength * 0.7);
        const lastPart = content.substring(content.length - maxContentLength * 0.3);
        content = firstPart + "\n\n[...content truncated for length...]\n\n" + lastPart;
        console.log(`Truncated content length: ${content.length} characters`);
      }
      // Analyze with the selected AI service
      let analysisResult;
      try {
        switch (service) {
          case 'google':
            analysisResult = await analyzeWithGoogle(content);
            break;
          case 'openai':
            analysisResult = await analyzeWithOpenAiAzure(content);
            break;
          case 'anthropic':
            analysisResult = await analyzeWithAnthropic(content);
            break;
          default:
            console.error(`Error: Unknown service '${service}'`);
            errorCount++;
            continue;
        }
      } catch (error) {
        console.error(`Error analyzing document ${documentId}: ${error.message}`);
        errorCount++;
        continue;
      }
      // Skip if analysis failed (the analyze* helpers return {} on error)
      if (!analysisResult || Object.keys(analysisResult).length === 0) {
        console.log(`Analysis failed for document ${documentId}, skipping...`);
        errorCount++;
        continue;
      }
      // Create update document
      const updateDoc = {
        summary_texts: [analysisResult.summary || ""],
        entities_labels: (analysisResult.entities || []).slice(0, 25) // Ensure max 25 entities
      };
      // Log if entities were limited
      if (analysisResult.entities && analysisResult.entities.length > 25) {
        console.log(`Limited entities from ${analysisResult.entities.length} to 25`);
      }
      // Add optional fields when the corresponding include* flags are set
      if (options.includeTitle && analysisResult.title) {
        updateDoc.title = analysisResult.title;
      }
      if (options.includeDescription && analysisResult.description) {
        updateDoc.description_texts = [analysisResult.description];
      }
      if (options.includeDocumentType && analysisResult.document_type) {
        updateDoc.documentType = analysisResult.document_type;
      }
      // Log the first 100 characters of the summary or the full summary if it's shorter
      const summaryPreview = typeof updateDoc.summary_texts[0] === 'string' && updateDoc.summary_texts[0].length > 100
        ? updateDoc.summary_texts[0].substring(0, 100) + '...'
        : updateDoc.summary_texts[0];
      console.log(`Summary: ${summaryPreview}`);
      console.log(`Entities: ${updateDoc.entities_labels.join(', ')}`);
      // Update the document (skipped in dry-run mode, but still counted)
      if (!dryRun) {
        const updateSuccess = await updateOpenSearchDocument(client, index, documentId, updateDoc);
        if (updateSuccess) {
          updatedCount++;
          console.log(`Document ${documentId} updated successfully`);
        } else {
          errorCount++;
          console.log(`Failed to update document ${documentId}`);
        }
      } else {
        console.log(`[DRY RUN] Would update document ${documentId}`);
        updatedCount++;
      }
    }
  }
  // Final run summary
  console.log(`\n=== Analysis Complete ===`);
  console.log(`Total documents found: ${totalDocuments}`);
  console.log(`Processed: ${processedCount}`);
  console.log(`Skipped (already processed): ${skippedCount}`);
  console.log(`Skipped (content too short): ${tooShortCount}`);
  console.log(`Successfully updated: ${updatedCount}`);
  console.log(`Errors: ${errorCount}`);
  if (dryRun) {
    console.log(`Note: This was a dry run. No documents were actually updated.`);
  }
}
/**
 * CLI entry point: parse command-line options with yargs and run the
 * OpenSearch document analyzer. Exits with code 1 on any unhandled error.
 */
async function main() {
  const argv = yargs(hideBin(process.argv))
    .usage('Usage: $0 [options]')
    // All options declared in one map instead of chained .option() calls;
    // yargs exposes kebab-case names as camelCase on argv (e.g. batchSize).
    .options({
      'index': {
        alias: 'i',
        describe: 'OpenSearch index to search',
        type: 'string',
        demandOption: true
      },
      'query': {
        alias: 'q',
        describe: 'Query string to filter documents',
        type: 'string'
      },
      'content-field': {
        alias: 'f',
        describe: 'Field containing the content to analyze',
        type: 'string',
        default: 'content'
      },
      'service': {
        alias: 's',
        describe: 'AI service to use for analysis',
        choices: ['google', 'openai', 'anthropic'],
        demandOption: true
      },
      'batch-size': {
        alias: 'b',
        describe: 'Number of documents to process in each batch',
        type: 'number',
        default: 1
      },
      'max-content-length': {
        alias: 'm',
        describe: 'Maximum content length to send to AI service (characters)',
        type: 'number',
        default: 15000
      },
      'min-content-length': {
        describe: 'Minimum content length required for analysis (characters)',
        type: 'number',
        default: 50
      },
      'include-title': {
        describe: 'Include title field in document update',
        type: 'boolean',
        default: false
      },
      'include-description': {
        describe: 'Include description field in document update',
        type: 'boolean',
        default: false
      },
      'include-document-type': {
        describe: 'Include document_type field in document update',
        type: 'boolean',
        default: false
      },
      'dry-run': {
        describe: 'Run without updating documents (for testing)',
        type: 'boolean',
        default: false
      },
      'overwrite': {
        describe: 'Force processing of documents that already have summary_texts field',
        type: 'boolean',
        default: false
      }
    })
    .help()
    .argv;
  try {
    await analyzeOpenSearchDocuments({
      index: argv.index,
      query: argv.query,
      contentField: argv.contentField,
      service: argv.service,
      batchSize: argv.batchSize,
      maxContentLength: argv.maxContentLength,
      minContentLength: argv.minContentLength,
      includeTitle: argv.includeTitle,
      includeDescription: argv.includeDescription,
      includeDocumentType: argv.includeDocumentType,
      dryRun: argv.dryRun,
      overwrite: argv.overwrite
    });
  } catch (error) {
    console.error(`Unexpected error in main: ${error.message}`);
    process.exit(1);
  }
}
// Kick off the CLI; any rejection escaping main() is fatal.
main().catch(error => {
  console.error('Fatal error:', error);
  process.exit(1);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment