Skip to content

Instantly share code, notes, and snippets.

@fukaz55
Created January 15, 2026 04:39
Show Gist options
  • Select an option

  • Save fukaz55/ff6b5ca3274798515959f9c66fea3c09 to your computer and use it in GitHub Desktop.

Select an option

Save fukaz55/ff6b5ca3274798515959f9c66fea3c09 to your computer and use it in GitHub Desktop.
Mastodonのエクスポートで取得した投稿本文のアーカイブ outbox.json から、公開範囲が Public な投稿を sqlite3 データベースに保存する
import json
import re
import sqlite3
import os
from datetime import datetime, timezone, timedelta
# 入力ファイル名と出力ファイル名
INPUT_FILE = 'outbox.json'
DB_FILE = 'mastodon_posts.db'
# JST (日本標準時) の定義
JST = timezone(timedelta(hours=+9), 'JST')
def setup_database():
"""データベースの初期設定とテーブル作成"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# テーブルの作成
cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
id TEXT PRIMARY KEY,
published TEXT,
dated TEXT,
target_to TEXT,
content TEXT,
attachment TEXT,
tag TEXT
)
''')
conn.commit()
return conn
def clean_html(raw_html):
"""HTMLタグを取り除き、プレーンテキストに変換する"""
if not raw_html:
return ""
clean_text = re.sub(r'<br\s*/?>|</p>', '\n', raw_html)
clean_text = re.sub(r'<[^>]+>', '', clean_text)
return clean_text.strip()
def convert_to_jst(iso_date_str):
"""ISO 8601形式の文字列をJSTの読みやすい形式に変換する"""
if not iso_date_str:
return ""
try:
# 文字列をdatetimeオブジェクトに変換 (末尾のZをUTCとして処理)
# Mastodonの形式 2023-10-01T12:00:00Z に対応
utc_dt = datetime.fromisoformat(iso_date_str.replace('Z', '+00:00'))
# JSTに変換
jst_dt = utc_dt.astimezone(JST)
# CSVで見やすい形式 (2023-10-01 21:00:00) にフォーマット
return jst_dt.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return iso_date_str
def process_outbox():
# DB接続
conn = setup_database()
cursor = conn.cursor()
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
posts = []
for item in data.get('orderedItems', []):
if item.get('type') == 'Create' and isinstance(item.get('object'), dict):
obj = item['object']
# ID
post_id = obj.get('id', '')
# 公開日時をJSTに変換
published_utc = obj.get('published', '')
published_jst = convert_to_jst(published_utc)
# 公開範囲 (to)
to_field = obj.get('to', [])
to_str = " ".join(to_field) if isinstance(to_field, list) else str(to_field)
# publicでないものは除外する
if "#Public" not in to_str:
continue
# 本文
content = clean_html(obj.get('content', ''))
# 添付ファイル (attachment)
attachments = obj.get('attachment', [])
attachment_urls = " ".join([a.get('url', '') for a in attachments if 'url' in a])
# タグ (tag)
tags = obj.get('tag', [])
tag_names = " ".join([t.get('name', '') for t in tags if t.get('type') == 'Hashtag' and 'name' in t])
posts.append({
"id": post_id,
"published": published_jst,
"to": to_str,
"content": content,
"attachment": attachment_urls,
"tag": tag_names
})
# データ挿入用のSQL
# すでに同じIDがある場合は無視
insert_sql = '''
INSERT OR IGNORE INTO posts (id, published, dated, target_to, content, attachment, tag)
VALUES (?, ?, ?, ?, ?, ?, ?)
'''
try:
count = 0
for row in posts:
cursor.execute(insert_sql, (
row["id"], # id
row["published"], # published
row["published"][0:10], #dated
row["to"], # to
row["content"], # content
row["attachment"], # attachment
row["tag"] # tag
))
count += 1
conn.commit()
print(f"処理完了: {count} 件のデータを {DB_FILE} に保存しました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
conn.rollback()
finally:
conn.close()
if __name__ == "__main__":
process_outbox()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment