Created
December 5, 2025 08:26
-
-
Save nobuhikosekiya/ccb958e06200a5c07910a4af153c58b3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Elastic Cloud Serverlessにログを送信するスクリプト | |
| """ | |
| import os | |
| import json | |
| import requests | |
| from datetime import datetime | |
| import glob | |
| from pathlib import Path | |
| import time | |
| class ElasticLogSender: | |
| def __init__(self, elasticsearch_url, api_key, data_stream_name="logs-app-default", use_data_stream=True): | |
| """ | |
| Elastic Cloud Serverlessクライアントを初期化 | |
| Args: | |
| elasticsearch_url (str): Elastic Cloud ServerlessのエンドポイントURL | |
| api_key (str): API認証キー | |
| data_stream_name (str): Data Stream名(logs-{dataset}-{namespace}形式) | |
| use_data_stream (bool): Data Streamを使用するかどうか | |
| """ | |
| self.elasticsearch_url = elasticsearch_url.rstrip('/') | |
| self.api_key = api_key | |
| self.data_stream_name = data_stream_name | |
| self.use_data_stream = use_data_stream | |
| self.headers = { | |
| 'Authorization': f'ApiKey {api_key}', | |
| 'Content-Type': 'application/json' | |
| } | |
| def send_log_entry(self, message, timestamp=None, additional_fields=None): | |
| """ | |
| 単一のログエントリをElasticsearchに送信 | |
| Args: | |
| message (str): ログメッセージ | |
| timestamp (str, optional): タイムスタンプ。指定されない場合は現在時刻 | |
| additional_fields (dict, optional): 追加フィールド | |
| Returns: | |
| bool: 送信が成功した場合True | |
| """ | |
| if timestamp is None: | |
| timestamp = datetime.utcnow().isoformat() + "Z" | |
| log_entry = { | |
| "@timestamp": timestamp, | |
| "message": message.strip() | |
| } | |
| # 追加フィールドがある場合は追加 | |
| if additional_fields: | |
| log_entry.update(additional_fields) | |
| # Data Streamの場合は適切なフィールドを追加 | |
| if self.use_data_stream: | |
| # Data Stream必須フィールドを追加 | |
| log_entry.update({ | |
| "data_stream": { | |
| "type": "logs", | |
| "dataset": self.data_stream_name.split('-')[1] if '-' in self.data_stream_name else "app", | |
| "namespace": self.data_stream_name.split('-')[2] if self.data_stream_name.count('-') >= 2 else "default" | |
| } | |
| }) | |
| url = f"{self.elasticsearch_url}/{self.data_stream_name}/_doc" | |
| else: | |
| url = f"{self.elasticsearch_url}/{self.data_stream_name}/_doc" | |
| try: | |
| response = requests.post(url, headers=self.headers, json=log_entry) | |
| response.raise_for_status() | |
| print(f"✓ ログエントリを送信しました: {message[:50]}...") | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| print(f"✗ ログ送信エラー: {e}") | |
| print(f" レスポンス: {response.text if 'response' in locals() else 'N/A'}") | |
| return False | |
| def send_bulk_logs(self, log_entries): | |
| """ | |
| 複数のログエントリをバルクAPIで一括送信 | |
| Args: | |
| log_entries (list): ログエントリのリスト | |
| Returns: | |
| bool: 送信が成功した場合True | |
| """ | |
| if not log_entries: | |
| return True | |
| bulk_data = [] | |
| for entry in log_entries: | |
| # Data Streamの場合は適切なフィールドを追加 | |
| if self.use_data_stream and "data_stream" not in entry: | |
| entry["data_stream"] = { | |
| "type": "logs", | |
| "dataset": self.data_stream_name.split('-')[1] if '-' in self.data_stream_name else "app", | |
| "namespace": self.data_stream_name.split('-')[2] if self.data_stream_name.count('-') >= 2 else "default" | |
| } | |
| # バルクAPIのインデックス指定 | |
| if self.use_data_stream: | |
| bulk_data.append(json.dumps({"create": {"_index": self.data_stream_name}})) | |
| else: | |
| bulk_data.append(json.dumps({"index": {"_index": self.data_stream_name}})) | |
| bulk_data.append(json.dumps(entry)) | |
| bulk_body = "\n".join(bulk_data) + "\n" | |
| url = f"{self.elasticsearch_url}/_bulk" | |
| headers = self.headers.copy() | |
| headers['Content-Type'] = 'application/x-ndjson' | |
| try: | |
| response = requests.post(url, headers=headers, data=bulk_body) | |
| response.raise_for_status() | |
| result = response.json() | |
| if result.get('errors', False): | |
| print(f"✗ バルク送信でエラーが発生: {result}") | |
| return False | |
| print(f"✓ {len(log_entries)}件のログエントリを一括送信しました") | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| print(f"✗ バルク送信エラー: {e}") | |
| print(f" レスポンス: {response.text if 'response' in locals() else 'N/A'}") | |
| return False | |
| def process_log_file(self, file_path): | |
| """ | |
| ログファイルを読み込んでエントリのリストを作成 | |
| Args: | |
| file_path (str): ログファイルのパス | |
| Returns: | |
| list: ログエントリのリスト | |
| """ | |
| log_entries = [] | |
| current_timestamp = datetime.utcnow().isoformat() + "Z" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| for line_num, line in enumerate(f, 1): | |
| line = line.strip() | |
| if line: # 空行をスキップ | |
| log_entry = { | |
| "@timestamp": current_timestamp, | |
| "message": line, | |
| "source_file": str(file_path), | |
| "line_number": line_num | |
| } | |
| log_entries.append(log_entry) | |
| print(f"✓ {file_path}から{len(log_entries)}行読み込みました") | |
| return log_entries | |
| except Exception as e: | |
| print(f"✗ ファイル読み込みエラー ({file_path}): {e}") | |
| return [] | |
| def process_log_folder(self, folder_path, pattern="*"): | |
| """ | |
| フォルダ内のログファイルを処理してElasticsearchに送信 | |
| Args: | |
| folder_path (str): ログファイルが格納されているフォルダのパス | |
| pattern (str): ファイル名のパターン(デフォルトは全ファイル) | |
| """ | |
| folder = Path(folder_path) | |
| if not folder.exists(): | |
| print(f"✗ フォルダが見つかりません: {folder_path}") | |
| return | |
| if not folder.is_dir(): | |
| print(f"✗ 指定されたパスはフォルダではありません: {folder_path}") | |
| return | |
| # フォルダ内のファイルを検索 | |
| file_pattern = folder / pattern | |
| log_files = glob.glob(str(file_pattern)) | |
| if not log_files: | |
| print(f"✗ パターン '{pattern}' に一致するファイルが見つかりません: {folder_path}") | |
| return | |
| print(f"📁 {len(log_files)}個のファイルを処理します...") | |
| total_entries = 0 | |
| successful_files = 0 | |
| for log_file in log_files: | |
| print(f"\n📄 処理中: {log_file}") | |
| # ログファイルを読み込み | |
| log_entries = self.process_log_file(log_file) | |
| if log_entries: | |
| # バルクAPIで送信 | |
| if self.send_bulk_logs(log_entries): | |
| total_entries += len(log_entries) | |
| successful_files += 1 | |
| else: | |
| print(f"✗ {log_file}の送信に失敗しました") | |
| # レート制限を避けるため少し待機 | |
| time.sleep(0.1) | |
| print(f"\n📊 処理完了:") | |
| print(f" - 成功したファイル: {successful_files}/{len(log_files)}") | |
| print(f" - 送信したログエントリ総数: {total_entries}") | |
| def main(): | |
| """ | |
| メイン関数 - 設定を読み込んでログ送信を実行 | |
| """ | |
| # 設定(環境変数または直接指定) | |
| ELASTICSEARCH_URL = os.getenv('ELASTICSEARCH_URL', 'https://your-deployment.es.us-central1.gcp.cloud.es.io') | |
| API_KEY = os.getenv('ELASTIC_API_KEY', 'your-api-key-here') | |
| DATA_STREAM_NAME = os.getenv('DATA_STREAM_NAME', 'logs-app-default') | |
| USE_DATA_STREAM = os.getenv('USE_DATA_STREAM', 'true').lower() == 'true' | |
| LOG_FOLDER = os.getenv('LOG_FOLDER', 'log_folder') | |
| FILE_PATTERN = os.getenv('FILE_PATTERN', '*') # *.log, *.txt など | |
| # 必須設定チェック | |
| if not ELASTICSEARCH_URL or ELASTICSEARCH_URL == 'https://your-deployment.es.us-central1.gcp.cloud.es.io': | |
| print("✗ ELASTICSEARCH_URLを設定してください") | |
| print(" 例: export ELASTICSEARCH_URL='https://your-deployment.es.region.provider.cloud.es.io'") | |
| return | |
| if not API_KEY or API_KEY == 'your-api-key-here': | |
| print("✗ ELASTIC_API_KEYを設定してください") | |
| print(" 例: export ELASTIC_API_KEY='your-api-key'") | |
| return | |
| print("🚀 Elastic Cloud Serverlessログ送信開始") | |
| print(f" エンドポイント: {ELASTICSEARCH_URL}") | |
| print(f" Data Stream: {DATA_STREAM_NAME}" if USE_DATA_STREAM else f" インデックス: {DATA_STREAM_NAME}") | |
| print(f" Data Stream使用: {'Yes' if USE_DATA_STREAM else 'No'}") | |
| print(f" ログフォルダ: {LOG_FOLDER}") | |
| print(f" ファイルパターン: {FILE_PATTERN}") | |
| print() | |
| # ログ送信クライアントを初期化 | |
| sender = ElasticLogSender(ELASTICSEARCH_URL, API_KEY, DATA_STREAM_NAME, USE_DATA_STREAM) | |
| # ログフォルダを処理 | |
| sender.process_log_folder(LOG_FOLDER, FILE_PATTERN) | |
| print("\n✅ 処理完了") | |
| if __name__ == "__main__": | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
使い方