Last active
December 4, 2025 20:12
-
-
Save kartben/2928d2f314b3cc820a229ac248a8d26a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Script to check GitHub activity of maintainers and collaborators listed in MAINTAINERS.yml | |
| This script extracts all GitHub usernames from the MAINTAINERS.yml file and checks their | |
| activity in the specified GitHub organization using the GitHub GraphQL API. | |
| Requirements: | |
| - GITHUB_TOKEN environment variable must be set | |
| - requests library (pip install requests) | |
| Usage: | |
| python check_maintainer_github_activity.py [options] | |
| Options: | |
| --maintainers-file FILE Path to MAINTAINERS.yml file (default: ./MAINTAINERS.yml) | |
| --org ORGANIZATION GitHub organization to check activity against (default: zephyrproject-rtos) | |
| --months MONTHS Number of months to check for inactivity (default: 6) | |
| --output-format FORMAT Output format: table, json, csv (default: table) | |
| --verbose Enable verbose output | |
| --chunk-large-windows Break large time windows (>12 months) into smaller chunks | |
| to avoid API limitations | |
| --max-workers WORKERS Maximum number of parallel workers for checking user activity | |
| (default: 6) | |
| """ | |
| import argparse | |
| import csv | |
| import json | |
| import os | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Set, Dict | |
| import requests | |
| import yaml | |
class GitHubActivityChecker:
    """Check GitHub user activity within one organization via the GraphQL API.

    The checker resolves the organization's GraphQL node ID once, then queries
    each user's ``contributionsCollection`` restricted to that organization
    over the configured time window.  Results are plain dicts so they can be
    rendered as a table, JSON, or CSV.

    NOTE: "months" are approximated as 30-day periods throughout this class.
    """

    def __init__(self, token: str, org: str = "zephyrproject-rtos", months: int = 6):
        """Initialize the checker.

        Args:
            token: GitHub personal access token used as a Bearer token.
            org: GitHub organization login to scope contribution queries to.
            months: Size of the activity window, in (30-day) months.
        """
        self.token = token
        self.org = org
        self.months = months
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        self.graphql_url = "https://api.github.com/graphql"
        # Oldest date still considered "recent activity".  Naive local time;
        # presumably GitHub treats offset-less timestamps as UTC — acceptable
        # at month granularity, but TODO confirm if exactness ever matters.
        self.activity_threshold = datetime.now() - timedelta(days=months * 30)
        # Cached organization node ID, resolved lazily by get_organization_id().
        self.org_id: Optional[str] = None
        # contributionsCollection silently degrades for very large windows.
        if months > 12:
            print(f"Warning: Querying {months} months of data may hit GitHub API limitations.")
            print("For time windows > 12 months, consider using smaller intervals or chunking.")
            print("Large time windows may return incomplete or zero contribution data.")
            print()

    def _graphql(self, query: str, variables: Dict) -> Dict:
        """POST one GraphQL query and return the decoded JSON response.

        Raises:
            requests.exceptions.RequestException: on network or HTTP errors.
        """
        response = requests.post(
            self.graphql_url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        return response.json()

    def get_organization_id(self) -> str:
        """Resolve and cache the organization ID from its login name.

        Raises:
            RuntimeError: if the organization cannot be resolved.
        """
        if self.org_id:
            return self.org_id
        query = """
        query($org: String!) {
          organization(login: $org) {
            id
          }
        }
        """
        try:
            data = self._graphql(query, {"org": self.org})
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Network error resolving org id: {e}") from e
        if 'errors' in data:
            error_msg = "; ".join(error['message'] for error in data['errors'])
            raise RuntimeError(f"Failed to resolve org id for '{self.org}': {error_msg}")
        org_node = data.get('data', {}).get('organization')
        if not org_node or not org_node.get('id'):
            raise RuntimeError(f"Organization '{self.org}' not found or no ID returned")
        self.org_id = org_node['id']
        return self.org_id

    def extract_usernames_from_maintainers(self, maintainers_file: str) -> Set[str]:
        """Extract all GitHub usernames from a MAINTAINERS.yml file.

        Collects both ``maintainers`` and ``collaborators`` entries from every
        area.  Exits the process with status 1 if the file is missing or
        cannot be parsed (this is a CLI helper, not a library function).
        """
        try:
            with open(maintainers_file, encoding='utf-8') as f:
                content = f.read()
        except FileNotFoundError:
            print(f"Error: MAINTAINERS.yml file not found at {maintainers_file}")
            sys.exit(1)
        except Exception as e:
            print(f"Error reading MAINTAINERS.yml: {e}")
            sys.exit(1)
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError as e:
            print(f"Error parsing MAINTAINERS.yml: {e}")
            sys.exit(1)
        usernames: Set[str] = set()
        # An empty YAML document loads as None; guard before iterating.
        if not isinstance(data, dict):
            return usernames
        for area_data in data.values():
            if not isinstance(area_data, dict):
                continue
            for role in ('maintainers', 'collaborators'):
                # "or []" guards against an explicit null list in the YAML.
                for user in area_data.get(role) or []:
                    if isinstance(user, str):
                        usernames.add(user.strip())
        return usernames

    def get_user_activity_chunked(self, username: str, chunk_months: int = 6) -> Dict:
        """Aggregate a user's activity over several smaller query windows.

        contributionsCollection can return incomplete data for windows longer
        than ~12 months, so the total window is split into chunk_months-sized
        pieces that are queried separately and summed.  On the first failed
        chunk, that error result is returned unchanged.
        """
        if self.months <= chunk_months:
            # Window is small enough for a single query.
            return self.get_user_activity(username)
        num_chunks = (self.months + chunk_months - 1) // chunk_months  # ceiling division
        totals = {
            "total_contributions": 0,
            "commit_contributions": 0,
            "issue_contributions": 0,
            "pr_contributions": 0,
            "pr_review_contributions": 0,
            "repository_contributions": 0,
        }
        has_contributions = False
        has_restricted_contributions = False
        name: Optional[str] = None
        email: Optional[str] = None
        now = datetime.now()
        for i in range(num_chunks):
            # Chunk 0 covers [now - chunk_months, now], chunk 1 the interval
            # before it, and so on back to the full window.
            months_back_start = i * chunk_months
            months_back_end = min((i + 1) * chunk_months, self.months)
            chunk_start_date = now - timedelta(days=months_back_end * 30)
            chunk_end_date = now - timedelta(days=months_back_start * 30)
            chunk_result = self.get_user_activity(
                username, start_date=chunk_start_date, end_date=chunk_end_date
            )
            if chunk_result['status'] in ('error', 'not_found'):
                # Propagate the first failure unchanged.
                return chunk_result
            for key in totals:
                totals[key] += chunk_result[key]
            has_contributions = has_contributions or chunk_result['has_contributions']
            has_restricted_contributions = (
                has_restricted_contributions or chunk_result['has_restricted_contributions']
            )
            # name/email are identical across chunks; keep the first non-empty.
            name = name or chunk_result.get('name')
            email = email or chunk_result.get('email')
        if has_restricted_contributions:
            status = "private_activity"
        elif has_contributions:
            status = "active"
        else:
            status = "inactive"
        return {
            "username": username,
            "name": name,
            "email": email,
            "status": status,
            "has_contributions": has_contributions,
            "has_restricted_contributions": has_restricted_contributions,
            **totals,
        }

    def format_time_since(self, date_str: str) -> str:
        """Render an ISO-8601 timestamp as a human-readable "... ago" string."""
        last_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        now = datetime.now(last_date.tzinfo)
        days = (now - last_date).days
        if days < 0:
            return "in the future"
        if days == 0:
            return "today"
        if days < 30:
            return f"{days} days ago"
        months = days // 30
        if months < 24:
            return f"{months} months ago"
        return f"{days // 365} years ago"

    def get_user_activity(
        self,
        username: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict:
        """Query one user's contributions to the organization.

        Args:
            username: GitHub login to check.
            start_date: Start of the window; defaults to the configured
                activity threshold.  (Previously a required parameter, which
                made get_user_activity_chunked's no-argument call a TypeError.)
            end_date: Optional end of the window; omitted means "now".

        Returns:
            A dict with ``status`` of ``active`` / ``inactive`` /
            ``private_activity`` / ``not_found`` / ``error`` plus contribution
            counters (error results carry only the minimal fields).
        """
        query = """
        query($username: String!, $orgId: ID!, $from: DateTime!, $to: DateTime) {
          user(login: $username) {
            login
            name
            email
            contributionsCollection(organizationID: $orgId, from: $from, to: $to) {
              hasAnyContributions
              hasAnyRestrictedContributions
              totalCommitContributions
              totalIssueContributions
              totalPullRequestContributions
              totalPullRequestReviewContributions
              totalRepositoryContributions
              # Fetch first 1 of each to verify counts (API sometimes reports 0 totals for non-members)
              pullRequestContributions(first: 1) { nodes { occurredAt } }
              issueContributions(first: 1) { nodes { occurredAt } }
              pullRequestReviewContributions(first: 1) { nodes { occurredAt } }
              repositoryContributions(first: 1) { nodes { occurredAt } }
            }
          }
        }
        """
        if start_date is None:
            start_date = self.activity_threshold
        variables = {
            "username": username,
            "orgId": self.get_organization_id(),
            "from": start_date.isoformat(),
        }
        if end_date:
            variables["to"] = end_date.isoformat()

        def failure(status: str, error: str) -> Dict:
            # Shared shape for error / not-found results.
            return {
                "username": username,
                "status": status,
                "error": error,
                "has_contributions": False,
                "has_restricted_contributions": False,
                "total_contributions": 0,
            }

        try:
            data = self._graphql(query, variables)
        except requests.exceptions.RequestException as e:
            return failure("error", str(e))
        if 'errors' in data:
            return failure("error", "; ".join(error['message'] for error in data['errors']))
        user_data = data['data']['user']
        if not user_data:
            return failure("not_found", "User not found")
        contributions = user_data['contributionsCollection']

        def get_count(key: str, list_key: str) -> int:
            # Totals are sometimes reported as 0 for non-members even when
            # contributions exist; treat a non-empty node list as "at least 1".
            count = contributions[key]
            if count == 0 and contributions.get(list_key, {}).get('nodes'):
                return 1
            return count

        # Commits are deliberately excluded from the total to avoid double
        # counting work that also appears as pull requests.
        issue_count = get_count('totalIssueContributions', 'issueContributions')
        pr_count = get_count('totalPullRequestContributions', 'pullRequestContributions')
        review_count = get_count(
            'totalPullRequestReviewContributions', 'pullRequestReviewContributions'
        )
        repo_count = get_count('totalRepositoryContributions', 'repositoryContributions')
        total_contributions = issue_count + pr_count + review_count + repo_count
        if contributions['hasAnyRestrictedContributions']:
            status = "private_activity"
        elif total_contributions > 0:
            status = "active"
        else:
            status = "inactive"
        return {
            "username": username,
            "name": user_data.get('name'),
            "email": user_data.get('email'),
            "status": status,
            "has_contributions": contributions['hasAnyContributions'],
            "has_restricted_contributions": contributions['hasAnyRestrictedContributions'],
            "total_contributions": total_contributions,
            "commit_contributions": contributions['totalCommitContributions'],
            "issue_contributions": issue_count,
            "pr_contributions": pr_count,
            "pr_review_contributions": review_count,
            "repository_contributions": repo_count,
        }

    def get_user_recent_activity(self, username: str, max_activities: int = 3) -> List[Dict]:
        """Return a user's most recent org activities, newest first.

        Scans backwards in 6-month windows (up to 4 years) and stops as soon
        as at least max_activities activities have been collected.
        """
        chunk_months = 6
        max_months_back = 48  # look back up to 4 years
        activities: List[Dict] = []
        now = datetime.now()
        for months_back in range(chunk_months, max_months_back + 1, chunk_months):
            end_date = now - timedelta(days=(months_back - chunk_months) * 30)
            start_date = now - timedelta(days=months_back * 30)
            activities.extend(self._get_activities_for_period(username, start_date, end_date))
            if len(activities) >= max_activities:
                break
        activities.sort(key=lambda a: a['date'], reverse=True)
        return activities[:max_activities]

    def _get_activities_for_period(
        self, username: str, start_date: datetime, end_date: datetime
    ) -> List[Dict]:
        """Fetch commit/PR/issue/review activities within one time window.

        Best-effort: returns an empty list on any API or network error, since
        this data only enriches the "inactive users" report.
        """
        query = """
        query($username: String!, $from: DateTime!, $to: DateTime!, $orgId: ID!) {
          user(login: $username) {
            contributionsCollection(from: $from, to: $to, organizationID: $orgId) {
              commitContributionsByRepository(maxRepositories: 10) {
                repository {
                  nameWithOwner
                }
                contributions(first: 10, orderBy: {field: OCCURRED_AT, direction: DESC}) {
                  nodes {
                    occurredAt
                  }
                }
              }
              pullRequestContributions(first: 10, orderBy: {direction: DESC}) {
                nodes {
                  occurredAt
                  pullRequest {
                    title
                    url
                    createdAt
                  }
                }
              }
              issueContributions(first: 10, orderBy: {direction: DESC}) {
                nodes {
                  occurredAt
                  issue {
                    title
                    url
                    createdAt
                  }
                }
              }
              pullRequestReviewContributions(first: 10, orderBy: {direction: DESC}) {
                nodes {
                  occurredAt
                  pullRequestReview {
                    pullRequest {
                      title
                      url
                    }
                    createdAt
                  }
                }
              }
            }
          }
        }
        """
        variables = {
            "username": username,
            "from": start_date.isoformat(),
            "to": end_date.isoformat(),
            "orgId": self.get_organization_id(),
        }
        try:
            data = self._graphql(query, variables)
        except requests.exceptions.RequestException:
            return []
        if 'errors' in data:
            return []
        user_data = data.get('data', {}).get('user')
        if not user_data:
            return []
        collection = user_data['contributionsCollection']
        activities: List[Dict] = []
        # Commits: only repository-level information is available.
        for repo in collection.get('commitContributionsByRepository', []):
            repo_name = repo['repository']['nameWithOwner']
            for contrib in repo['contributions']['nodes']:
                activities.append(
                    {
                        "type": "commit",
                        "title": f"Commit to {repo_name}",
                        "url": f"https://github.com/{repo_name}",
                        "date": contrib['occurredAt'],
                    }
                )
        for contrib in collection.get('pullRequestContributions', {}).get('nodes', []):
            pr = contrib['pullRequest']
            activities.append(
                {
                    "type": "pr",
                    "title": pr['title'],
                    "url": pr['url'],
                    "date": contrib['occurredAt'],
                }
            )
        for contrib in collection.get('issueContributions', {}).get('nodes', []):
            issue = contrib['issue']
            activities.append(
                {
                    "type": "issue",
                    "title": issue['title'],
                    "url": issue['url'],
                    "date": contrib['occurredAt'],
                }
            )
        for contrib in collection.get('pullRequestReviewContributions', {}).get('nodes', []):
            reviewed_pr = contrib['pullRequestReview']['pullRequest']
            activities.append(
                {
                    "type": "review",
                    "title": f"Review on '{reviewed_pr['title']}'",
                    "url": reviewed_pr['url'],
                    "date": contrib['occurredAt'],
                }
            )
        return activities

    def _check_single_user(self, username: str, chunk_large_windows: bool = False) -> Dict:
        """Worker helper: check one user, chunking large windows if requested."""
        if chunk_large_windows and self.months > 12:
            return self.get_user_activity_chunked(username)
        return self.get_user_activity(username, start_date=self.activity_threshold)

    def check_all_users(
        self,
        usernames: Set[str],
        verbose: bool = False,
        chunk_large_windows: bool = False,
        max_workers: int = 6,
    ) -> List[Dict]:
        """Check activity for all users in parallel and return result dicts.

        Args:
            usernames: GitHub logins to check.
            verbose: Print one (color-coded) line per completed user instead
                of a periodic progress counter.
            chunk_large_windows: Split windows > 12 months into chunks.
            max_workers: Thread-pool size for parallel API calls.
        """
        results: List[Dict] = []
        total_users = len(usernames)
        completed_count = 0
        print(f"Checking activity for {total_users} users in {self.org} organization...")
        print(
            f"Activity threshold: {self.activity_threshold.strftime('%Y-%m-%d')} "
            f"(last {self.months} months)"
        )
        print(f"Using {max_workers} parallel workers...")
        print()
        # Resolve (and cache) the org ID once up front so the worker threads
        # don't all race to issue the same lookup request.
        self.get_organization_id()
        # Sorted list for deterministic submission order.
        username_list = sorted(usernames)
        # ANSI color per status for verbose output (green / red / cyan).
        status_colors = {
            'active': '\033[92m',
            'inactive': '\033[91m',
            'private_activity': '\033[96m',
        }
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_username = {
                executor.submit(self._check_single_user, username, chunk_large_windows): username
                for username in username_list
            }
            # Process completed tasks as they finish.
            for future in as_completed(future_to_username):
                username = future_to_username[future]
                completed_count += 1
                try:
                    result = future.result()
                    results.append(result)
                    if verbose:
                        color = status_colors.get(result['status'])
                        if color:
                            print(
                                f"[{completed_count:03}/{total_users}] {username}: "
                                f"{color}{result['status']}\033[0m"
                            )
                        else:
                            print(
                                f"[{completed_count:03}/{total_users}] {username}: {result['status']}"
                            )
                    elif completed_count % 10 == 0 or completed_count == total_users:
                        # Non-verbose progress indicator.
                        print(f"Progress: {completed_count:03}/{total_users} users checked...")
                except Exception as exc:
                    print(f"User {username} generated an exception: {exc}")
                    results.append(
                        {
                            "username": username,
                            "status": "error",
                            "error": str(exc),
                            "has_contributions": False,
                            "has_restricted_contributions": False,
                            "total_contributions": 0,
                        }
                    )
        return results

    def print_results_table(self, results: List[Dict]):
        """Print a human-readable report grouped by status.

        Inactive users trigger extra API calls to show their last known
        public activities.
        """
        print(f"\nGitHub Activity Report for {self.org} (last {self.months} months)")
        print("=" * 80)
        status_counts: Dict[str, int] = {}
        for result in results:
            status = result['status']
            status_counts[status] = status_counts.get(status, 0) + 1
        print(f"Total users checked: {len(results)}")
        print(f"Active users: {status_counts.get('active', 0)}")
        print(f"Inactive users: {status_counts.get('inactive', 0)}")
        print(f"Users with private activity: {status_counts.get('private_activity', 0)}")
        print(f"Errors: {status_counts.get('error', 0)}")
        print(f"Not found: {status_counts.get('not_found', 0)}")
        print()
        active_users = [r for r in results if r['status'] == 'active']
        inactive_users = [r for r in results if r['status'] == 'inactive']
        private_users = [r for r in results if r['status'] == 'private_activity']
        error_users = [r for r in results if r['status'] in ['error', 'not_found']]
        if inactive_users:
            print("INACTIVE USERS:")
            print("-" * 40)
            for user in inactive_users:
                print(f"  {user['username']}")
                # Fetch recent activity on demand — only for inactive users.
                recent_activities = self.get_user_recent_activity(user['username'])
                if recent_activities:
                    last_activity = recent_activities[0]
                    time_str = self.format_time_since(last_activity['date'])
                    print(f"    Last activity: {time_str} ({last_activity['type']})")
                    print("    Recent activities:")
                    for activity in recent_activities:
                        print(f"      - {activity['type']}: {activity['title']}")
                        print(f"        {activity['url']} ({activity['date']})")
                else:
                    print("    No recent public activities found.")
            print()
        if private_users:
            print("USERS WITH PRIVATE ACTIVITY:")
            print("-" * 40)
            for user in private_users:
                print(f"  {user['username']}")
            print()
        if active_users:
            print("ACTIVE USERS:")
            print("-" * 40)
            for user in sorted(active_users, key=lambda x: x['total_contributions'], reverse=True):
                print(f"  {user['username']:<20} ({user['total_contributions']} contributions)")
            print()
        if error_users:
            print("ERRORS:")
            print("-" * 40)
            for user in error_users:
                print(f"  {user['username']:<20} - {user.get('error', 'Unknown error')}")
            print()

    def print_results_json(self, results: List[Dict]):
        """Print results as pretty-printed JSON."""
        print(json.dumps(results, indent=2))

    def print_results_csv(self, results: List[Dict]):
        """Print results as CSV on stdout; missing fields become empty cells."""
        if not results:
            return
        fieldnames = [
            'username',
            'name',
            'email',
            'status',
            'has_contributions',
            'has_restricted_contributions',
            'total_contributions',
            'commit_contributions',
            'issue_contributions',
            'pr_contributions',
            'pr_review_contributions',
            'repository_contributions',
            'error',
        ]
        writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
        writer.writeheader()
        for result in results:
            writer.writerow({field: result.get(field, '') for field in fieldnames})
def main():
    """CLI entry point: parse arguments, run the activity check, print results.

    Exits with status 1 (or 2 for argument errors) on any failure.
    """
    parser = argparse.ArgumentParser(
        description="Check GitHub activity of maintainers and collaborators",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--maintainers-file",
        default="./MAINTAINERS.yml",
        help="Path to MAINTAINERS.yml file (default: ./MAINTAINERS.yml)",
    )
    parser.add_argument(
        "--org",
        default="zephyrproject-rtos",
        help="GitHub organization to check activity against (default: zephyrproject-rtos)",
    )
    parser.add_argument(
        "--months",
        type=int,
        default=6,
        help="Number of months to check for inactivity (default: 6)",
    )
    parser.add_argument(
        "--output-format",
        choices=["table", "json", "csv"],
        default="table",
        help="Output format (default: table)",
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose output")
    parser.add_argument(
        "--chunk-large-windows",
        action="store_true",
        help="Break large time windows (>12 months) into smaller chunks to avoid API limitations",
    )
    parser.add_argument(
        "--max-workers",
        type=int,
        default=6,
        help="Maximum number of parallel workers for checking user activity (default: 6)",
    )
    args = parser.parse_args()

    # Reject nonsensical numeric options early: a non-positive --months would
    # invert the activity threshold, and ThreadPoolExecutor raises on
    # max_workers <= 0.
    if args.months < 1:
        parser.error("--months must be a positive integer")
    if args.max_workers < 1:
        parser.error("--max-workers must be a positive integer")

    # A token is mandatory for the GraphQL API.
    github_token = os.getenv('GITHUB_TOKEN')
    if not github_token:
        print("Error: GITHUB_TOKEN environment variable is required")
        print("Please set your GitHub token: export GITHUB_TOKEN=your_token_here")
        sys.exit(1)

    checker = GitHubActivityChecker(token=github_token, org=args.org, months=args.months)

    usernames = checker.extract_usernames_from_maintainers(args.maintainers_file)
    if not usernames:
        print("No usernames found in MAINTAINERS.yml")
        sys.exit(1)

    results = checker.check_all_users(
        usernames,
        verbose=args.verbose,
        chunk_large_windows=args.chunk_large_windows,
        max_workers=args.max_workers,
    )

    if args.output_format == "json":
        checker.print_results_json(results)
    elif args.output_format == "csv":
        checker.print_results_csv(results)
    else:
        checker.print_results_table(results)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment