Created
May 26, 2025 19:55
-
-
Save manzke/37d69b338086fa50249308b61782b300 to your computer and use it in GitHub Desktop.
Ai-based OpenSearch Enricher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const fs = require('fs'); | |
| const path = require('path'); | |
| const { OpenAI } = require('openai'); | |
| const { Anthropic } = require('@anthropic-ai/sdk'); | |
| const { GoogleGenAI } = require('@google/genai'); | |
| const yargs = require('yargs/yargs'); | |
| const { hideBin } = require('yargs/helpers'); | |
| const dotenv = require('dotenv'); | |
| const { Client } = require('@opensearch-project/opensearch'); | |
// Load environment variables - try multiple paths
const possibleEnvPaths = [
  path.resolve(__dirname, '.env'),      // alongside the script
  path.resolve(__dirname, '../.env'),   // parent directory
  path.resolve(process.cwd(), '.env')   // current working directory
];
// Use the first candidate that actually exists on disk.
const foundEnvPath = possibleEnvPaths.find((candidate) => fs.existsSync(candidate));
if (foundEnvPath) {
  console.log(`Loading environment variables from: ${foundEnvPath}`);
  dotenv.config({ path: foundEnvPath });
} else {
  console.warn('Warning: No .env file found. Make sure to provide environment variables.');
}
// --- Shared Constants ---
// Prompt sent verbatim (with the document text appended) to every AI backend.
// All three providers are expected to answer with a single JSON object using
// the keys named in the final paragraph; parseJsonResponse relies on that shape.
const COMMON_ANALYSIS_PROMPT = `Analyze this text content comprehensively. Based on its content, provide the following:
1. A descriptive title (max 10 words).
2. A short description (2-3 sentences).
3. A comprehensive summary.
4. A list of the most important entities (maximum 25 entities). Focus on named entities like people, organizations, locations, products, and key concepts. Only include entities that are clearly important to understanding the content.
5. A possible document type (e.g., article, report, paper, news, blog post, documentation, etc.).
Please format your response as a JSON object with the following keys: 'title', 'description', 'summary', 'entities' (as a list of strings, max 25 items), and 'document_type'.`;
| // --- Helper Functions --- | |
/**
 * Parses a JSON payload returned by an AI service, tolerating Markdown
 * code fences (``` or ```json) wrapped around the JSON.
 * @param {string} responseText - Raw response text expected to contain JSON
 * @param {string} serviceName - Service name used in error messages
 * @returns {object} The parsed object, or a placeholder "error" analysis
 *   object (title/description/summary/entities/document_type) when parsing fails
 */
function parseJsonResponse(responseText, serviceName) {
  try {
    let candidate = responseText.trim();
    // Strip a leading fence such as ``` or ```json, and a trailing ```.
    if (candidate.startsWith('```')) {
      candidate = candidate
        .replace(/^```[a-zA-Z]*\n?/, '')
        .replace(/```$/, '')
        .trim();
    }
    return JSON.parse(candidate);
  } catch (error) {
    console.error(`Error decoding JSON from ${serviceName} response: ${error.message}`);
    console.error(`Raw response text: ${responseText}`);
    // Return a well-formed analysis object so callers can keep going.
    return {
      title: `Error: Could not parse JSON response from ${serviceName}`,
      description: responseText,
      summary: '',
      entities: [],
      document_type: 'Error'
    };
  }
}
/**
 * Creates an OpenSearch client from OPENSEARCH_* environment variables.
 * Variables: OPENSEARCH_HOST (default 'localhost'), OPENSEARCH_PORT
 * (default 9200), OPENSEARCH_PROTOCOL (default 'http'), and optional
 * OPENSEARCH_USERNAME / OPENSEARCH_PASSWORD for basic auth.
 * @returns {Client} OpenSearch client instance
 */
function createOpenSearchClient() {
  const host = process.env.OPENSEARCH_HOST || 'localhost';
  const port = process.env.OPENSEARCH_PORT || 9200;
  const protocol = process.env.OPENSEARCH_PROTOCOL || 'http';
  const username = process.env.OPENSEARCH_USERNAME;
  const password = process.env.OPENSEARCH_PASSWORD;

  const clientOptions = {
    node: `${protocol}://${host}:${port}`
  };
  // Only attach auth/ssl when actually configured; passing explicit nulls
  // can clobber the client's own defaults.
  if (username && password) {
    clientOptions.auth = { username, password };
  }
  if (protocol === 'https') {
    clientOptions.ssl = {
      rejectUnauthorized: false // WARNING: disables TLS cert verification; set to true (or supply a CA) in production
    };
  }
  return new Client(clientOptions);
}
/**
 * Runs a search query against an OpenSearch index.
 * @param {Client} client - OpenSearch client
 * @param {string} index - Index to search
 * @param {object} query - OpenSearch query DSL object
 * @param {number} [size=100] - Maximum number of hits to return
 * @returns {Promise<Array>} Raw hit objects, or [] on error or unexpected shape
 */
async function searchOpenSearch(client, index, query, size = 100) {
  try {
    const response = await client.search({
      index,
      body: { query, size }
    });
    const hits = response.body.hits && response.body.hits.hits;
    if (hits) {
      return hits;
    }
    console.error('Unexpected response structure from OpenSearch');
    return [];
  } catch (error) {
    // Errors are logged and swallowed on purpose; callers get an empty list.
    console.error(`Error searching OpenSearch: ${error.message}`);
    return [];
  }
}
/**
 * Applies a partial update to a single OpenSearch document.
 * @param {Client} client - OpenSearch client
 * @param {string} index - Index containing the document
 * @param {string} id - Document ID
 * @param {object} doc - Fields to merge into the document
 * @returns {Promise<boolean>} true on success, false if the update failed
 */
async function updateOpenSearchDocument(client, index, id, doc) {
  try {
    // Partial update: only the fields in `doc` are merged into the document.
    await client.update({ index, id, body: { doc } });
    return true;
  } catch (error) {
    console.error(`Error updating document ${id}: ${error.message}`);
    return false;
  }
}
| // --- AI Service Interaction Functions --- | |
/**
 * Analyzes text content using Google Gemini.
 * Requires GOOGLE_API_KEY in the environment.
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result (title/description/summary/
 *   entities/document_type), or {} on failure
 */
