|
from __future__ import annotations, print_function |
|
|
|
# /// script |
|
# dependencies = [ |
|
# "google-api-python-client", |
|
# "google-auth-httplib2", |
|
# "google-auth-oauthlib", |
|
# ] |
|
# [tool.uv] |
|
# exclude-newer = "2023-10-16T00:00:00Z" |
|
# /// |
|
|
|
import datetime |
|
import os.path |
|
from typing import Optional, Tuple, List, Dict, Any |
|
|
|
from google.oauth2.credentials import Credentials |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
from google.auth.transport.requests import Request |
|
from googleapiclient.discovery import build |
|
|
|
SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"] |
|
|
|
|
|
def get_calendar_service(): |
|
creds = None |
|
if os.path.exists("token.json"): |
|
creds = Credentials.from_authorized_user_file("token.json", SCOPES) |
|
if not creds or not creds.valid: |
|
if creds and creds.expired and creds.refresh_token: |
|
creds.refresh(Request()) |
|
else: |
|
flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES) |
|
creds = flow.run_local_server(port=0) |
|
with open("token.json", "w") as token: |
|
token.write(creds.to_json()) |
|
return build("calendar", "v3", credentials=creds) |
|
|
|
|
|
def parse_event_window(e: Dict[str, Any]) -> Optional[Tuple[datetime.datetime, datetime.datetime]]: |
|
"""Return event start/end as aware datetimes. Skip all-day (date-only) and malformed.""" |
|
start_raw = e.get("start", {}) |
|
end_raw = e.get("end", {}) |
|
start_dt = start_raw.get("dateTime") |
|
end_dt = end_raw.get("dateTime") |
|
if not start_dt or not end_dt: |
|
return None |
|
s = datetime.datetime.fromisoformat(start_dt.replace("Z", "+00:00")) |
|
t = datetime.datetime.fromisoformat(end_dt.replace("Z", "+00:00")) |
|
return s, t |
|
|
|
|
|
def my_response_status(e: Dict[str, Any]) -> Optional[str]: |
|
"""Return your attendee responseStatus if present.""" |
|
for a in e.get("attendees", []) or []: |
|
if a.get("self"): |
|
return a.get("responseStatus") |
|
return None |
|
|
|
|
|
def is_event_busy(e: Dict[str, Any]) -> bool: |
|
"""Busy means it's a time-blocking meeting that isn't focus/OOO/outOfOffice and isn't declined.""" |
|
summary = (e.get("summary") or "").lower() |
|
event_type = e.get("eventType", "") |
|
|
|
# Ignore focus / OOO / out-of-office |
|
if "focus" in summary or "ooo" in summary or event_type == "outOfOffice": |
|
return False |
|
|
|
# If you declined, ignore completely for busy/free |
|
if my_response_status(e) == "declined": |
|
return False |
|
|
|
# Respect transparency: transparent does not block time |
|
if e.get("transparency") == "transparent": |
|
return False |
|
|
|
# Busy if it has a concrete time window |
|
return parse_event_window(e) is not None |
|
|
|
|
|
def get_current_and_next_event(service): |
|
now_utc = datetime.datetime.now(datetime.UTC) |
|
horizon = now_utc + datetime.timedelta(hours=4) |
|
|
|
events_result = service.events().list( |
|
calendarId="primary", |
|
timeMin=now_utc.isoformat(), |
|
timeMax=horizon.isoformat(), |
|
singleEvents=True, |
|
orderBy="startTime", |
|
showHiddenInvitations=False, |
|
).execute() |
|
|
|
events = events_result.get("items", []) |
|
current = None |
|
next_ev = None |
|
|
|
for e in events: |
|
window = parse_event_window(e) |
|
if not window: |
|
continue |
|
s, t = window |
|
|
|
if is_event_busy(e): |
|
if s <= now_utc <= t: |
|
current = e |
|
elif s > now_utc and next_ev is None: |
|
next_ev = e |
|
if current and next_ev: |
|
break |
|
|
|
return current, next_ev |
|
|
|
|
|
def list_events_remaining_today(service) -> List[Dict[str, Any]]: |
|
local_now = datetime.datetime.now().astimezone() |
|
end_of_day_local = local_now.replace(hour=23, minute=59, second=59, microsecond=0) |
|
time_min = local_now.astimezone(datetime.UTC).isoformat() |
|
time_max = end_of_day_local.astimezone(datetime.UTC).isoformat() |
|
|
|
all_events: List[Dict[str, Any]] = [] |
|
page_token = None |
|
while True: |
|
resp = service.events().list( |
|
calendarId="primary", |
|
timeMin=time_min, |
|
timeMax=time_max, |
|
singleEvents=True, |
|
orderBy="startTime", |
|
pageToken=page_token, |
|
showHiddenInvitations=False, |
|
).execute() |
|
items = resp.get("items", []) |
|
all_events.extend(items) |
|
page_token = resp.get("nextPageToken") |
|
if not page_token: |
|
break |
|
|
|
rows = [] |
|
for e in all_events: |
|
# Skip declined meetings entirely |
|
if my_response_status(e) == "declined": |
|
continue |
|
|
|
timing = parse_event_window(e) |
|
all_day = False |
|
|
|
if not timing: |
|
all_day = True |
|
start_local = end_local = None |
|
status = "FREE (all-day)" |
|
else: |
|
s_utc, t_utc = timing |
|
start_local = s_utc.astimezone(local_now.tzinfo) |
|
end_local = t_utc.astimezone(local_now.tzinfo) |
|
status = "BUSY" if is_event_busy(e) else "FREE" |
|
|
|
rows.append( |
|
{ |
|
"summary": e.get("summary") or "(no title)", |
|
"status": status, |
|
"start_local": start_local, |
|
"end_local": end_local, |
|
} |
|
) |
|
return rows |
|
|
|
|
|
def format_time(dt: Optional[datetime.datetime]) -> str: |
|
return "-" if not dt else dt.strftime("%H:%M") |
|
|
|
|
|
if __name__ == "__main__": |
|
service = get_calendar_service() |
|
current, next_event = get_current_and_next_event(service) |
|
local_tz = datetime.datetime.now().astimezone().tzinfo |
|
|
|
if current: |
|
end_time = datetime.datetime.fromisoformat( |
|
current["end"]["dateTime"].replace("Z", "+00:00") |
|
).astimezone(local_tz) |
|
print(f"Busy until {end_time.strftime('%H:%M')}") |
|
elif next_event: |
|
start_time = datetime.datetime.fromisoformat( |
|
next_event["start"]["dateTime"].replace("Z", "+00:00") |
|
).astimezone(local_tz) |
|
print(f"Free until {start_time.strftime('%H:%M')}") |
|
else: |
|
print("Free") |
|
|
|
print("\nToday's remaining events:") |
|
events = list_events_remaining_today(service) |
|
if not events: |
|
print("(none)") |
|
else: |
|
for e in events: |
|
print( |
|
f"- {format_time(e['start_local'])}–{format_time(e['end_local'])} " |
|
f"[{e['status']}] {e['summary']}" |
|
) |