Skip to content

Instantly share code, notes, and snippets.

@nobuhikosekiya
Created December 5, 2025 08:26
Show Gist options
  • Select an option

  • Save nobuhikosekiya/ccb958e06200a5c07910a4af153c58b3 to your computer and use it in GitHub Desktop.

Select an option

Save nobuhikosekiya/ccb958e06200a5c07910a4af153c58b3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Elastic Cloud Serverlessにログを送信するスクリプト
"""
import os
import json
import requests
from datetime import datetime
import glob
from pathlib import Path
import time
class ElasticLogSender:
def __init__(self, elasticsearch_url, api_key, data_stream_name="logs-app-default", use_data_stream=True):
"""
Elastic Cloud Serverlessクライアントを初期化
Args:
elasticsearch_url (str): Elastic Cloud ServerlessのエンドポイントURL
api_key (str): API認証キー
data_stream_name (str): Data Stream名(logs-{dataset}-{namespace}形式)
use_data_stream (bool): Data Streamを使用するかどうか
"""
self.elasticsearch_url = elasticsearch_url.rstrip('/')
self.api_key = api_key
self.data_stream_name = data_stream_name
self.use_data_stream = use_data_stream
self.headers = {
'Authorization': f'ApiKey {api_key}',
'Content-Type': 'application/json'
}
def send_log_entry(self, message, timestamp=None, additional_fields=None):
"""
単一のログエントリをElasticsearchに送信
Args:
message (str): ログメッセージ
timestamp (str, optional): タイムスタンプ。指定されない場合は現在時刻
additional_fields (dict, optional): 追加フィールド
Returns:
bool: 送信が成功した場合True
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
log_entry = {
"@timestamp": timestamp,
"message": message.strip()
}
# 追加フィールドがある場合は追加
if additional_fields:
log_entry.update(additional_fields)
# Data Streamの場合は適切なフィールドを追加
if self.use_data_stream:
# Data Stream必須フィールドを追加
log_entry.update({
"data_stream": {
"type": "logs",
"dataset": self.data_stream_name.split('-')[1] if '-' in self.data_stream_name else "app",
"namespace": self.data_stream_name.split('-')[2] if self.data_stream_name.count('-') >= 2 else "default"
}
})
url = f"{self.elasticsearch_url}/{self.data_stream_name}/_doc"
else:
url = f"{self.elasticsearch_url}/{self.data_stream_name}/_doc"
try:
response = requests.post(url, headers=self.headers, json=log_entry)
response.raise_for_status()
print(f"✓ ログエントリを送信しました: {message[:50]}...")
return True
except requests.exceptions.RequestException as e:
print(f"✗ ログ送信エラー: {e}")
print(f" レスポンス: {response.text if 'response' in locals() else 'N/A'}")
return False
def send_bulk_logs(self, log_entries):
"""
複数のログエントリをバルクAPIで一括送信
Args:
log_entries (list): ログエントリのリスト
Returns:
bool: 送信が成功した場合True
"""
if not log_entries:
return True
bulk_data = []
for entry in log_entries:
# Data Streamの場合は適切なフィールドを追加
if self.use_data_stream and "data_stream" not in entry:
entry["data_stream"] = {
"type": "logs",
"dataset": self.data_stream_name.split('-')[1] if '-' in self.data_stream_name else "app",
"namespace": self.data_stream_name.split('-')[2] if self.data_stream_name.count('-') >= 2 else "default"
}
# バルクAPIのインデックス指定
if self.use_data_stream:
bulk_data.append(json.dumps({"create": {"_index": self.data_stream_name}}))
else:
bulk_data.append(json.dumps({"index": {"_index": self.data_stream_name}}))
bulk_data.append(json.dumps(entry))
bulk_body = "\n".join(bulk_data) + "\n"
url = f"{self.elasticsearch_url}/_bulk"
headers = self.headers.copy()
headers['Content-Type'] = 'application/x-ndjson'
try:
response = requests.post(url, headers=headers, data=bulk_body)
response.raise_for_status()
result = response.json()
if result.get('errors', False):
print(f"✗ バルク送信でエラーが発生: {result}")
return False
print(f"✓ {len(log_entries)}件のログエントリを一括送信しました")
return True
except requests.exceptions.RequestException as e:
print(f"✗ バルク送信エラー: {e}")
print(f" レスポンス: {response.text if 'response' in locals() else 'N/A'}")
return False
def process_log_file(self, file_path):
"""
ログファイルを読み込んでエントリのリストを作成
Args:
file_path (str): ログファイルのパス
Returns:
list: ログエントリのリスト
"""
log_entries = []
current_timestamp = datetime.utcnow().isoformat() + "Z"
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if line: # 空行をスキップ
log_entry = {
"@timestamp": current_timestamp,
"message": line,
"source_file": str(file_path),
"line_number": line_num
}
log_entries.append(log_entry)
print(f"✓ {file_path}から{len(log_entries)}行読み込みました")
return log_entries
except Exception as e:
print(f"✗ ファイル読み込みエラー ({file_path}): {e}")
return []
def process_log_folder(self, folder_path, pattern="*"):
"""
フォルダ内のログファイルを処理してElasticsearchに送信
Args:
folder_path (str): ログファイルが格納されているフォルダのパス
pattern (str): ファイル名のパターン(デフォルトは全ファイル)
"""
folder = Path(folder_path)
if not folder.exists():
print(f"✗ フォルダが見つかりません: {folder_path}")
return
if not folder.is_dir():
print(f"✗ 指定されたパスはフォルダではありません: {folder_path}")
return
# フォルダ内のファイルを検索
file_pattern = folder / pattern
log_files = glob.glob(str(file_pattern))
if not log_files:
print(f"✗ パターン '{pattern}' に一致するファイルが見つかりません: {folder_path}")
return
print(f"📁 {len(log_files)}個のファイルを処理します...")
total_entries = 0
successful_files = 0
for log_file in log_files:
print(f"\n📄 処理中: {log_file}")
# ログファイルを読み込み
log_entries = self.process_log_file(log_file)
if log_entries:
# バルクAPIで送信
if self.send_bulk_logs(log_entries):
total_entries += len(log_entries)
successful_files += 1
else:
print(f"✗ {log_file}の送信に失敗しました")
# レート制限を避けるため少し待機
time.sleep(0.1)
print(f"\n📊 処理完了:")
print(f" - 成功したファイル: {successful_files}/{len(log_files)}")
print(f" - 送信したログエントリ総数: {total_entries}")
def main():
"""
メイン関数 - 設定を読み込んでログ送信を実行
"""
# 設定(環境変数または直接指定)
ELASTICSEARCH_URL = os.getenv('ELASTICSEARCH_URL', 'https://your-deployment.es.us-central1.gcp.cloud.es.io')
API_KEY = os.getenv('ELASTIC_API_KEY', 'your-api-key-here')
DATA_STREAM_NAME = os.getenv('DATA_STREAM_NAME', 'logs-app-default')
USE_DATA_STREAM = os.getenv('USE_DATA_STREAM', 'true').lower() == 'true'
LOG_FOLDER = os.getenv('LOG_FOLDER', 'log_folder')
FILE_PATTERN = os.getenv('FILE_PATTERN', '*') # *.log, *.txt など
# 必須設定チェック
if not ELASTICSEARCH_URL or ELASTICSEARCH_URL == 'https://your-deployment.es.us-central1.gcp.cloud.es.io':
print("✗ ELASTICSEARCH_URLを設定してください")
print(" 例: export ELASTICSEARCH_URL='https://your-deployment.es.region.provider.cloud.es.io'")
return
if not API_KEY or API_KEY == 'your-api-key-here':
print("✗ ELASTIC_API_KEYを設定してください")
print(" 例: export ELASTIC_API_KEY='your-api-key'")
return
print("🚀 Elastic Cloud Serverlessログ送信開始")
print(f" エンドポイント: {ELASTICSEARCH_URL}")
print(f" Data Stream: {DATA_STREAM_NAME}" if USE_DATA_STREAM else f" インデックス: {DATA_STREAM_NAME}")
print(f" Data Stream使用: {'Yes' if USE_DATA_STREAM else 'No'}")
print(f" ログフォルダ: {LOG_FOLDER}")
print(f" ファイルパターン: {FILE_PATTERN}")
print()
# ログ送信クライアントを初期化
sender = ElasticLogSender(ELASTICSEARCH_URL, API_KEY, DATA_STREAM_NAME, USE_DATA_STREAM)
# ログフォルダを処理
sender.process_log_folder(LOG_FOLDER, FILE_PATTERN)
print("\n✅ 処理完了")
if __name__ == "__main__":
main()
@nobuhikosekiya
Copy link
Author

使い方

  1. 以下の形式の.envファイルを同じところに置いて、URLやAPI_KEYを設定します。
  2. log_folderのフォルダにアップロードしたいログファイルを配置します。
  3. python log_sender.py実行でログがアップロードされます。
export ELASTICSEARCH_URL=
export ELASTIC_API_KEY=

export DATA_STREAM_NAME="logs"                    # インデックス名(デフォルト: logs)
export USE_DATA_STREAM="True"                      # Data Stream使用
export LOG_FOLDER="log_folder"                 # ログフォルダパス(デフォルト: log_folder)
export FILE_PATTERN="*.log"                    # ファイルパターン(デフォルト: *)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment