@that0n3guy
Created November 25, 2025 19:43
cal_syncer

Kestra Calendar Syncer Flows

This gist contains two Kestra flows for bidirectional Google Calendar synchronization.

Files

  1. init_database.yml - Database initialization flow (run once)
  2. sync_google_calendars.yml - Main bidirectional sync flow (runs every 15 minutes)

📋 init_database.yml

Purpose: Initialize PostgreSQL database schema for Google Calendar sync. Run this once before starting the sync workflows.

Features:

  • Creates synced_events table with proper indexes
  • Idempotent (safe to run multiple times)
  • Environment-aware (dev/prod database selection)

Required Secrets:

  • POSTGRES_HOST - PostgreSQL server host:port
  • POSTGRES_USER - Database username
  • POSTGRES_PASSWD - Database password
  • CAL_SYNCER_POSTGRES_DB - Database name (e.g., cal_syncer_dev or cal_syncer)

🔄 sync_google_calendars.yml

Purpose: Bidirectional sync between two Google Calendars with PostgreSQL tracking.

Features:

  • Bidirectional sync (Calendar A ↔ Calendar B)
  • Runs every 15 minutes via trigger
  • Syncs events from 1 day in the past through the next 180 days (about 6 months)
  • Tracks sync state in PostgreSQL
  • Handles creates, updates, and deletions
  • Skips events that are themselves synced copies or that already have the target calendar as an attendee
  • Preserves meeting rooms, locations, and hangout links
  • Telegram notifications on errors

Required Secrets:

  • GOOGLE_SERVICE_ACCOUNT - Google service account JSON (base64)
  • CALENDAR_A_ID - First calendar ID
  • CALENDAR_B_ID - Second calendar ID
  • POSTGRES_HOST - PostgreSQL server host:port
  • POSTGRES_USER - Database username
  • POSTGRES_PASSWD - Database password
  • CAL_SYNCER_POSTGRES_DB - Database name
  • TELEGRAM_BOT_TOKEN - Telegram bot token (for error notifications)
  • TELEGRAM_CHAT_ID - Telegram chat ID (for error notifications)

Inputs:

  • calendar_a - First calendar ID (defaults to secret)
  • calendar_b - Second calendar ID (defaults to secret)
  • days_ahead - Days to sync ahead (default: 180)
  • max_events - Max events per calendar (default: 500, prod: 2500)

🚀 Quick Start

1. Set up Google Service Account

  1. Create a Google Cloud project
  2. Enable Google Calendar API
  3. Create a service account with Calendar API access
  4. Download the service account JSON key
  5. Share both calendars with the service account email

2. Set up PostgreSQL Database

Ensure you have a PostgreSQL database accessible from Kestra.

3. Configure Kestra Secrets

In Kestra, add the following secrets:

GOOGLE_SERVICE_ACCOUNT: <base64-encoded-service-account-json>
CALENDAR_A_ID: <your-first-calendar-id@group.calendar.google.com>
CALENDAR_B_ID: <your-second-calendar-id@group.calendar.google.com>
POSTGRES_HOST: <hostname:port>
POSTGRES_USER: <database-username>
POSTGRES_PASSWD: <database-password>
CAL_SYNCER_POSTGRES_DB: cal_syncer
TELEGRAM_BOT_TOKEN: <optional-telegram-bot-token>
TELEGRAM_CHAT_ID: <optional-telegram-chat-id>
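
Two of these values have a specific shape: GOOGLE_SERVICE_ACCOUNT is listed above as base64, and POSTGRES_HOST is host:port. The following is a minimal Node.js sketch of preparing and reading those values. The helper names encodeServiceAccount and parsePostgresHost are hypothetical and do not appear in the flows; the port fallback of 5432 mirrors what the sync scripts do.

```javascript
// Hypothetical helpers for illustration only; not part of the flows.

// Encode the service account key (the JSON text downloaded from Google Cloud)
// as base64 for the GOOGLE_SERVICE_ACCOUNT secret.
function encodeServiceAccount(jsonText) {
  JSON.parse(jsonText); // throws early if the key is not valid JSON
  return Buffer.from(jsonText, 'utf8').toString('base64');
}

// Split a POSTGRES_HOST value of the form "hostname:port".
// Falls back to PostgreSQL's default port 5432 when no port is given.
function parsePostgresHost(value) {
  const [host, port] = value.split(':');
  return { host, port: port ? parseInt(port, 10) : 5432 };
}

// Made-up sample values
console.log(parsePostgresHost('db.example.com:5433'));
console.log(encodeServiceAccount('{"type":"service_account"}'));
```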

4. Deploy and Run

  1. Run init_database.yml once to create the database schema
  2. Run sync_google_calendars.yml manually to test
  3. Enable the trigger to run automatically every 15 minutes

📊 Database Schema

CREATE TABLE synced_events (
  id BIGSERIAL PRIMARY KEY,
  primary_calendar TEXT NOT NULL,
  primary_event_id TEXT NOT NULL,
  secondary_calendar TEXT NOT NULL,
  secondary_event_id TEXT NOT NULL,
  event_summary TEXT,
  event_start TIMESTAMPTZ,
  event_end TIMESTAMPTZ,
  event_signature TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  last_updated TIMESTAMPTZ DEFAULT NOW(),
  last_checked TIMESTAMPTZ DEFAULT NOW(),
  CONSTRAINT unique_primary_event UNIQUE(primary_event_id, primary_calendar)
);

Indexes:

  • idx_synced_events_primary - Primary calendar and event ID
  • idx_synced_events_secondary - Secondary calendar and event ID
  • idx_synced_events_last_checked - Last checked timestamp

🔧 How It Works

Sync Logic

  1. Fetch Events: Retrieves events from both calendars for the configured time range
  2. Load Sync State: Loads existing sync records from PostgreSQL
  3. Detect Changes: Uses signature-based detection to identify new, updated, or deleted events
  4. Sync Calendar A → B: Creates/updates events from Calendar A to Calendar B
  5. Sync Calendar B → A: Creates/updates events from Calendar B to Calendar A
  6. Handle Deletions: Removes synced events when source events are deleted
  7. Update Database: Writes each sync record to PostgreSQL immediately after the matching Google Calendar call succeeds
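
Steps 2 and 3 can be sketched as a pure function that classifies source events as new or changed. This is a simplified illustration, not the flow's exact script: classifyEvents and the sample data are hypothetical, while the `id||calendar` lookup key and the column names follow the synced_events schema.

```javascript
// Classify source events against existing sync records.
// `syncedRecords` mirrors rows of synced_events; `signatureOf` is any
// function that maps an event to its signature string.
function classifyEvents(events, syncedRecords, sourceCalendarId, signatureOf) {
  const byKey = new Map(
    syncedRecords.map(r => [`${r.primary_event_id}||${r.primary_calendar}`, r])
  );
  const toCreate = [];
  const toUpdate = [];
  for (const event of events) {
    if (!event.id) continue; // events without an ID cannot be tracked
    const record = byKey.get(`${event.id}||${sourceCalendarId}`);
    if (!record) {
      toCreate.push(event); // never synced before
    } else if (signatureOf(event) !== record.event_signature) {
      toUpdate.push({ primary_event: event, synced_record: record });
    }
  }
  return { toCreate, toUpdate };
}

// Made-up sample data; the summary stands in for the full signature here.
const records = [
  { primary_event_id: 'e1', primary_calendar: 'a@example.com', event_signature: 'Standup' },
];
const events = [
  { id: 'e1', summary: 'Standup (moved)' },
  { id: 'e2', summary: 'Review' },
];
const plan = classifyEvents(events, records, 'a@example.com', e => e.summary);
console.log(plan.toCreate.map(e => e.id), plan.toUpdate.map(u => u.primary_event.id));
```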

Event Signature

Events are tracked using a signature that includes:

  • Summary
  • Start time
  • End time
  • Location
  • Transparency
  • Hangout link

When any of these change, the event is updated in the target calendar.
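
As an illustration, the signature can be built by joining those six fields into one string, as the buildSignature helper in the sync scripts does. The sample events below are made up; note that a description-only change leaves the signature unchanged.

```javascript
// Join the tracked fields into a single comparable string.
function buildSignature(event) {
  return `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
}

// Made-up sample event
const original = {
  summary: 'Planning',
  start: { dateTime: '2025-12-01T15:00:00Z' },
  end: { dateTime: '2025-12-01T16:00:00Z' },
  location: 'Room 1',
};
const moved = { ...original, location: 'Room 2' };
const redescribed = { ...original, description: 'New agenda' };

console.log(buildSignature(original) !== buildSignature(moved));       // location is tracked
console.log(buildSignature(original) === buildSignature(redescribed)); // description is not
```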

Skip Conditions

Events are skipped if:

  1. They contain 🔄 SYNCED FROM: in the description (already a synced event)
  2. The target calendar is already invited as an attendee
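
The two conditions can be sketched as a single check. The function name skipReason and its return values are hypothetical; the marker string and the attendee email comparison follow the sync scripts.

```javascript
const SYNC_MARKER = '🔄 SYNCED FROM:';

// Returns a short reason when the event should be skipped, otherwise null.
function skipReason(event, targetCalendarId) {
  // Condition 1: the event is itself a copy created by a previous sync
  if ((event.description || '').includes(SYNC_MARKER)) return 'synced-copy';
  // Condition 2: the target calendar is already on the guest list
  const emails = (event.attendees || []).map(a => a.email);
  if (emails.includes(targetCalendarId)) return 'target-invited';
  return null;
}

// Made-up sample events
console.log(skipReason({ description: '🔄 SYNCED FROM: a@example.com' }, 'b@example.com'));
console.log(skipReason({ attendees: [{ email: 'b@example.com' }] }, 'b@example.com'));
console.log(skipReason({ summary: 'Lunch' }, 'b@example.com'));
```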

🎨 Customization

Change Namespace

Update the namespace field in both files:

namespace: your_company.cal_syncer

Adjust Sync Frequency

Modify the trigger cron expression:

triggers:
  - id: every_15_minutes
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "*/15 * * * *"  # Change this

Modify Event Prefix

Update the summary prefix in the sync scripts:

summary: `[Calendar A] ${summary}`,  // Change [Calendar A] to your prefix

Change Timezone

Update the fallback timezone in the normalization function (it applies only when the source event carries no timeZone of its own):

timeZone: dateObj.timeZone || 'UTC'  // Change UTC to your timezone
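
For context, here is a sketch of that normalization function with the fallback pulled out into a parameter. The fallbackTimeZone parameter is an addition for illustration; the `{ value: <epoch millis> }` input shape is what the flow's scripts assume for date objects that are not already plain strings.

```javascript
// Normalize an event start/end object into the shape the Google Calendar API accepts.
function normalizeDateTime(dateObj, fallbackTimeZone = 'UTC') {
  if (!dateObj) return null;
  // Already a plain dateTime or date string: pass through unchanged
  if (typeof dateObj.dateTime === 'string' || typeof dateObj.date === 'string') {
    return dateObj;
  }
  // Timed event stored as an object with an epoch-millisecond value
  if (dateObj.dateTime && typeof dateObj.dateTime === 'object') {
    return {
      dateTime: new Date(dateObj.dateTime.value).toISOString(),
      timeZone: dateObj.timeZone || fallbackTimeZone,
    };
  }
  // All-day event: keep only the YYYY-MM-DD part
  if (dateObj.date && typeof dateObj.date === 'object') {
    return { date: new Date(dateObj.date.value).toISOString().split('T')[0] };
  }
  return dateObj;
}

// Made-up sample input: epoch 0 with no timeZone set on the event
console.log(normalizeDateTime({ dateTime: { value: 0 } }, 'America/Chicago'));
```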

📝 Notes

  • Events are tagged with [Calendar A] or [Calendar B] prefix
  • Synced events include metadata in description
  • Signature-based change detection prevents unnecessary updates
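  • Each database write immediately follows its Google Calendar call; if the database write fails after a create, the orphaned event ID is logged for manual recovery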
  • Handles edge cases (already deleted events, orphaned records)
  • Concurrency limit of 1 prevents overlapping executions
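
The description metadata and room handling mentioned above can be sketched as follows. This is a condensed approximation of what the create scripts do, not a verbatim copy; the function name buildLocationAndDescription is hypothetical.

```javascript
// Build the location and description for a synced copy of `event`.
function buildLocationAndDescription(event, sourceCalendarId, syncedAt) {
  // Meeting rooms appear as attendees flagged with resource === true
  const roomNames = (event.attendees || [])
    .filter(a => a.resource === true)
    .map(a => a.displayName || a.email)
    .join(', ');

  let location = event.location || '';
  const parts = [];
  if (roomNames && event.location) {
    parts.push(`📍 Room: ${roomNames}`); // location is taken, so list the room in the description
  } else if (roomNames) {
    location = roomNames; // no location, so the room becomes the location
  }
  if (event.hangoutLink) parts.push(`🔗 Meeting Link: ${event.hangoutLink}`);
  if (parts.length > 0) parts.push('---');
  if (event.description) parts.push(event.description, '---');
  // Sync metadata; the first line is the marker used by the skip check
  parts.push(
    `🔄 SYNCED FROM: ${sourceCalendarId}`,
    `📅 PRIMARY EVENT ID: ${event.id}`,
    `⏰ SYNCED AT: ${syncedAt}`
  );
  return { location, description: parts.join('\n') };
}

// Made-up sample event with a room but no location
const sample = {
  id: 'e1',
  description: 'Agenda',
  attendees: [{ email: 'room@example.com', displayName: 'Room 1', resource: true }],
};
console.log(buildLocationAndDescription(sample, 'a@example.com', new Date().toISOString()));
```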

🐛 Troubleshooting

Events Not Syncing

  1. Check that both calendars are shared with the service account
  2. Verify all secrets are correctly configured
  3. Check Kestra execution logs for errors
  4. Ensure database is accessible from Kestra

Duplicate Events

  1. Verify the database was initialized with init_database.yml
  2. Check for unique constraint violations in logs
  3. Ensure only one instance is running (concurrency limit)

Missing Updates

  1. Check event signature calculation
  2. Verify PostgreSQL connection
  3. Review sync logs for skipped events

📄 License

Feel free to use and modify these flows for your own calendar sync needs!

id: init_database
namespace: company.cal_syncer
description: |
  Initialize PostgreSQL database schema for Google Calendar sync.
  Creates synced_events table with indexes.
  Run this once before starting the sync workflows.
  Safe to run multiple times (idempotent with IF NOT EXISTS).
  Environment-aware: Uses CAL_SYNCER_POSTGRES_DB secret for database name.
  - Dev: cal_syncer_dev
  - Prod: cal_syncer
tasks:
  # ============================================================================
  # Step 1: Create database schema using PostgreSQL
  # ============================================================================
  - id: create_database_schema
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}/{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
    username: "{{ secret('POSTGRES_USER') }}"
    password: "{{ secret('POSTGRES_PASSWD') }}"
    sql: |
      -- ============================================================================
      -- SYNCED EVENTS TABLE
      -- ============================================================================
      CREATE TABLE IF NOT EXISTS synced_events (
        id BIGSERIAL PRIMARY KEY,
        primary_calendar TEXT NOT NULL,
        primary_event_id TEXT NOT NULL,
        secondary_calendar TEXT NOT NULL,
        secondary_event_id TEXT NOT NULL,
        event_summary TEXT,
        event_start TIMESTAMPTZ,
        event_end TIMESTAMPTZ,
        event_signature TEXT,
        created_at TIMESTAMPTZ DEFAULT NOW(),
        last_updated TIMESTAMPTZ DEFAULT NOW(),
        last_checked TIMESTAMPTZ DEFAULT NOW(),
        CONSTRAINT unique_primary_event UNIQUE(primary_event_id, primary_calendar)
      );
      -- ============================================================================
      -- INDEXES FOR PERFORMANCE
      -- ============================================================================
      CREATE INDEX IF NOT EXISTS idx_synced_events_primary
        ON synced_events(primary_event_id, primary_calendar);
      CREATE INDEX IF NOT EXISTS idx_synced_events_secondary
        ON synced_events(secondary_event_id, secondary_calendar);
      CREATE INDEX IF NOT EXISTS idx_synced_events_last_checked
        ON synced_events(last_checked);
  # ============================================================================
  # Step 2: Verify schema creation
  # ============================================================================
  - id: verify_schema
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}/{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
    username: "{{ secret('POSTGRES_USER') }}"
    password: "{{ secret('POSTGRES_PASSWD') }}"
    sql: |
      SELECT
        table_name,
        table_type
      FROM information_schema.tables
      WHERE table_schema = 'public'
        AND table_name = 'synced_events'
      ORDER BY table_name;
    fetch: true
  # ============================================================================
  # Step 3: Output verification results
  # ============================================================================
  - id: log_results
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - |
        echo "============================================================"
        echo "📊 POSTGRESQL DATABASE INITIALIZATION COMPLETE"
        echo "============================================================"
        echo ""
        echo "✅ Table Created:"
        echo " - synced_events (with constraints and indexes)"
        echo ""
        echo "✅ Indexes Created:"
        echo " - idx_synced_events_primary"
        echo " - idx_synced_events_secondary"
        echo " - idx_synced_events_last_checked"
        echo ""
        echo "🎯 Database ready for calendar sync operations"
        echo "============================================================"
        echo ""
        echo "Verification Results:"
        echo '{{ outputs.verify_schema.rows | json }}'
# ============================================================================
# Notes
# ============================================================================
# - Run this flow once before starting sync operations
# - Safe to run multiple times (CREATE IF NOT EXISTS)
# - Uses Kestra PostgreSQL JDBC plugin
# - Requires secrets: POSTGRES_HOST, POSTGRES_USER, POSTGRES_PASSWD, CAL_SYNCER_POSTGRES_DB
# - Database name: set via CAL_SYNCER_POSTGRES_DB secret (cal_syncer_dev or cal_syncer)
# - Verify with: SELECT COUNT(*) FROM synced_events;
id: sync_google_calendars
namespace: company.cal_syncer
concurrency:
  limit: 1
description: |
  Bidirectional Google Calendar sync between Calendar A and Calendar B.
  Runs every 15 minutes, syncs events for next 180 days (6 months), tracks in PostgreSQL database.
  Environment-aware: Uses secrets for calendar IDs and database.
  - Dev: Test calendars, cal_syncer_dev database
  - Prod: Production calendars, cal_syncer database
inputs:
  - id: calendar_a
    type: STRING
    defaults: "{{ secret('CALENDAR_A_ID') }}"
  - id: calendar_b
    type: STRING
    defaults: "{{ secret('CALENDAR_B_ID') }}"
  - id: days_ahead
    type: INT
    defaults: 180
  - id: max_events
    type: INT
    defaults: 500
    description: "Max events to fetch per calendar (default 500, prod trigger uses 2500)"
tasks:
  - id: working_directory
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: P30D
    tasks:
      # ============================================================================
      # Step 1: Fetch events from both calendars
      # ============================================================================
      - id: fetch_calendar_a_events
        type: io.kestra.plugin.googleworkspace.calendar.ListEvents
        serviceAccount: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
        calendarId: "{{ inputs.calendar_a }}"
        timeMin: "{{ now() | dateAdd(-1, 'DAYS') }}"
        timeMax: "{{ now() | dateAdd(inputs.days_ahead, 'DAYS') }}"
        maxResults: "{{ inputs.max_events }}"
        singleEvents: true
        orderBy: startTime
      - id: fetch_calendar_b_events
        type: io.kestra.plugin.googleworkspace.calendar.ListEvents
        serviceAccount: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
        calendarId: "{{ inputs.calendar_b }}"
        timeMin: "{{ now() | dateAdd(-1, 'DAYS') }}"
        timeMax: "{{ now() | dateAdd(inputs.days_ahead, 'DAYS') }}"
        maxResults: "{{ inputs.max_events }}"
        singleEvents: true
        orderBy: startTime
      - id: save_calendar_a_events
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        inputFiles:
          events_input.json: "{{ outputs.fetch_calendar_a_events.metadataList | json }}"
        script: |
          const fs = require('fs');
          const eventList = JSON.parse(fs.readFileSync('events_input.json', 'utf8'));
          fs.writeFileSync('calendar_a_events.json', JSON.stringify(eventList, null, 2));
          console.log(`📊 Saved ${eventList.length} calendar A events with full metadata`);
      - id: save_calendar_b_events_and_npm_install
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        beforeCommands:
          - npm install # do this for future tasks
        inputFiles:
          package.json: |
            {
              "dependencies": {
                "googleapis": "^143.0.0",
                "pg": "^8.13.1"
              }
            }
          events_input.json: "{{ outputs.fetch_calendar_b_events.metadataList | json }}"
        script: |
          const fs = require('fs');
          const eventList = JSON.parse(fs.readFileSync('events_input.json', 'utf8'));
          fs.writeFileSync('calendar_b_events.json', JSON.stringify(eventList, null, 2));
          console.log(`📊 Saved ${eventList.length} calendar B events with full metadata`);
      # ============================================================================
      # Step 2: Load existing sync records from PostgreSQL
      # ============================================================================
      - id: load_synced_events
        type: io.kestra.plugin.jdbc.postgresql.Query
        url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}/{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
        username: "{{ secret('POSTGRES_USER') }}"
        password: "{{ secret('POSTGRES_PASSWD') }}"
        sql: |
          SELECT
            id,
            primary_event_id,
            primary_calendar,
            secondary_event_id,
            secondary_calendar,
            event_signature,
            event_summary
          FROM synced_events
        fetch: true
      - id: save_synced_events
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        inputFiles:
          synced_data_input.json: "{{ outputs.load_synced_events.rows | json }}"
        script: |
          const fs = require('fs');
          const syncedEvents = JSON.parse(fs.readFileSync('synced_data_input.json', 'utf8'));
          fs.writeFileSync('synced_events.json', JSON.stringify(syncedEvents, null, 2));
          console.log(`✅ Loaded ${syncedEvents.length} existing sync records from PostgreSQL`);
      # ============================================================================
      # Step 3: Process Calendar A → Calendar B sync
      # ============================================================================
      - id: sync_calendar_a_to_b
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        script: |
          const fs = require('fs');
          // Load synced events
          const synced = JSON.parse(fs.readFileSync('synced_events.json', 'utf8'));
          // Load calendar A events (metadataList is already an array)
          const calendarAEvents = JSON.parse(fs.readFileSync('calendar_a_events.json', 'utf8'));
          const calendarA = "{{ inputs.calendar_a }}";
          const calendarB = "{{ inputs.calendar_b }}";
          // Build lookup map of already synced events (full records, not just IDs)
          const syncedEventsMap = new Map(
            synced.map(s => [`${s.primary_event_id}||${s.primary_calendar}`, s])
          );
          // Helper function to build event signature
          function buildSignature(event) {
            return `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
          }
          const eventsToSync = [];
          const eventsToUpdate = [];
          for (const event of calendarAEvents) {
            const eventId = event.id;
            if (!eventId) continue;
            const syncKey = `${eventId}||${calendarA}`;
            const syncedEvent = syncedEventsMap.get(syncKey);
            // Check 1: Is this itself a synced event?
            const description = event.description || '';
            if (description.includes('🔄 SYNCED FROM:')) {
              continue;
            }
            // Check 2: Is target calendar already invited as attendee?
            const attendees = event.attendees || [];
            const attendeeEmails = attendees.map(a => a.email);
            if (attendeeEmails.includes(calendarB)) {
              console.log(`⏭️ Skipping "${event.summary}" - calendar B already invited as attendee`);
              continue;
            }
            // Check 3: Already synced?
            if (syncedEvent) {
              // Calculate current signature
              const currentSignature = buildSignature(event);
              // Compare with stored signature
              if (currentSignature !== syncedEvent.event_signature) {
                // Signature changed → needs update
                eventsToUpdate.push({
                  primary_event: event,
                  synced_record: syncedEvent,
                  new_signature: currentSignature
                });
                console.log(`📝 Update detected: ${event.summary}`);
              }
              continue; // Don't add to new sync list
            }
            // New event needs syncing
            eventsToSync.push(event);
          }
          // Save for next tasks
          fs.writeFileSync('calendar_a_to_sync.json', JSON.stringify(eventsToSync, null, 2));
          fs.writeFileSync('calendar_a_updates.json', JSON.stringify(eventsToUpdate, null, 2));
          console.log(`\n=== Calendar A Analysis ===`);
          console.log(`📊 New events to sync: ${eventsToSync.length}`);
          console.log(`📝 Events needing updates: ${eventsToUpdate.length}`);
      - id: create_calendar_a_synced_events
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        script: |
          const fs = require('fs');
          const events = JSON.parse(fs.readFileSync('calendar_a_to_sync.json', 'utf8'));
          const calendarA = "{{ inputs.calendar_a }}";
          const calendarB = "{{ inputs.calendar_b }}";
          const createdEvents = [];
          for (const event of events) {
            const summary = event.summary || 'No Title';
            const originalDesc = event.description || '';
            const now = new Date().toISOString();
            // Extract room names from attendees (resource === true)
            const roomNames = (event.attendees || [])
              .filter(attendee => attendee.resource === true)
              .map(attendee => attendee.displayName || attendee.email)
              .join(', ');
            // Normalize date/time format for Google Calendar API
            const normalizeDateTime = (dateObj) => {
              if (!dateObj) return null;
              // If it's already a simple object with dateTime string or date string, return as-is
              if (typeof dateObj.dateTime === 'string' || typeof dateObj.date === 'string') {
                return dateObj;
              }
              // Convert complex dateTime object to ISO string
              if (dateObj.dateTime && typeof dateObj.dateTime === 'object') {
                const timestamp = dateObj.dateTime.value;
                const dateTimeStr = new Date(timestamp).toISOString();
                return {
                  dateTime: dateTimeStr,
                  timeZone: dateObj.timeZone || 'UTC'
                };
              }
              // Handle date-only events
              if (dateObj.date && typeof dateObj.date === 'object') {
                const timestamp = dateObj.date.value;
                const dateStr = new Date(timestamp).toISOString().split('T')[0];
                return { date: dateStr };
              }
              return dateObj;
            };
            const normalizedStart = normalizeDateTime(event.start);
            const normalizedEnd = normalizeDateTime(event.end);
            // Handle room placement: if location exists, add to description; otherwise add to location
            let finalLocation = event.location || '';
            // Build description in order: Room, Meeting Link, ---, Original Description, ---, Sync metadata
            let descriptionParts = [];
            if (roomNames && event.location) {
              // Location exists → add room to description
              descriptionParts.push(`📍 Room: ${roomNames}`);
            } else if (roomNames) {
              // No location → put room in location field
              finalLocation = roomNames;
            }
            // Add hangout link to description if it exists
            if (event.hangoutLink) {
              descriptionParts.push(`🔗 Meeting Link: ${event.hangoutLink}`);
            }
            // Add separator if we have room or meeting link
            if (descriptionParts.length > 0) {
              descriptionParts.push('---');
            }
            // Add original description
            if (originalDesc) {
              descriptionParts.push(originalDesc);
              descriptionParts.push('---');
            }
            const finalDescription = descriptionParts.join('\n');
            // Build synced event
            const syncedEvent = {
              summary: `[Calendar A] ${summary}`,
              description: `${finalDescription}\n🔄 SYNCED FROM: ${calendarA}\n📅 PRIMARY EVENT ID: ${event.id}\n⏰ SYNCED AT: ${now}`,
              start: normalizedStart,
              end: normalizedEnd,
              location: finalLocation,
              transparency: event.transparency || 'opaque',
              visibility: 'default',
              conferenceData: event.conferenceData,
              hangoutLink: event.hangoutLink,
              // Store metadata for database insert
              _primary_event_id: event.id,
              _primary_calendar: calendarA
            };
            createdEvents.push(syncedEvent);
          }
          fs.writeFileSync('to_create.json', JSON.stringify(createdEvents, null, 2));
          console.log(`✅ Prepared ${createdEvents.length} events for creation on calendar B`);
      - id: create_events_on_calendar_b
        type: io.kestra.plugin.scripts.node.Script
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        env:
          SERVICE_ACCOUNT_JSON: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
          CALENDAR_ID: "{{ inputs.calendar_b }}"
          POSTGRES_HOST: "{{ secret('POSTGRES_HOST') }}"
          POSTGRES_DB: "{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
          POSTGRES_USER: "{{ secret('POSTGRES_USER') }}"
          POSTGRES_PASSWORD: "{{ secret('POSTGRES_PASSWD') }}"
        script: |
          const fs = require('fs');
          const { google } = require('googleapis');
          const { Pool } = require('pg');
          // Kestra auto-decodes base64 secrets, so parse JSON directly
          const credentials = JSON.parse(process.env.SERVICE_ACCOUNT_JSON);
          // Authenticate Google Calendar
          const auth = new google.auth.GoogleAuth({
            credentials,
            scopes: ['https://www.googleapis.com/auth/calendar']
          });
          const calendar = google.calendar({ version: 'v3', auth });
          const calendarId = process.env.CALENDAR_ID;
          // Setup PostgreSQL connection - parse host:port format
          const hostParts = process.env.POSTGRES_HOST.split(':');
          const pool = new Pool({
            host: hostParts[0],
            port: hostParts[1] ? parseInt(hostParts[1], 10) : 5432,
            database: process.env.POSTGRES_DB,
            user: process.env.POSTGRES_USER,
            password: process.env.POSTGRES_PASSWORD
          });
          // Load events to create
          const events = JSON.parse(fs.readFileSync('to_create.json', 'utf8'));
          const created = [];
          async function createEvents() {
            for (const event of events) {
              // Extract metadata
              const primaryEventId = event._primary_event_id;
              const primaryCalendar = event._primary_calendar;
              delete event._primary_event_id;
              delete event._primary_calendar;
              try {
                // 1. Create event in Google Calendar
                const result = await calendar.events.insert({
                  calendarId,
                  resource: event
                });
                const startTime = event.start.dateTime || event.start.date;
                const endTime = event.end.dateTime || event.end.date;
                // Same six fields as buildSignature in sync_calendar_a_to_b (hangout link included).
                // NOTE: this is computed from the synced copy (prefixed summary, normalized times),
                // so it can still differ from the source-event signature until the first update pass.
                const signature = `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
                const now = new Date().toISOString();
                // 2. Immediately write to database (one insert per event)
                try {
                  await pool.query(`
                    INSERT INTO synced_events (
                      primary_calendar, primary_event_id, secondary_calendar, secondary_event_id,
                      event_summary, event_start, event_end, event_signature,
                      created_at, last_updated, last_checked
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                  `, [
                    primaryCalendar, primaryEventId, calendarId, result.data.id,
                    event.summary, startTime, endTime, signature,
                    now, now, now
                  ]);
                  created.push({
                    primary_event_id: primaryEventId,
                    primary_calendar: primaryCalendar,
                    secondary_event_id: result.data.id,
                    secondary_calendar: calendarId,
                    event_summary: event.summary,
                    event_start: startTime,
                    event_end: endTime,
                    event_signature: signature
                  });
                  console.log(`✅ Created & saved: ${event.summary} (${startTime})`);
                } catch (dbError) {
                  // GCal succeeded but DB failed - log orphaned event for manual recovery
                  console.error(`⚠️ Created in GCal but DB failed for ${event.summary} (${startTime}): ${dbError.message}`);
                  console.error(` Orphaned GCal event ID: ${result.data.id}`);
                }
              } catch (error) {
                const startTime = event.start.dateTime || event.start.date;
                console.error(`❌ Failed to create ${event.summary} (${startTime}): ${error.message}`);
              }
            }
            await pool.end();
            // Save for summary
            fs.writeFileSync('calendar_a_created_events.json', JSON.stringify(created, null, 2));
            console.log(`\n📊 Summary: ${created.length}/${events.length} events created and saved to DB`);
          }
          createEvents().catch(err => {
            console.error('Fatal error:', err);
            process.exit(1);
          });
      - id: update_calendar_a_events_on_b
        type: io.kestra.plugin.scripts.node.Script
        description: "Update existing synced events on Calendar B when Calendar A events change"
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        env:
          GOOGLE_SERVICE_ACCOUNT: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
          POSTGRES_HOST: "{{ secret('POSTGRES_HOST') }}"
          POSTGRES_DB: "{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
          POSTGRES_USER: "{{ secret('POSTGRES_USER') }}"
          POSTGRES_PASSWORD: "{{ secret('POSTGRES_PASSWD') }}"
        script: |
          const { google } = require('googleapis');
          const { Pool } = require('pg');
          const fs = require('fs');
          // Normalize date/time format for Google Calendar API
          function normalizeDateTime(dateObj) {
            if (!dateObj) return null;
            if (typeof dateObj.dateTime === 'string' || typeof dateObj.date === 'string') {
              return dateObj;
            }
            if (dateObj.dateTime && typeof dateObj.dateTime === 'object') {
              const timestamp = dateObj.dateTime.value;
              const dateTimeStr = new Date(timestamp).toISOString();
              return { dateTime: dateTimeStr, timeZone: dateObj.timeZone || 'UTC' };
            }
            if (dateObj.date && typeof dateObj.date === 'object') {
              const timestamp = dateObj.date.value;
              const dateStr = new Date(timestamp).toISOString().split('T')[0];
              return { date: dateStr };
            }
            return dateObj;
          }
          async function updateEvents() {
            const eventsToUpdate = JSON.parse(fs.readFileSync('calendar_a_updates.json', 'utf8'));
            if (eventsToUpdate.length === 0) {
              console.log('No Calendar A events need updates on Calendar B');
              fs.writeFileSync('calendar_a_update_results.json', JSON.stringify([], null, 2));
              fs.writeFileSync('calendar_a_update_errors.json', JSON.stringify([], null, 2));
              return;
            }
            console.log(`\n📝 Updating ${eventsToUpdate.length} events on calendar B...\n`);
            const serviceAccountJson = process.env.GOOGLE_SERVICE_ACCOUNT;
            const auth = new google.auth.GoogleAuth({
              credentials: JSON.parse(serviceAccountJson),
              scopes: ['https://www.googleapis.com/auth/calendar'],
            });
            const calendar = google.calendar({ version: 'v3', auth });
            // Setup PostgreSQL connection
            // Parse host:port format
            const hostParts = process.env.POSTGRES_HOST.split(':');
            const pool = new Pool({
              host: hostParts[0],
              port: hostParts[1] ? parseInt(hostParts[1], 10) : 5432,
              database: process.env.POSTGRES_DB,
              user: process.env.POSTGRES_USER,
              password: process.env.POSTGRES_PASSWORD
            });
            const updateResults = [];
            const updateErrors = [];
            for (const update of eventsToUpdate) {
              try {
                const primaryEvent = update.primary_event;
                const syncedRecord = update.synced_record;
                const normalizedStart = normalizeDateTime(primaryEvent.start);
                const normalizedEnd = normalizeDateTime(primaryEvent.end);
                // Extract room names from attendees (resource === true)
                const roomNames = (primaryEvent.attendees || [])
                  .filter(attendee => attendee.resource === true)
                  .map(attendee => attendee.displayName || attendee.email)
                  .join(', ');
                // Handle room placement: if location exists, add to description; otherwise add to location
                let finalLocation = primaryEvent.location || '';
                const originalDesc = primaryEvent.description || '';
                // Build description in order: Room, Meeting Link, ---, Original Description, ---, Sync metadata
                let descriptionParts = [];
                if (roomNames && primaryEvent.location) {
                  // Location exists → add room to description
                  descriptionParts.push(`📍 Room: ${roomNames}`);
                } else if (roomNames) {
                  // No location → put room in location field
                  finalLocation = roomNames;
                }
                // Add hangout link to description if it exists
                if (primaryEvent.hangoutLink) {
                  descriptionParts.push(`🔗 Meeting Link: ${primaryEvent.hangoutLink}`);
                }
                // Add separator if we have room or meeting link
                if (descriptionParts.length > 0) {
                  descriptionParts.push('---');
                }
                // Add original description
                if (originalDesc) {
                  descriptionParts.push(originalDesc);
                  descriptionParts.push('---');
                }
                const finalDescription = descriptionParts.join('\n');
                const updatedEvent = {
                  summary: `[Calendar A] ${primaryEvent.summary}`,
                  description: `${finalDescription}\n🔄 SYNCED FROM: Calendar A`,
                  start: normalizedStart,
                  end: normalizedEnd,
                  location: finalLocation,
                  transparency: primaryEvent.transparency || 'opaque',
                  conferenceData: primaryEvent.conferenceData,
                  hangoutLink: primaryEvent.hangoutLink,
                };
                // 1. Update in Google Calendar
                await calendar.events.update({
                  calendarId: syncedRecord.secondary_calendar,
                  eventId: syncedRecord.secondary_event_id,
                  requestBody: updatedEvent,
                });
                const startTime = normalizedStart.dateTime || normalizedStart.date;
                const endTime = normalizedEnd.dateTime || normalizedEnd.date;
                const now = new Date().toISOString();
                // 2. Immediately update database
                try {
                  await pool.query(`
                    UPDATE synced_events
                    SET event_summary = $1, event_start = $2, event_end = $3,
                        event_signature = $4, last_updated = $5, last_checked = $6
                    WHERE primary_event_id = $7 AND primary_calendar = $8
                  `, [
                    primaryEvent.summary, startTime, endTime,
                    update.new_signature, now, now,
                    syncedRecord.primary_event_id, syncedRecord.primary_calendar
                  ]);
                  updateResults.push({
                    primary_event_id: syncedRecord.primary_event_id,
                    primary_calendar: syncedRecord.primary_calendar,
                    secondary_event_id: syncedRecord.secondary_event_id,
                    secondary_calendar: syncedRecord.secondary_calendar,
                    event_summary: primaryEvent.summary,
                    event_start: startTime,
                    event_end: endTime,
                    event_signature: update.new_signature,
                    updated_at: now,
                  });
                  console.log(`✅ Updated & saved: ${primaryEvent.summary} (${startTime})`);
                } catch (dbError) {
                  console.error(`⚠️ Updated in GCal but DB failed for ${primaryEvent.summary} (${startTime}): ${dbError.message}`);
                  updateErrors.push({ event_id: syncedRecord.primary_event_id, error: `DB: ${dbError.message}` });
                }
              } catch (error) {
                console.error(`❌ Failed to update event ${update.synced_record.primary_event_id}:`, error.message);
                updateErrors.push({ event_id: update.synced_record.primary_event_id, error: error.message });
              }
            }
            await pool.end();
            fs.writeFileSync('calendar_a_update_results.json', JSON.stringify(updateResults, null, 2));
            fs.writeFileSync('calendar_a_update_errors.json', JSON.stringify(updateErrors, null, 2));
            console.log(`\n📊 Update Summary: ✅ ${updateResults.length} | ❌ ${updateErrors.length}`);
          }
          updateEvents().catch(err => {
            console.error('Fatal error:', err);
            process.exit(1);
          });
# ============================================================================
# Step 4: Process Calendar B → Calendar A sync (same logic, reversed)
# ============================================================================
- id: sync_calendar_b_to_a
type: io.kestra.plugin.scripts.node.Script
taskRunner:
type: io.kestra.plugin.core.runner.Process
script: |
const fs = require('fs');
const synced = JSON.parse(fs.readFileSync('synced_events.json', 'utf8'));
// Load calendar B events (metadataList is already an array)
const calendarBEvents = JSON.parse(fs.readFileSync('calendar_b_events.json', 'utf8'));
const calendarA = "{{ inputs.calendar_a }}";
const calendarB = "{{ inputs.calendar_b }}";
// Build lookup map of already synced events (full records, not just IDs)
const syncedEventsMap = new Map(
synced.map(s => [`${s.primary_event_id}||${s.primary_calendar}`, s])
);
// Helper function to build event signature
function buildSignature(event) {
return `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
}
const eventsToSync = [];
const eventsToUpdate = [];
for (const event of calendarBEvents) {
const eventId = event.id;
if (!eventId) continue;
const syncKey = `${eventId}||${calendarB}`;
const syncedEvent = syncedEventsMap.get(syncKey);
// Check 1: Is this itself a synced event?
const description = event.description || '';
if (description.includes('🔄 SYNCED FROM:')) {
continue;
}
// Check 2: Is target calendar already invited as attendee?
const attendees = event.attendees || [];
const attendeeEmails = attendees.map(a => a.email);
if (attendeeEmails.includes(calendarA)) {
console.log(`⏭️ Skipping "${event.summary}" - Calendar A already invited as attendee`);
continue;
}
// Check 3: Already synced?
if (syncedEvent) {
// Calculate current signature
const currentSignature = buildSignature(event);
// Compare with stored signature
if (currentSignature !== syncedEvent.event_signature) {
// Signature changed → needs update
eventsToUpdate.push({
primary_event: event,
synced_record: syncedEvent,
new_signature: currentSignature
});
console.log(`📝 Update detected: ${event.summary}`);
}
continue; // Don't add to new sync list
}
// New event needs syncing
eventsToSync.push(event);
}
// Save for next tasks
fs.writeFileSync('calendar_b_to_sync.json', JSON.stringify(eventsToSync, null, 2));
fs.writeFileSync('calendar_b_updates.json', JSON.stringify(eventsToUpdate, null, 2));
console.log(`\n=== Calendar B Analysis ===`);
console.log(`📊 New events to sync: ${eventsToSync.length}`);
console.log(`📝 Events needing updates: ${eventsToUpdate.length}`);
- id: create_calendar_b_synced_events
type: io.kestra.plugin.scripts.node.Script
taskRunner:
type: io.kestra.plugin.core.runner.Process
script: |
const fs = require('fs');
const events = JSON.parse(fs.readFileSync('calendar_b_to_sync.json', 'utf8'));
const calendarB = "{{ inputs.calendar_b }}";
const calendarA = "{{ inputs.calendar_a }}";
const createdEvents = [];
for (const event of events) {
const summary = event.summary || 'No Title';
const originalDesc = event.description || '';
const now = new Date().toISOString();
// Extract room names from attendees (resource === true)
const roomNames = (event.attendees || [])
.filter(attendee => attendee.resource === true)
.map(attendee => attendee.displayName || attendee.email)
.join(', ');
// Normalize date/time format for Google Calendar API
const normalizeDateTime = (dateObj) => {
if (!dateObj) return null;
// If it's already a simple object with dateTime string or date string, return as-is
if (typeof dateObj.dateTime === 'string' || typeof dateObj.date === 'string') {
return dateObj;
}
// Convert complex dateTime object to ISO string
if (dateObj.dateTime && typeof dateObj.dateTime === 'object') {
const timestamp = dateObj.dateTime.value;
const dateTimeStr = new Date(timestamp).toISOString();
return {
dateTime: dateTimeStr,
timeZone: dateObj.timeZone || 'UTC'
};
}
// Handle date-only events
if (dateObj.date && typeof dateObj.date === 'object') {
const timestamp = dateObj.date.value;
const dateStr = new Date(timestamp).toISOString().split('T')[0];
return { date: dateStr };
}
return dateObj;
};
const normalizedStart = normalizeDateTime(event.start);
const normalizedEnd = normalizeDateTime(event.end);
// Handle room placement: if location exists, add to description; otherwise add to location
let finalLocation = event.location || '';
// Build description in order: Room, Meeting Link, ---, Original Description, ---, Sync metadata
let descriptionParts = [];
if (roomNames && event.location) {
// Location exists → add room to description
descriptionParts.push(`📍 Room: ${roomNames}`);
} else if (roomNames) {
// No location → put room in location field
finalLocation = roomNames;
}
// Add hangout link to description if it exists
if (event.hangoutLink) {
descriptionParts.push(`🔗 Meeting Link: ${event.hangoutLink}`);
}
// Add separator if we have room or meeting link
if (descriptionParts.length > 0) {
descriptionParts.push('---');
}
// Add original description
if (originalDesc) {
descriptionParts.push(originalDesc);
descriptionParts.push('---');
}
const finalDescription = descriptionParts.join('\n');
const syncedEvent = {
summary: `[Calendar B] ${summary}`,
description: `${finalDescription}\n🔄 SYNCED FROM: ${calendarB}\n📅 PRIMARY EVENT ID: ${event.id}\n⏰ SYNCED AT: ${now}`,
start: normalizedStart,
end: normalizedEnd,
location: finalLocation,
transparency: event.transparency || 'opaque',
visibility: 'default',
conferenceData: event.conferenceData,
hangoutLink: event.hangoutLink,
_primary_event_id: event.id,
_primary_calendar: calendarB
};
createdEvents.push(syncedEvent);
}
fs.writeFileSync('to_create.json', JSON.stringify(createdEvents, null, 2));
console.log(`✅ Prepared ${createdEvents.length} events for creation on Calendar A`);
- id: create_events_on_calendar_a
type: io.kestra.plugin.scripts.node.Script
taskRunner:
type: io.kestra.plugin.core.runner.Process
env:
SERVICE_ACCOUNT_JSON: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
CALENDAR_ID: "{{ inputs.calendar_a }}"
POSTGRES_HOST: "{{ secret('POSTGRES_HOST') }}"
POSTGRES_DB: "{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
POSTGRES_USER: "{{ secret('POSTGRES_USER') }}"
POSTGRES_PASSWORD: "{{ secret('POSTGRES_PASSWD') }}"
script: |
const fs = require('fs');
const { google } = require('googleapis');
const { Pool } = require('pg');
const credentials = JSON.parse(process.env.SERVICE_ACCOUNT_JSON);
const auth = new google.auth.GoogleAuth({
credentials,
scopes: ['https://www.googleapis.com/auth/calendar']
});
const calendar = google.calendar({ version: 'v3', auth });
const calendarId = process.env.CALENDAR_ID;
const events = JSON.parse(fs.readFileSync('to_create.json', 'utf8'));
// Setup PostgreSQL connection
// Parse host:port format
const hostParts = process.env.POSTGRES_HOST.split(':');
const pool = new Pool({
host: hostParts[0],
port: hostParts[1] ? parseInt(hostParts[1], 10) : 5432,
database: process.env.POSTGRES_DB,
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD
});
const created = [];
async function createEvents() {
for (const event of events) {
const primaryEventId = event._primary_event_id;
const primaryCalendar = event._primary_calendar;
delete event._primary_event_id;
delete event._primary_calendar;
try {
// 1. Create event in Google Calendar
const result = await calendar.events.insert({
calendarId,
requestBody: event
});
const startTime = event.start.dateTime || event.start.date;
const endTime = event.end.dateTime || event.end.date;
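// Same field layout as buildSignature() in the analysis task, including hangoutLink.
// Built from the synced copy (prefixed summary), so the next run is expected to see a
// different signature for the primary event and refresh this record once.
const signature = `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
const now = new Date().toISOString();
// 2. Write to the database right after the insert (one row per event; the two writes are not atomic)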
try {
await pool.query(`
INSERT INTO synced_events (
primary_calendar, primary_event_id, secondary_calendar, secondary_event_id,
event_summary, event_start, event_end, event_signature,
created_at, last_updated, last_checked
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
`, [
primaryCalendar, primaryEventId, calendarId, result.data.id,
event.summary, startTime, endTime, signature,
now, now, now
]);
created.push({
primary_event_id: primaryEventId,
primary_calendar: primaryCalendar,
secondary_event_id: result.data.id,
secondary_calendar: calendarId,
event_summary: event.summary,
event_start: startTime,
event_end: endTime,
event_signature: signature
});
console.log(`✅ Created & saved: ${event.summary} (${startTime})`);
} catch (dbError) {
// GCal succeeded but DB failed - log orphaned event for manual recovery
console.error(`⚠️ Created in GCal but DB failed for ${event.summary} (${startTime}): ${dbError.message}`);
console.error(` Orphaned GCal event ID: ${result.data.id}`);
}
} catch (error) {
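// event.start can be null if normalization failed; avoid throwing inside the error handler
const startTime = event.start?.dateTime || event.start?.date || 'unknown start';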
console.error(`❌ Failed to create ${event.summary} (${startTime}): ${error.message}`);
}
}
await pool.end();
fs.writeFileSync('calendar_b_created_events.json', JSON.stringify(created, null, 2));
console.log(`\n📊 Summary: ${created.length}/${events.length} events created and saved to DB`);
}
createEvents().catch(err => {
console.error('Fatal error:', err);
process.exit(1);
});
- id: update_calendar_b_events_on_a
type: io.kestra.plugin.scripts.node.Script
description: "Update existing synced events on Calendar A when Calendar B events change"
taskRunner:
type: io.kestra.plugin.core.runner.Process
env:
GOOGLE_SERVICE_ACCOUNT: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
POSTGRES_HOST: "{{ secret('POSTGRES_HOST') }}"
POSTGRES_DB: "{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
POSTGRES_USER: "{{ secret('POSTGRES_USER') }}"
POSTGRES_PASSWORD: "{{ secret('POSTGRES_PASSWD') }}"
script: |
const { google } = require('googleapis');
const { Pool } = require('pg');
const fs = require('fs');
function normalizeDateTime(dateObj) {
if (!dateObj) return null;
if (typeof dateObj.dateTime === 'string' || typeof dateObj.date === 'string') {
return dateObj;
}
if (dateObj.dateTime && typeof dateObj.dateTime === 'object') {
const timestamp = dateObj.dateTime.value;
const dateTimeStr = new Date(timestamp).toISOString();
return { dateTime: dateTimeStr, timeZone: dateObj.timeZone || 'UTC' };
}
if (dateObj.date && typeof dateObj.date === 'object') {
const timestamp = dateObj.date.value;
const dateStr = new Date(timestamp).toISOString().split('T')[0];
return { date: dateStr };
}
return dateObj;
}
async function updateEvents() {
const eventsToUpdate = JSON.parse(fs.readFileSync('calendar_b_updates.json', 'utf8'));
if (eventsToUpdate.length === 0) {
console.log('No Calendar B events need updates on Calendar A');
fs.writeFileSync('calendar_b_update_results.json', JSON.stringify([], null, 2));
fs.writeFileSync('calendar_b_update_errors.json', JSON.stringify([], null, 2));
return;
}
console.log(`\n📝 Updating ${eventsToUpdate.length} events on Calendar A...\n`);
const serviceAccountJson = process.env.GOOGLE_SERVICE_ACCOUNT;
const auth = new google.auth.GoogleAuth({
credentials: JSON.parse(serviceAccountJson),
scopes: ['https://www.googleapis.com/auth/calendar'],
});
const calendar = google.calendar({ version: 'v3', auth });
// Setup PostgreSQL connection
// Parse host:port format
const hostParts = process.env.POSTGRES_HOST.split(':');
const pool = new Pool({
host: hostParts[0],
port: hostParts[1] ? parseInt(hostParts[1], 10) : 5432,
database: process.env.POSTGRES_DB,
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD
});
const updateResults = [];
const updateErrors = [];
for (const update of eventsToUpdate) {
try {
const primaryEvent = update.primary_event;
const syncedRecord = update.synced_record;
const normalizedStart = normalizeDateTime(primaryEvent.start);
const normalizedEnd = normalizeDateTime(primaryEvent.end);
// Extract room names from attendees (resource === true)
const roomNames = (primaryEvent.attendees || [])
.filter(attendee => attendee.resource === true)
.map(attendee => attendee.displayName || attendee.email)
.join(', ');
// Handle room placement: if location exists, add to description; otherwise add to location
let finalLocation = primaryEvent.location || '';
const originalDesc = primaryEvent.description || '';
// Build description in order: Room, Meeting Link, ---, Original Description, ---, Sync metadata
let descriptionParts = [];
if (roomNames && primaryEvent.location) {
// Location exists → add room to description
descriptionParts.push(`📍 Room: ${roomNames}`);
} else if (roomNames) {
// No location → put room in location field
finalLocation = roomNames;
}
// Add hangout link to description if it exists
if (primaryEvent.hangoutLink) {
descriptionParts.push(`🔗 Meeting Link: ${primaryEvent.hangoutLink}`);
}
// Add separator if we have room or meeting link
if (descriptionParts.length > 0) {
descriptionParts.push('---');
}
// Add original description
if (originalDesc) {
descriptionParts.push(originalDesc);
descriptionParts.push('---');
}
const finalDescription = descriptionParts.join('\n');
const updatedEvent = {
summary: `[Calendar B] ${primaryEvent.summary}`,
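description: `${finalDescription}\n🔄 SYNCED FROM: ${syncedRecord.primary_calendar}\n📅 PRIMARY EVENT ID: ${syncedRecord.primary_event_id}\n⏰ SYNCED AT: ${new Date().toISOString()}`,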
start: normalizedStart,
end: normalizedEnd,
location: finalLocation,
transparency: primaryEvent.transparency || 'opaque',
conferenceData: primaryEvent.conferenceData,
hangoutLink: primaryEvent.hangoutLink,
};
// 1. Update in Google Calendar
await calendar.events.update({
calendarId: syncedRecord.secondary_calendar,
eventId: syncedRecord.secondary_event_id,
requestBody: updatedEvent,
});
const startTime = normalizedStart.dateTime || normalizedStart.date;
const endTime = normalizedEnd.dateTime || normalizedEnd.date;
const now = new Date().toISOString();
// 2. Immediately update database
try {
await pool.query(`
UPDATE synced_events
SET event_summary = $1, event_start = $2, event_end = $3,
event_signature = $4, last_updated = $5, last_checked = $6
WHERE primary_event_id = $7 AND primary_calendar = $8
`, [
primaryEvent.summary, startTime, endTime,
update.new_signature, now, now,
syncedRecord.primary_event_id, syncedRecord.primary_calendar
]);
updateResults.push({
primary_event_id: syncedRecord.primary_event_id,
primary_calendar: syncedRecord.primary_calendar,
secondary_event_id: syncedRecord.secondary_event_id,
secondary_calendar: syncedRecord.secondary_calendar,
event_summary: primaryEvent.summary,
event_start: startTime,
event_end: endTime,
event_signature: update.new_signature,
updated_at: now,
});
console.log(`✅ Updated & saved: ${primaryEvent.summary} (${startTime})`);
} catch (dbError) {
console.error(`⚠️ Updated in GCal but DB failed for ${primaryEvent.summary} (${startTime}): ${dbError.message}`);
updateErrors.push({ event_id: syncedRecord.primary_event_id, error: `DB: ${dbError.message}` });
}
} catch (error) {
console.error(`❌ Failed to update event ${update.synced_record.primary_event_id}:`, error.message);
updateErrors.push({ event_id: update.synced_record.primary_event_id, error: error.message });
}
}
await pool.end();
fs.writeFileSync('calendar_b_update_results.json', JSON.stringify(updateResults, null, 2));
fs.writeFileSync('calendar_b_update_errors.json', JSON.stringify(updateErrors, null, 2));
console.log(`\n📊 Update Summary: ✅ ${updateResults.length} | ❌ ${updateErrors.length}`);
}
updateEvents().catch(err => {
console.error('Fatal error:', err);
process.exit(1);
});
# ============================================================================
# Step 5: Detect and delete removed events
# ============================================================================
- id: detect_and_delete_removed_events
type: io.kestra.plugin.scripts.node.Script
taskRunner:
type: io.kestra.plugin.core.runner.Process
env:
SERVICE_ACCOUNT_JSON: "{{ secret('GOOGLE_SERVICE_ACCOUNT') }}"
POSTGRES_HOST: "{{ secret('POSTGRES_HOST') }}"
POSTGRES_DB: "{{ secret('CAL_SYNCER_POSTGRES_DB') }}"
POSTGRES_USER: "{{ secret('POSTGRES_USER') }}"
POSTGRES_PASSWORD: "{{ secret('POSTGRES_PASSWD') }}"
script: |
const fs = require('fs');
const { google } = require('googleapis');
const { Pool } = require('pg');
// Load data
const syncedEvents = JSON.parse(fs.readFileSync('synced_events.json', 'utf8'));
const calendarAEvents = JSON.parse(fs.readFileSync('calendar_a_events.json', 'utf8'));
const calendarBEvents = JSON.parse(fs.readFileSync('calendar_b_events.json', 'utf8'));
// Build sets of current event IDs
const currentCalendarAIds = new Set(calendarAEvents.map(e => e.id).filter(Boolean));
const currentCalendarBIds = new Set(calendarBEvents.map(e => e.id).filter(Boolean));
const calendarA = "{{ inputs.calendar_a }}";
const calendarB = "{{ inputs.calendar_b }}";
console.log(`📊 Checking ${syncedEvents.length} synced events for deletions`);
console.log(`📊 Current Calendar A events: ${currentCalendarAIds.size}`);
console.log(`📊 Current Calendar B events: ${currentCalendarBIds.size}`);
// Find deleted events
const deletedEvents = [];
for (const event of syncedEvents) {
const isPrimaryCalendarA = event.primary_calendar === calendarA;
const currentIds = isPrimaryCalendarA ? currentCalendarAIds : currentCalendarBIds;
if (!currentIds.has(event.primary_event_id)) {
deletedEvents.push(event);
}
}
console.log(`🗑️ Found ${deletedEvents.length} deleted events`);
if (deletedEvents.length === 0) {
fs.writeFileSync('deletion_results.json', JSON.stringify([], null, 2));
console.log('✅ No deletions to process');
process.exit(0);
}
// Setup Google Calendar and PostgreSQL
const credentials = JSON.parse(process.env.SERVICE_ACCOUNT_JSON);
const auth = new google.auth.GoogleAuth({
credentials,
scopes: ['https://www.googleapis.com/auth/calendar']
});
const calendar = google.calendar({ version: 'v3', auth });
const hostParts = process.env.POSTGRES_HOST.split(':');
const pool = new Pool({
host: hostParts[0],
port: hostParts[1] ? parseInt(hostParts[1], 10) : 5432,
database: process.env.POSTGRES_DB,
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD
});
const deletionResults = [];
async function processDeletedEvents() {
for (const event of deletedEvents) {
let gcalDeleted = false;
let alreadyDeleted = false;
try {
// 1. Delete event from Google Calendar (secondary calendar)
await calendar.events.delete({
calendarId: event.secondary_calendar,
eventId: event.secondary_event_id
});
gcalDeleted = true;
} catch (error) {
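// Check if event was already deleted (410 Gone).
// Depending on the client version, error.code may be a number or a string, so normalize it.
const status = Number(error.code) || error.response?.status;
if (status === 410 || (error.message || '').includes('Resource has been deleted')) {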
console.log(`ℹ️ Already deleted from calendar: ${event.event_summary}`);
alreadyDeleted = true;
gcalDeleted = true; // Treat as success for DB cleanup
} else {
// Real error - log and skip DB deletion
console.error(`❌ Failed to delete ${event.event_summary}: ${error.message}`);
deletionResults.push({
id: event.id,
event_summary: event.event_summary,
status: 'failed',
error: error.message
});
continue; // Skip to next event
}
}
// 2. Delete from database if GCal deletion succeeded or event already deleted
if (gcalDeleted) {
try {
await pool.query(
`DELETE FROM synced_events WHERE id = $1`,
[event.id]
);
deletionResults.push({
id: event.id,
event_summary: event.event_summary,
primary_calendar: event.primary_calendar,
secondary_calendar: event.secondary_calendar,
status: 'success'
});
if (alreadyDeleted) {
console.log(`✅ Already deleted, removed from DB: ${event.event_summary}`);
} else {
console.log(`✅ Deleted & removed from DB: ${event.event_summary}`);
}
} catch (dbError) {
// GCal deletion succeeded but DB deletion failed
console.error(`⚠️ Deleted from GCal but DB failed for ${event.event_summary}: ${dbError.message}`);
console.error(` Orphaned DB record ID: ${event.id}`);
deletionResults.push({
id: event.id,
event_summary: event.event_summary,
status: 'gcal_success_db_failed',
error: dbError.message
});
}
}
}
await pool.end();
fs.writeFileSync('deletion_results.json', JSON.stringify(deletionResults, null, 2));
const successful = deletionResults.filter(r => r.status === 'success').length;
const failed = deletionResults.filter(r => r.status === 'failed').length;
const partial = deletionResults.filter(r => r.status === 'gcal_success_db_failed').length;
console.log(`\n📊 Deletion Summary: ✅ ${successful} | ⚠️ ${partial} | ❌ ${failed}`);
}
processDeletedEvents().catch(err => {
console.error('Fatal error:', err);
process.exit(1);
});
# ============================================================================
# Step 6: Log summary (database writes happen immediately in create/update tasks)
# ============================================================================
- id: log_summary
type: io.kestra.plugin.scripts.node.Script
taskRunner:
type: io.kestra.plugin.core.runner.Process
script: |
const fs = require('fs');
// Read from actual filenames written by previous tasks
const calendarACreated = JSON.parse(fs.readFileSync('calendar_a_created_events.json', 'utf8'));
const calendarBCreated = JSON.parse(fs.readFileSync('calendar_b_created_events.json', 'utf8'));
const calendarAUpdated = JSON.parse(fs.readFileSync('calendar_a_update_results.json', 'utf8'));
const calendarBUpdated = JSON.parse(fs.readFileSync('calendar_b_update_results.json', 'utf8'));
const deletionResults = JSON.parse(fs.readFileSync('deletion_results.json', 'utf8'));
const now = new Date().toISOString();
const deletedSuccess = deletionResults.filter(r => r.status === 'success').length;
const deletedFailed = deletionResults.filter(r => r.status === 'failed').length;
const deletedPartial = deletionResults.filter(r => r.status === 'gcal_success_db_failed').length;
console.log('\n' + '='.repeat(60));
console.log('📅 GOOGLE CALENDAR SYNC SUMMARY');
console.log('='.repeat(60));
console.log(`⏰ Timestamp: ${now}`);
console.log(`\n📊 CREATE Operations:`);
console.log(` Calendar A → Calendar B: ${calendarACreated.length} events`);
console.log(` Calendar B → Calendar A: ${calendarBCreated.length} events`);
console.log(` Total created: ${calendarACreated.length + calendarBCreated.length}`);
console.log(`\n📝 UPDATE Operations:`);
console.log(` Calendar A → Calendar B: ${calendarAUpdated.length} events`);
console.log(` Calendar B → Calendar A: ${calendarBUpdated.length} events`);
console.log(` Total updated: ${calendarAUpdated.length + calendarBUpdated.length}`);
console.log(`\n🗑️ DELETE Operations:`);
console.log(` Successful: ${deletedSuccess} events`);
console.log(` Failed: ${deletedFailed} events`);
console.log(` Partial: ${deletedPartial} events`);
console.log(` Total attempted: ${deletedSuccess + deletedPartial + deletedFailed}`);
console.log(`\n✅ Grand Total: ${calendarACreated.length + calendarBCreated.length + calendarAUpdated.length + calendarBUpdated.length + deletedSuccess + deletedPartial + deletedFailed} operations`);
console.log('='.repeat(60));
# ============================================================================
# Triggers
# ============================================================================
triggers:
- id: every_15_minutes
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/15 * * * *"
inputs:
calendar_a: "{{ secret('CALENDAR_A_ID') }}"
calendar_b: "{{ secret('CALENDAR_B_ID') }}"
max_events: 2500
# ============================================================================
# Error Handling
# ============================================================================
errors:
- id: notify_error
type: io.kestra.plugin.notifications.telegram.TelegramSend
token: "{{ secret('TELEGRAM_BOT_TOKEN') }}"
channel: "{{ secret('TELEGRAM_CHAT_ID') }}"
payload: |
❌ **Calendar Sync Failed**
Flow: {{ flow.id }}
Execution: {{ execution.id }}
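
The analysis tasks (`sync_calendar_b_to_a` and its A → B counterpart) decide per event whether to skip, create, or update by comparing a signature string with the one stored in `synced_events`. Below is a minimal standalone sketch of that decision. It assumes in-memory sample data instead of the JSON files, made-up calendar addresses, and a hypothetical `classifyEvents` helper that is not part of the flow.

```javascript
// Same field layout as buildSignature() in the flow.
function buildSignature(event) {
  return `${event.summary}|${JSON.stringify(event.start)}|${JSON.stringify(event.end)}|${event.location || ''}|${event.transparency || 'opaque'}|${event.hangoutLink || ''}`;
}

// Hypothetical helper (not in the flow): splits source events into new, changed, and skipped.
function classifyEvents(sourceEvents, syncedRecords, sourceCalendar, targetCalendar) {
  const syncedMap = new Map(
    syncedRecords.map(r => [`${r.primary_event_id}||${r.primary_calendar}`, r])
  );
  const toCreate = [];
  const toUpdate = [];
  const skipped = [];
  for (const event of sourceEvents) {
    if (!event.id) continue;
    // Copies written by the sync carry this marker; never sync them back.
    if ((event.description || '').includes('🔄 SYNCED FROM:')) {
      skipped.push(event.id);
      continue;
    }
    // The target calendar is already an attendee, so the event is visible there.
    if ((event.attendees || []).some(a => a.email === targetCalendar)) {
      skipped.push(event.id);
      continue;
    }
    const record = syncedMap.get(`${event.id}||${sourceCalendar}`);
    if (!record) {
      toCreate.push(event);
      continue;
    }
    const signature = buildSignature(event);
    if (signature !== record.event_signature) {
      toUpdate.push({ primary_event: event, synced_record: record, new_signature: signature });
    }
  }
  return { toCreate, toUpdate, skipped };
}

// Made-up sample data.
const start = { dateTime: '2025-01-01T10:00:00Z' };
const end = { dateTime: '2025-01-01T11:00:00Z' };
const unchanged = { id: 'e1', summary: 'Standup', start, end };
const moved = { id: 'e2', summary: 'Review', start, end, location: 'Room 2' };
const fresh = { id: 'e3', summary: 'Planning', start, end };
const copy = { id: 'e4', summary: '[Calendar A] Lunch', start, end, description: '🔄 SYNCED FROM: a@example.com' };

const synced = [
  { primary_event_id: 'e1', primary_calendar: 'b@example.com', event_signature: buildSignature(unchanged) },
  { primary_event_id: 'e2', primary_calendar: 'b@example.com', event_signature: buildSignature({ ...moved, location: 'Room 1' }) },
];

const result = classifyEvents([unchanged, moved, fresh, copy], synced, 'b@example.com', 'a@example.com');
console.log('create:', result.toCreate.map(e => e.id));
console.log('update:', result.toUpdate.map(u => u.primary_event.id));
console.log('skipped:', result.skipped);
```

Because the signature embeds `JSON.stringify(event.start)` and `JSON.stringify(event.end)`, any difference in how the start or end objects are serialized counts as a change, not only a change in the actual time.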
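
Deletion detection in `detect_and_delete_removed_events` is a set-membership check: a row in `synced_events` whose primary event ID is missing from the current listing of its primary calendar is treated as deleted. The sketch below reproduces that rule with made-up data and a hypothetical `findDeleted` helper that is not part of the flow. Under this rule, an event that was simply not returned by the fetch (for example because it falls outside the fetched window) looks the same as a deleted one.

```javascript
// Hypothetical helper (not in the flow): returns the tracked rows whose primary event
// is absent from the current listing of its primary calendar.
function findDeleted(syncedRecords, calendarA, calendarAEvents, calendarBEvents) {
  const idsA = new Set(calendarAEvents.map(e => e.id).filter(Boolean));
  const idsB = new Set(calendarBEvents.map(e => e.id).filter(Boolean));
  return syncedRecords.filter(record => {
    // Same choice as the flow: anything not owned by calendar A is checked against calendar B.
    const currentIds = record.primary_calendar === calendarA ? idsA : idsB;
    return !currentIds.has(record.primary_event_id);
  });
}

// Made-up rows shaped like synced_events records.
const records = [
  { id: 1, primary_calendar: 'a@example.com', primary_event_id: 'a1' },
  { id: 2, primary_calendar: 'a@example.com', primary_event_id: 'a2' },
  { id: 3, primary_calendar: 'b@example.com', primary_event_id: 'b1' },
];

const deleted = findDeleted(records, 'a@example.com', [{ id: 'a1' }], [{ id: 'b9' }]);
console.log(deleted.map(r => r.id));
```

Rows 2 and 3 are reported here because `a2` and `b1` no longer appear in their calendars' listings. In the flow, each such row leads to a delete on the secondary calendar followed by a `DELETE` on `synced_events`.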