async function analyzeWithGoogle(textContent) {
  console.log(`Analyzing text content with Google Gemini Pro...`);
  const apiKey = process.env.GOOGLE_API_KEY;
  // Single validation pass: missing, empty, or whitespace-only keys are rejected
  // (the original checked the key twice in a row).
  if (!apiKey || apiKey.trim() === '') {
    console.error("Error: GOOGLE_API_KEY not found in environment variables.");
    return {};
  }
  try {
    // Initialize the Google GenAI SDK
    const ai = new GoogleGenAI({ apiKey });
    const modelName = 'gemini-2.5-flash-preview-05-20'; // 'gemini-2.0-flash';
    console.log(`Using Google Gemini model: ${modelName}`);
    // Use the SDK's generateContent method
    const result = await ai.models.generateContent({
      model: modelName,
      contents: [
        {
          role: 'user',
          parts: [
            { text: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent }
          ]
        }
      ]
    });
    const responseText = result.text;
    if (!responseText) {
      console.error("Error: No valid text content in response from Google Gemini.");
      return {};
    }
    // parseJsonResponse never throws — it returns an error-shaped object on bad
    // JSON — so the former inner try/catch around it was dead code.
    return parseJsonResponse(responseText, "Google Gemini");
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithGoogle (Gemini): ${error.message}`);
    return {};
  }
}
/**
 * Analyzes text content using an Azure OpenAI chat deployment.
 * Requires AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY and
 * AZURE_OPENAI_DEPLOYMENT_NAME in the environment.
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result, or {} on failure
 */
async function analyzeWithOpenAiAzure(textContent) {
  console.log(`Analyzing text content with OpenAI on Azure...`);
  const azureEndpoint = process.env.AZURE_OPENAI_ENDPOINT;
  const apiKey = process.env.AZURE_OPENAI_KEY;
  const deploymentName = process.env.AZURE_OPENAI_DEPLOYMENT_NAME;
  if (!azureEndpoint || !apiKey || !deploymentName) {
    console.error("Error: Azure OpenAI environment variables (AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_DEPLOYMENT_NAME) not fully set.");
    return {};
  }
  try {
    // BUG FIX: the openai SDK has no `azure` constructor option (the old code
    // would throw "apiKey missing"). Point the client at the deployment URL and
    // authenticate with Azure's `api-key` header plus the api-version query.
    const apiVersion = '2024-02-01';
    const client = new OpenAI({
      apiKey,
      baseURL: `${azureEndpoint.replace(/\/$/, '')}/openai/deployments/${deploymentName}`,
      defaultQuery: { 'api-version': apiVersion },
      defaultHeaders: { 'api-key': apiKey },
    });
    const response = await client.chat.completions.create({
      model: deploymentName,
      messages: [
        {
          role: 'user',
          content: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent
        }
      ],
      max_tokens: 2048
    });
    // Guard against unexpected/empty completions before parsing.
    const responseText = response.choices?.[0]?.message?.content;
    if (responseText) {
      return parseJsonResponse(responseText, "OpenAI");
    }
    console.error("Error: Unexpected response structure from OpenAI.");
    console.error(`Raw response: ${JSON.stringify(response)}`);
    return {};
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithOpenAiAzure: ${error.message}`);
    if (error.status) {
      console.error(`Status code: ${error.status}`);
    }
    if (error.response) {
      console.error(`Response: ${JSON.stringify(error.response)}`);
    }
    return {};
  }
}
/**
 * Analyzes text content using the Anthropic Claude API.
 * Requires ANTHROPIC_API_KEY in the environment.
 * @param {string} textContent - Text content to analyze
 * @returns {Promise<object>} Parsed analysis result, or {} on failure
 */
async function analyzeWithAnthropic(textContent) {
  console.log(`Analyzing text content with Anthropic Claude...`);
  const apiKey = process.env.ANTHROPIC_API_KEY;
  if (!apiKey) {
    console.error("Error: ANTHROPIC_API_KEY not found in environment variables.");
    return {};
  }
  try {
    const client = new Anthropic({ apiKey });
    // Ask Claude for the structured analysis defined by the shared prompt.
    const message = await client.messages.create({
      model: "claude-3-5-sonnet-20240620",
      max_tokens: 2048,
      messages: [
        {
          role: "user",
          content: COMMON_ANALYSIS_PROMPT + "\n\nCONTENT TO ANALYZE:\n" + textContent
        }
      ]
    });
    // Claude returns content as an array of blocks; the text lives on the first.
    const firstBlock = Array.isArray(message.content) ? message.content[0] : undefined;
    if (firstBlock && firstBlock.text) {
      return parseJsonResponse(firstBlock.text, "Anthropic");
    }
    console.error("Error: Unexpected response structure from Anthropic.");
    console.error(`Raw response: ${JSON.stringify(message)}`);
    return {};
  } catch (error) {
    console.error(`An unexpected error occurred in analyzeWithAnthropic: ${error.message}`);
    if (error.status) {
      console.error(`Status code: ${error.status}`);
    }
    return {};
  }
}
/**
 * Analyze OpenSearch documents and update them with AI-generated insights.
 *
 * Pages through documents matching the query, sends each document's content
 * field to the selected AI service, and writes the returned summary and
 * entities back onto the document as `summary_texts` and `entities_labels`
 * (plus optional title/description/documentType fields).
 *
 * @param {object} options - Configuration options
 * @param {string} options.index - Index to search and update
 * @param {string|object} [options.query] - Query string (applied to the content
 *   field via query_string) or a ready-made query DSL object; defaults to match_all
 * @param {string} options.contentField - Document field holding the text to analyze
 * @param {string} options.service - One of 'google' | 'openai' | 'anthropic'
 * @param {number} [options.batchSize=10] - Page size for search pagination
 * @param {boolean} [options.dryRun=false] - Log intended updates without writing
 * @param {number} [options.maxContentLength=15000] - Truncate longer content before sending
 * @param {number} [options.minContentLength=50] - Skip documents with shorter content
 * @param {boolean} [options.overwrite=false] - Re-process documents that already have summary_texts
 * @param {boolean} [options.includeTitle] - Also write `title` from the analysis
 * @param {boolean} [options.includeDescription] - Also write `description_texts`
 * @param {boolean} [options.includeDocumentType] - Also write `documentType`
 */
async function analyzeOpenSearchDocuments(options) {
  const {
    index,
    query,
    contentField,
    service,
    batchSize = 10, // NOTE: the CLI in main() defaults --batch-size to 1
    dryRun = false,
    maxContentLength = 15000, // Max content length parameter
    minContentLength = 50, // Min content length parameter
    overwrite = false // Add overwrite parameter
  } = options;
  console.log(`\n=== Analyzing OpenSearch documents in index '${index}' ===\n`);
  // Create OpenSearch client
  const client = createOpenSearchClient();
  // Test connection up front so a bad host or bad credentials fails fast.
  try {
    const info = await client.info();
    console.log(`Connected to OpenSearch version ${info.body.version.number}`);
  } catch (error) {
    console.error(`Failed to connect to OpenSearch: ${error.message}`);
    return;
  }
  // Build the query
  let searchQuery = { match_all: {} }; // Default query
  if (query) {
    try {
      if (typeof query === 'string') {
        // Assume it's a query string for the content field
        searchQuery = {
          query_string: {
            query,
            default_field: contentField
          }
        };
      } else if (typeof query === 'object') {
        // Assume it's already a properly formatted query
        searchQuery = query;
      }
    } catch (error) {
      console.error(`Error parsing query: ${error.message}`);
      return;
    }
  }
  // Get total count of matching documents
  const countResponse = await client.count({
    index,
    body: {
      query: searchQuery
    }
  });
  const totalDocuments = countResponse.body.count;
  console.log(`Found ${totalDocuments} matching documents`);
  if (totalDocuments === 0) {
    console.log("No documents to process.");
    return;
  }
  // Process documents in batches
  let processedCount = 0;
  let updatedCount = 0;
  let errorCount = 0;
  let skippedCount = 0;
  let tooShortCount = 0;
  // NOTE(review): from/size pagination re-runs the query per page. If the
  // query were to exclude already-enriched documents, updates made here would
  // shift later pages; also from+size beyond the index's max_result_window
  // (default 10000) fails — consider search_after/scroll for large sets.
  for (let from = 0; from < totalDocuments; from += batchSize) {
    const size = Math.min(batchSize, totalDocuments - from);
    console.log(`\nProcessing batch ${Math.floor(from/batchSize) + 1} (documents ${from+1}-${from+size} of ${totalDocuments})...`);
    // Search for a batch of documents
    const searchResponse = await client.search({
      index,
      body: {
        query: searchQuery,
        from,
        size
      }
    });
    const hits = searchResponse.body.hits.hits;
    // Process each document in the batch
    for (const hit of hits) {
      processedCount++;
      const documentId = hit._id;
      const document = hit._source;
      // Check if document has already been processed and should be skipped
      if (!overwrite && document.summary_texts && Array.isArray(document.summary_texts) && document.summary_texts.length > 0) {
        console.log(`Document ${documentId} already has summary_texts field, skipping. Use --overwrite to force processing.`);
        skippedCount++;
        continue;
      }
      // Skip if content field is missing
      if (!document[contentField]) {
        console.log(`Document ${documentId} has no '${contentField}' field, skipping...`);
        errorCount++;
        continue;
      }
      // Handle different content field types (string, array, object) —
      // normalize everything to a single string before analysis.
      let content = document[contentField];
      // If content is an array, join the elements
      if (Array.isArray(content)) {
        console.log(`Content field is an array with ${content.length} elements`);
        content = content.join("\n");
      }
      // If content is an object, stringify it
      else if (typeof content === 'object' && content !== null) {
        console.log(`Content field is an object`);
        content = JSON.stringify(content, null, 2);
      }
      // If it's already a string, use it directly
      else if (typeof content === 'string') {
        console.log(`Content field is a string`);
      }
      // For any other type, convert to string
      else {
        console.log(`Content field is type: ${typeof content}, converting to string`);
        content = String(content);
      }
      console.log(`\nProcessing document ${documentId} (${processedCount}/${totalDocuments})...`);
      console.log(`Content length: ${content.length} characters`);
      if (content.length < 100) {
        console.log(`Content is "${content}"`);
      } else {
        console.log(`Content preview: "${content.substring(0, 100)}..."`);
      }
      // Skip if content is too short to analyze meaningfully
      if (content.length < minContentLength) {
        console.log(`Content length (${content.length}) is below minimum threshold (${minContentLength}), skipping...`);
        tooShortCount++;
        continue;
      }
      // Truncate content if it's too long to avoid token limits
      if (content.length > maxContentLength) {
        console.log(`Content exceeds maximum length (${content.length} > ${maxContentLength}), truncating...`);
        // Take the first and last portions to preserve context
        // (70% head + 30% tail; substring truncates the fractional indices).
        const firstPart = content.substring(0, maxContentLength * 0.7);
        const lastPart = content.substring(content.length - maxContentLength * 0.3);
        content = firstPart + "\n\n[...content truncated for length...]\n\n" + lastPart;
        console.log(`Truncated content length: ${content.length} characters`);
      }
      // Analyze with the selected AI service
      let analysisResult;
      try {
        switch (service) {
          case 'google':
            analysisResult = await analyzeWithGoogle(content);
            break;
          case 'openai':
            analysisResult = await analyzeWithOpenAiAzure(content);
            break;
          case 'anthropic':
            analysisResult = await analyzeWithAnthropic(content);
            break;
          default:
            console.error(`Error: Unknown service '${service}'`);
            errorCount++;
            continue;
        }
      } catch (error) {
        console.error(`Error analyzing document ${documentId}: ${error.message}`);
        errorCount++;
        continue;
      }
      // Skip if analysis failed (the analyze* helpers return {} on failure)
      if (!analysisResult || Object.keys(analysisResult).length === 0) {
        console.log(`Analysis failed for document ${documentId}, skipping...`);
        errorCount++;
        continue;
      }
      // Create update document
      const updateDoc = {
        summary_texts: [analysisResult.summary || ""],
        entities_labels: (analysisResult.entities || []).slice(0, 25) // Ensure max 25 entities
      };
      // Log if entities were limited
      if (analysisResult.entities && analysisResult.entities.length > 25) {
        console.log(`Limited entities from ${analysisResult.entities.length} to 25`);
      }
      // Add other fields if needed
      if (options.includeTitle && analysisResult.title) {
        updateDoc.title = analysisResult.title;
      }
      if (options.includeDescription && analysisResult.description) {
        updateDoc.description_texts = [analysisResult.description];
      }
      if (options.includeDocumentType && analysisResult.document_type) {
        updateDoc.documentType = analysisResult.document_type;
      }
      // Log the first 100 characters of the summary or the full summary if it's shorter
      const summaryPreview = typeof updateDoc.summary_texts[0] === 'string' && updateDoc.summary_texts[0].length > 100
        ? updateDoc.summary_texts[0].substring(0, 100) + '...'
        : updateDoc.summary_texts[0];
      console.log(`Summary: ${summaryPreview}`);
      console.log(`Entities: ${updateDoc.entities_labels.join(', ')}`);
      // Update the document (skipped entirely in dry-run mode)
      if (!dryRun) {
        const updateSuccess = await updateOpenSearchDocument(client, index, documentId, updateDoc);
        if (updateSuccess) {
          updatedCount++;
          console.log(`Document ${documentId} updated successfully`);
        } else {
          errorCount++;
          console.log(`Failed to update document ${documentId}`);
        }
      } else {
        console.log(`[DRY RUN] Would update document ${documentId}`);
        updatedCount++;
      }
    }
  }
  // Final summary of what happened across all batches.
  console.log(`\n=== Analysis Complete ===`);
  console.log(`Total documents found: ${totalDocuments}`);
  console.log(`Processed: ${processedCount}`);
  console.log(`Skipped (already processed): ${skippedCount}`);
  console.log(`Skipped (content too short): ${tooShortCount}`);
  console.log(`Successfully updated: ${updatedCount}`);
  console.log(`Errors: ${errorCount}`);
  if (dryRun) {
    console.log(`Note: This was a dry run. No documents were actually updated.`);
  }
}
/**
 * Entry point: parses CLI arguments and launches the OpenSearch document
 * analyzer. yargs enforces required flags (--index, --service), validates
 * --service against the supported choices, and provides --help output.
 */
async function main() {
  const args = yargs(hideBin(process.argv))
    .usage('Usage: $0 [options]')
    .option('index', { alias: 'i', describe: 'OpenSearch index to search', type: 'string', demandOption: true })
    .option('query', { alias: 'q', describe: 'Query string to filter documents', type: 'string' })
    .option('content-field', { alias: 'f', describe: 'Field containing the content to analyze', type: 'string', default: 'content' })
    .option('service', { alias: 's', describe: 'AI service to use for analysis', choices: ['google', 'openai', 'anthropic'], demandOption: true })
    .option('batch-size', { alias: 'b', describe: 'Number of documents to process in each batch', type: 'number', default: 1 })
    .option('max-content-length', { alias: 'm', describe: 'Maximum content length to send to AI service (characters)', type: 'number', default: 15000 })
    .option('min-content-length', { describe: 'Minimum content length required for analysis (characters)', type: 'number', default: 50 })
    .option('include-title', { describe: 'Include title field in document update', type: 'boolean', default: false })
    .option('include-description', { describe: 'Include description field in document update', type: 'boolean', default: false })
    .option('include-document-type', { describe: 'Include document_type field in document update', type: 'boolean', default: false })
    .option('dry-run', { describe: 'Run without updating documents (for testing)', type: 'boolean', default: false })
    .option('overwrite', { describe: 'Force processing of documents that already have summary_texts field', type: 'boolean', default: false })
    .help()
    .argv;
  try {
    // yargs exposes kebab-case flags via their camelCase counterparts.
    await analyzeOpenSearchDocuments({
      index: args.index,
      query: args.query,
      contentField: args.contentField,
      service: args.service,
      batchSize: args.batchSize,
      maxContentLength: args.maxContentLength,
      minContentLength: args.minContentLength,
      includeTitle: args.includeTitle,
      includeDescription: args.includeDescription,
      includeDocumentType: args.includeDocumentType,
      dryRun: args.dryRun,
      overwrite: args.overwrite
    });
  } catch (error) {
    console.error(`Unexpected error in main: ${error.message}`);
    process.exit(1);
  }
}
// Run the main function.
// Top-level safety net: any rejection escaping main() is logged and the
// process exits non-zero so shell callers can detect failure.
main().catch(error => {
  console.error('Fatal error:', error);
  process.exit(1);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment