Skip to content

Instantly share code, notes, and snippets.

@kartben
Last active December 4, 2025 20:12
Show Gist options
  • Select an option

  • Save kartben/2928d2f314b3cc820a229ac248a8d26a to your computer and use it in GitHub Desktop.

Select an option

Save kartben/2928d2f314b3cc820a229ac248a8d26a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Script to check GitHub activity of maintainers and collaborators listed in MAINTAINERS.yml
This script extracts all GitHub usernames from the MAINTAINERS.yml file and checks their
activity in the specified GitHub organization using the GitHub GraphQL API.
Requirements:
- GITHUB_TOKEN environment variable must be set
- requests library (pip install requests)
Usage:
python check_maintainer_github_activity.py [options]
Options:
--maintainers-file FILE Path to MAINTAINERS.yml file (default: ./MAINTAINERS.yml)
--org ORGANIZATION GitHub organization to check activity against (default: zephyrproject-rtos)
--months MONTHS Number of months to check for inactivity (default: 6)
--output-format FORMAT Output format: table, json, csv (default: table)
--verbose Enable verbose output
--chunk-large-windows Break large time windows (>12 months) into smaller chunks
to avoid API limitations
--max-workers WORKERS Maximum number of parallel workers for checking user activity
(default: 6)
"""
import argparse
import csv
import json
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Optional, List, Set, Dict
import requests
import yaml
class GitHubActivityChecker:
    """Check GitHub user activity using GraphQL API"""

    def __init__(self, token: str, org: str = "zephyrproject-rtos", months: int = 6):
        """Initialise the checker.

        token:  GitHub personal access token, sent as a Bearer credential.
        org:    organization login whose activity is inspected.
        months: look-back window used to compute the inactivity threshold.
        """
        self.token = token
        self.org = org
        self.months = months
        # All GraphQL requests are JSON POSTs with the same auth headers.
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        self.graphql_url = "https://api.github.com/graphql"
        # Cutoff for "recent" activity; a month is approximated as 30 days.
        self.activity_threshold = datetime.now() - timedelta(days=months * 30)
        # Resolved lazily and cached by get_organization_id().
        self.org_id: Optional[str] = None
        # contributionsCollection can silently return incomplete data for
        # very large windows, so warn the user up front.
        if months > 12:
            print(f"Warning: Querying {months} months of data may hit GitHub API limitations.")
            print("For time windows > 12 months, consider using smaller intervals or chunking.")
            print("Large time windows may return incomplete or zero contribution data.")
            print()
def get_organization_id(self) -> str:
    """Resolve and cache the organization ID from its login name.

    Raises RuntimeError when the org cannot be resolved (GraphQL error,
    unknown organization, or a network failure).
    """
    # Serve the memoised value when available.
    if self.org_id:
        return self.org_id
    query = """
query($org: String!) {
organization(login: $org) {
id
}
}
"""
    payload = {"query": query, "variables": {"org": self.org}}
    try:
        response = requests.post(self.graphql_url, headers=self.headers, json=payload)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Network error resolving org id: {e}") from e
    if 'errors' in data:
        error_msg = "; ".join(err['message'] for err in data['errors'])
        raise RuntimeError(f"Failed to resolve org id for '{self.org}': {error_msg}")
    org_node = data.get('data', {}).get('organization')
    if not (org_node and org_node.get('id')):
        raise RuntimeError(f"Organization '{self.org}' not found or no ID returned")
    self.org_id = org_node['id']
    return self.org_id
def extract_usernames_from_maintainers(self, maintainers_file: str) -> Set[str]:
    """Extract all GitHub usernames from MAINTAINERS.yml file.

    Returns the union of every string entry under each area's
    ``maintainers`` and ``collaborators`` lists.  Exits the process with
    status 1 on read or parse errors (this is a CLI helper).
    """
    usernames: Set[str] = set()
    try:
        with open(maintainers_file, encoding='utf-8') as f:
            content = f.read()
    except FileNotFoundError:
        print(f"Error: MAINTAINERS.yml file not found at {maintainers_file}")
        sys.exit(1)
    except Exception as e:
        print(f"Error reading MAINTAINERS.yml: {e}")
        sys.exit(1)
    # Parse YAML content
    try:
        data = yaml.safe_load(content)
    except yaml.YAMLError as e:
        print(f"Error parsing MAINTAINERS.yml: {e}")
        sys.exit(1)
    # An empty file parses to None and a scalar file to a non-dict;
    # treat both as "no areas" instead of crashing on .values().
    if not isinstance(data, dict):
        return usernames
    for area_data in data.values():
        if not isinstance(area_data, dict):
            continue
        # A bare "maintainers:" / "collaborators:" key parses to None,
        # so guard with `or []` before iterating.
        for key in ('maintainers', 'collaborators'):
            for entry in area_data.get(key) or []:
                if isinstance(entry, str):
                    usernames.add(entry.strip())
    return usernames
def get_user_activity_chunked(self, username: str, chunk_months: int = 6) -> Dict:
    """Get user activity information using chunked queries for large time windows.

    Splits the overall window (``self.months``) into chunks of at most
    ``chunk_months`` months, queries each chunk, and aggregates the
    contribution counts.  GitHub's contributionsCollection can return
    incomplete data for windows much longer than 12 months, hence the
    chunking.  Any chunk-level error/not_found result is returned as-is.
    """
    if self.months <= chunk_months:
        # Window fits in a single query.  Pass the threshold explicitly:
        # get_user_activity() has no default for its start_date parameter,
        # so the bare call used previously would raise TypeError.
        return self.get_user_activity(username, start_date=self.activity_threshold)
    # Ceiling division: number of chunks needed to cover the window.
    num_chunks = (self.months + chunk_months - 1) // chunk_months
    # One fixed reference time so chunk boundaries line up exactly
    # (calling datetime.now() per chunk would create tiny gaps/overlaps).
    now = datetime.now()
    total_contributions = 0
    has_contributions = False
    has_restricted_contributions = False
    commit_contributions = 0
    issue_contributions = 0
    pr_contributions = 0
    pr_review_contributions = 0
    repository_contributions = 0
    chunk_result: Dict = {}
    for i in range(num_chunks):
        # Chunk i covers [i*chunk_months, (i+1)*chunk_months) months ago,
        # clamped to the overall window; a month is approximated as 30 days.
        months_back_start = i * chunk_months
        months_back_end = min((i + 1) * chunk_months, self.months)
        chunk_start_date = now - timedelta(days=months_back_end * 30)
        chunk_end_date = now - timedelta(days=months_back_start * 30)
        chunk_result = self.get_user_activity(
            username, start_date=chunk_start_date, end_date=chunk_end_date
        )
        if chunk_result['status'] in ('error', 'not_found'):
            # Propagate the first failure instead of reporting partial data.
            return chunk_result
        total_contributions += chunk_result['total_contributions']
        has_contributions = has_contributions or chunk_result['has_contributions']
        has_restricted_contributions = (
            has_restricted_contributions or chunk_result['has_restricted_contributions']
        )
        commit_contributions += chunk_result['commit_contributions']
        issue_contributions += chunk_result['issue_contributions']
        pr_contributions += chunk_result['pr_contributions']
        pr_review_contributions += chunk_result['pr_review_contributions']
        repository_contributions += chunk_result['repository_contributions']
    # Private (restricted) activity takes precedence over public activity.
    if has_restricted_contributions:
        status = "private_activity"
    elif has_contributions:
        status = "active"
    else:
        status = "inactive"
    return {
        "username": username,
        # name/email come from the last chunk's user lookup.
        "name": chunk_result.get('name'),
        "email": chunk_result.get('email'),
        "status": status,
        "has_contributions": has_contributions,
        "has_restricted_contributions": has_restricted_contributions,
        "total_contributions": total_contributions,
        "commit_contributions": commit_contributions,
        "issue_contributions": issue_contributions,
        "pr_contributions": pr_contributions,
        "pr_review_contributions": pr_review_contributions,
        "repository_contributions": repository_contributions,
    }
def format_time_since(self, date_str: str) -> str:
    """Return a human-readable "N <unit> ago" string for an ISO 8601 date.

    Months are approximated as 30 days and years as 365 days.
    """
    # fromisoformat() (pre-3.11) rejects a trailing 'Z'; normalise it to
    # an explicit UTC offset first.
    last_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
    elapsed_days = (datetime.now(last_date.tzinfo) - last_date).days
    if elapsed_days < 0:
        return "in the future"
    if elapsed_days == 0:
        return "today"
    if elapsed_days < 30:
        return f"{elapsed_days} days ago"
    elapsed_months = elapsed_days // 30
    if elapsed_months < 24:
        return f"{elapsed_months} months ago"
    return f"{elapsed_days // 365} years ago"
def get_user_activity(
    self, username: str, start_date: datetime, end_date: Optional[datetime] = None
) -> Dict:
    """Get user activity information using GraphQL API.

    Queries the user's contributionsCollection scoped to the organization
    between start_date and end_date (server defaults "to" to now when
    end_date is omitted).  The returned dict carries a 'status' of
    'active', 'inactive', 'private_activity', 'not_found' or 'error'.
    """
    query = """
query($username: String!, $orgId: ID!, $from: DateTime!, $to: DateTime) {
user(login: $username) {
login
name
email
contributionsCollection(organizationID: $orgId, from: $from, to: $to) {
hasAnyContributions
hasAnyRestrictedContributions
totalCommitContributions
totalIssueContributions
totalPullRequestContributions
totalPullRequestReviewContributions
totalRepositoryContributions
# Fetch first 1 of each to verify counts (API sometimes reports 0 totals for non-members)
pullRequestContributions(first: 1) { nodes { occurredAt } }
issueContributions(first: 1) { nodes { occurredAt } }
pullRequestReviewContributions(first: 1) { nodes { occurredAt } }
repositoryContributions(first: 1) { nodes { occurredAt } }
}
}
}
"""
    variables = {
        "username": username,
        "orgId": self.get_organization_id(),
        "from": start_date.isoformat(),
    }
    if end_date:
        variables["to"] = end_date.isoformat()

    def _failure(status: str, message: str) -> Dict:
        # Uniform result shape for error / not-found outcomes.
        return {
            "username": username,
            "status": status,
            "error": message,
            "has_contributions": False,
            "has_restricted_contributions": False,
            "total_contributions": 0,
        }

    try:
        response = requests.post(
            self.graphql_url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        return _failure("error", str(e))
    if 'errors' in data:
        return _failure("error", "; ".join(err['message'] for err in data['errors']))
    user_data = data['data']['user']
    if not user_data:
        return _failure("not_found", "User not found")
    contributions = user_data['contributionsCollection']

    def _count(total_key: str, list_key: str) -> int:
        # GitHub sometimes reports a 0 total for non-members even though
        # contribution nodes exist; trust the sampled node in that case.
        total = contributions[total_key]
        if total == 0 and contributions.get(list_key, {}).get('nodes'):
            return 1
        return total

    issue_count = _count('totalIssueContributions', 'issueContributions')
    pr_count = _count('totalPullRequestContributions', 'pullRequestContributions')
    review_count = _count('totalPullRequestReviewContributions', 'pullRequestReviewContributions')
    repo_count = _count('totalRepositoryContributions', 'repositoryContributions')
    # Commits are deliberately excluded from the total: they typically
    # arrive via pull requests and would double-count activity.
    total_contributions = issue_count + pr_count + review_count + repo_count
    if contributions['hasAnyRestrictedContributions']:
        status = "private_activity"
    elif total_contributions > 0:
        status = "active"
    else:
        status = "inactive"
    return {
        "username": username,
        "name": user_data.get('name'),
        "email": user_data.get('email'),
        "status": status,
        "has_contributions": contributions['hasAnyContributions'],
        "has_restricted_contributions": contributions['hasAnyRestrictedContributions'],
        "total_contributions": total_contributions,
        "commit_contributions": contributions['totalCommitContributions'],
        "issue_contributions": issue_count,
        "pr_contributions": pr_count,
        "pr_review_contributions": review_count,
        "repository_contributions": repo_count,
    }
def get_user_recent_activity(self, username: str, max_activities: int = 3) -> List[Dict]:
    """Get user's most recent activities using iterative chunked queries.

    Walks backwards in time in 6-month windows (up to 4 years) and stops
    as soon as at least *max_activities* entries were collected.  Returns
    the newest *max_activities* entries, most recent first.
    """
    chunk_months = 6
    max_months_back = 48  # look back at most 4 years
    collected: List[Dict] = []
    for months_back in range(chunk_months, max_months_back + 1, chunk_months):
        # Window covering (months_back - chunk_months, months_back] months
        # ago, approximating a month as 30 days.
        window_end = datetime.now() - timedelta(days=(months_back - chunk_months) * 30)
        window_start = datetime.now() - timedelta(days=months_back * 30)
        collected.extend(self._get_activities_for_period(username, window_start, window_end))
        # Enough material gathered; no need to query further back.
        if len(collected) >= max_activities:
            break
    collected.sort(key=lambda item: item['date'], reverse=True)
    return collected[:max_activities]
def _get_activities_for_period(
    self, username: str, start_date: datetime, end_date: datetime
) -> List[Dict]:
    """Get activities for a specific time period.

    Returns a flat list of {type, title, url, date} dicts sampled from
    the user's commits, PRs, issues and reviews in the organization.
    Any API or network error yields an empty list (best-effort lookup).
    """
    query = """
query($username: String!, $from: DateTime!, $to: DateTime!, $orgId: ID!) {
user(login: $username) {
contributionsCollection(from: $from, to: $to, organizationID: $orgId) {
commitContributionsByRepository(maxRepositories: 10) {
repository {
nameWithOwner
}
contributions(first: 10, orderBy: {field: OCCURRED_AT, direction: DESC}) {
nodes {
occurredAt
}
}
}
pullRequestContributions(first: 10, orderBy: {direction: DESC}) {
nodes {
occurredAt
pullRequest {
title
url
createdAt
}
}
}
issueContributions(first: 10, orderBy: {direction: DESC}) {
nodes {
occurredAt
issue {
title
url
createdAt
}
}
}
pullRequestReviewContributions(first: 10, orderBy: {direction: DESC}) {
nodes {
occurredAt
pullRequestReview {
pullRequest {
title
url
}
createdAt
}
}
}
}
}
}
"""
    variables = {
        "username": username,
        "from": start_date.isoformat(),
        "to": end_date.isoformat(),
        "orgId": self.get_organization_id(),
    }
    try:
        response = requests.post(
            self.graphql_url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException:
        return []
    if 'errors' in data:
        return []
    user_data = data.get('data', {}).get('user')
    if not user_data:
        return []
    collection = user_data['contributionsCollection']
    activities: List[Dict] = []
    # Commits: one entry per sampled contribution; the API does not expose
    # individual commit URLs here, so link to the repository page.
    for repo_bucket in collection.get('commitContributionsByRepository', []):
        repo_name = repo_bucket['repository']['nameWithOwner']
        activities.extend(
            {
                "type": "commit",
                "title": f"Commit to {repo_name}",
                "url": f"https://github.com/{repo_name}",
                "date": node['occurredAt'],
            }
            for node in repo_bucket['contributions']['nodes']
        )
    # Pull requests authored by the user.
    activities.extend(
        {
            "type": "pr",
            "title": node['pullRequest']['title'],
            "url": node['pullRequest']['url'],
            "date": node['occurredAt'],
        }
        for node in collection.get('pullRequestContributions', {}).get('nodes', [])
    )
    # Issues opened by the user.
    activities.extend(
        {
            "type": "issue",
            "title": node['issue']['title'],
            "url": node['issue']['url'],
            "date": node['occurredAt'],
        }
        for node in collection.get('issueContributions', {}).get('nodes', [])
    )
    # Reviews, linked to the reviewed pull request.
    activities.extend(
        {
            "type": "review",
            "title": f"Review on '{node['pullRequestReview']['pullRequest']['title']}'",
            "url": node['pullRequestReview']['pullRequest']['url'],
            "date": node['occurredAt'],
        }
        for node in collection.get('pullRequestReviewContributions', {}).get('nodes', [])
    )
    return activities
def _check_single_user(self, username: str, chunk_large_windows: bool = False) -> Dict:
"""Check activity for a single user (helper method for parallel processing)"""
# Use chunking for large time windows if enabled
if chunk_large_windows and self.months > 12:
return self.get_user_activity_chunked(username)
else:
return self.get_user_activity(username, start_date=self.activity_threshold)
def check_all_users(
    self,
    usernames: Set[str],
    verbose: bool = False,
    chunk_large_windows: bool = False,
    max_workers: int = 6,
) -> List[Dict]:
    """Check activity for all users using parallel processing.

    Fans the per-user checks out over a thread pool (the work is
    network-bound, so threads suffice) and returns one result dict per
    user, in completion order.  A worker exception becomes an 'error'
    result rather than aborting the whole run.
    """
    results: List[Dict] = []
    total_users = len(usernames)
    done = 0
    # ANSI colour per status for the verbose progress lines.
    status_colors = {
        "active": "\033[92m",
        "inactive": "\033[91m",
        "private_activity": "\033[96m",
    }
    print(f"Checking activity for {total_users} users in {self.org} organization...")
    print(
        f"Activity threshold: {self.activity_threshold.strftime('%Y-%m-%d')} "
        f"(last {self.months} months)"
    )
    print(f"Using {max_workers} parallel workers...")
    print()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Sort for a deterministic submission order.
        future_to_username = {
            executor.submit(self._check_single_user, name, chunk_large_windows): name
            for name in sorted(usernames)
        }
        for future in as_completed(future_to_username):
            name = future_to_username[future]
            done += 1
            try:
                result = future.result()
            except Exception as exc:
                # Record the failure and keep processing the other users.
                print(f"User {name} generated an exception: {exc}")
                results.append(
                    {
                        "username": name,
                        "status": "error",
                        "error": str(exc),
                        "has_contributions": False,
                        "has_restricted_contributions": False,
                        "total_contributions": 0,
                    }
                )
                continue
            results.append(result)
            if verbose:
                color = status_colors.get(result['status'])
                if color:
                    print(f"[{done:03}/{total_users}] {name}: {color}{result['status']}\033[0m")
                else:
                    print(f"[{done:03}/{total_users}] {name}: {result['status']}")
            elif done % 10 == 0 or done == total_users:
                # Terse progress indicator for non-verbose runs.
                print(f"Progress: {done:03}/{total_users} users checked...")
    return results
def print_results_table(self, results: List[Dict]):
    """Print results in table format.

    Shows summary counts, then one section per status group.  Inactive
    users additionally get a best-effort lookup of their last public
    activities to help judge whether the inactivity is real.
    """
    print(f"\nGitHub Activity Report for {self.org} (last {self.months} months)")
    print("=" * 80)
    # Bucket results by status once; counts and sections derive from it.
    by_status: Dict[str, List[Dict]] = {}
    for entry in results:
        by_status.setdefault(entry['status'], []).append(entry)
    print(f"Total users checked: {len(results)}")
    print(f"Active users: {len(by_status.get('active', []))}")
    print(f"Inactive users: {len(by_status.get('inactive', []))}")
    print(f"Users with private activity: {len(by_status.get('private_activity', []))}")
    print(f"Errors: {len(by_status.get('error', []))}")
    print(f"Not found: {len(by_status.get('not_found', []))}")
    print()
    inactive_users = by_status.get('inactive', [])
    if inactive_users:
        print("INACTIVE USERS:")
        print("-" * 40)
        for user in inactive_users:
            print(f" {user['username']}")
            recent = self.get_user_recent_activity(user['username'])
            if recent:
                # Most recent entry first (get_user_recent_activity sorts).
                newest = recent[0]
                time_str = self.format_time_since(newest['date'])
                print(f" Last activity: {time_str} ({newest['type']})")
                print(" Recent activities:")
                for activity in recent:
                    print(f" - {activity['type']}: {activity['title']}")
                    print(f" {activity['url']} ({activity['date']})")
            else:
                print(" No recent public activities found.")
            print()
    private_users = by_status.get('private_activity', [])
    if private_users:
        print("USERS WITH PRIVATE ACTIVITY:")
        print("-" * 40)
        for user in private_users:
            print(f" {user['username']}")
        print()
    active_users = by_status.get('active', [])
    if active_users:
        print("ACTIVE USERS:")
        print("-" * 40)
        # Most active first.
        for user in sorted(active_users, key=lambda u: u['total_contributions'], reverse=True):
            print(f" {user['username']:<20} ({user['total_contributions']} contributions)")
        print()
    # Preserve the original results order for the error section.
    error_users = [r for r in results if r['status'] in ('error', 'not_found')]
    if error_users:
        print("ERRORS:")
        print("-" * 40)
        for user in error_users:
            print(f" {user['username']:<20} - {user.get('error', 'Unknown error')}")
        print()
def print_results_json(self, results: List[Dict]):
    """Print the full result dicts as pretty-printed JSON on stdout."""
    rendered = json.dumps(results, indent=2)
    print(rendered)
def print_results_csv(self, results: List[Dict]):
    """Print results in CSV format on stdout: a header plus one row per user."""
    if not results:
        # Nothing to report; also avoids emitting a lone header line.
        return
    fieldnames = [
        'username',
        'name',
        'email',
        'status',
        'has_contributions',
        'has_restricted_contributions',
        'total_contributions',
        'commit_contributions',
        'issue_contributions',
        'pr_contributions',
        'pr_review_contributions',
        'repository_contributions',
        'error',
    ]
    writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
    writer.writeheader()
    for result in results:
        # Missing keys become empty cells rather than raising.
        writer.writerow({name: result.get(name, '') for name in fieldnames})
def main():
    """CLI entry point: parse arguments, collect usernames, run the checks."""
    parser = argparse.ArgumentParser(
        description="Check GitHub activity of maintainers and collaborators",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--maintainers-file",
        default="./MAINTAINERS.yml",
        help="Path to MAINTAINERS.yml file (default: ./MAINTAINERS.yml)",
    )
    parser.add_argument(
        "--org",
        default="zephyrproject-rtos",
        help="GitHub organization to check activity against (default: zephyrproject-rtos)",
    )
    parser.add_argument(
        "--months",
        type=int,
        default=6,
        help="Number of months to check for inactivity (default: 6)",
    )
    parser.add_argument(
        "--output-format",
        choices=["table", "json", "csv"],
        default="table",
        help="Output format (default: table)",
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose output")
    parser.add_argument(
        "--chunk-large-windows",
        action="store_true",
        help="Break large time windows (>12 months) into smaller chunks to avoid API limitations",
    )
    parser.add_argument(
        "--max-workers",
        type=int,
        default=6,
        help="Maximum number of parallel workers for checking user activity (default: 6)",
    )
    args = parser.parse_args()
    # A token is mandatory: the GraphQL endpoint rejects anonymous queries.
    github_token = os.getenv('GITHUB_TOKEN')
    if not github_token:
        print("Error: GITHUB_TOKEN environment variable is required")
        print("Please set your GitHub token: export GITHUB_TOKEN=your_token_here")
        sys.exit(1)
    checker = GitHubActivityChecker(token=github_token, org=args.org, months=args.months)
    usernames = checker.extract_usernames_from_maintainers(args.maintainers_file)
    if not usernames:
        print("No usernames found in MAINTAINERS.yml")
        sys.exit(1)
    results = checker.check_all_users(
        usernames,
        verbose=args.verbose,
        chunk_large_windows=args.chunk_large_windows,
        max_workers=args.max_workers,
    )
    # Dispatch on the requested output format (choices are closed above).
    printers = {
        "json": checker.print_results_json,
        "csv": checker.print_results_csv,
        "table": checker.print_results_table,
    }
    printers[args.output_format](results)
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment