Scrape Metrics on Pull Requests on GitHub and/or Bitbucket
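The script below targets Python 3 and relies on the third-party packages it imports (`requests`, `python-dateutil`, `tqdm`, and `urllib3`); something like `pip install requests python-dateutil tqdm urllib3` should cover them. Illustrative command-line examples are included as comments at the end of the file.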
#!/usr/bin/env python3
import argparse
import requests
import json
import csv
import concurrent.futures
from unittest.mock import patch
import os
import datetime
from dateutil.relativedelta import relativedelta
from tqdm import tqdm  # Progress bar
import urllib3

# Suppress only SSL warnings, keep others
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

LOG_FILE = "scrape_log.txt"


def log_message(message):
    """Log a message to the log file and print it to the console."""
    with open(LOG_FILE, "a") as log:
        log.write(f"{datetime.datetime.now()} - {message}\n")
    print(message)  # Print to console as well

def get_github_user_email(token, username):
    """Fetch the email of a GitHub user if it's publicly available."""
    headers = {"Authorization": f"token {token}"}
    user_url = f"https://api.github.com/users/{username}"
    response = requests.get(user_url, headers=headers)
    if response.status_code == 200:
        email = response.json().get("email", "N/A")
        if email:
            return email
    return "N/A"

def check_github_rate_limit(token):
    """Check GitHub API rate limit before running a scrape."""
    headers = {"Authorization": f"token {token}"}
    url = "https://api.github.com/rate_limit"
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            remaining = data["rate"]["remaining"]
            reset_time = datetime.datetime.utcfromtimestamp(data["rate"]["reset"]).strftime('%Y-%m-%d %H:%M:%S UTC')
            log_message(f"📊 GitHub API Rate Limit: {remaining} requests remaining. Resets at: {reset_time}")
            if remaining < 10:
                log_message("⚠️ API rate limit is critically low. Exiting to prevent failures.")
                exit(1)
        else:
            log_message(f"❌ Failed to fetch rate limit. Response: {response.text}")
    except Exception as e:
        log_message(f"❌ Error checking GitHub rate limit: {e}")
        exit(1)

def load_repositories(file_path):
    """Load repositories from a file, skipping invalid lines."""
    repositories = []
    try:
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if line and "/" in line:  # Ensure line contains org/repo format
                    repositories.append(line)
                else:
                    log_message(f"⚠️ Skipping invalid repository line: {line}")
        log_message(f"📄 Loaded {len(repositories)} valid repositories from {file_path}")
        return repositories
    except Exception as e:
        log_message(f"❌ Error reading repository file: {e}")
        return []

def parse_scrape_range(scrape_range):
    """Convert scrape_range (e.g., '1y', '2m', '3w', '4d') into a datetime object."""
    now = datetime.datetime.utcnow()
    if not scrape_range:
        return now - relativedelta(months=1)  # Default: last 1 month
    unit = scrape_range[-1]
    value = scrape_range[:-1]
    try:
        value = int(value)
    except ValueError:
        raise ValueError(f"Invalid scrape range format: {scrape_range}")
    if unit == 'y':
        since_date = now - relativedelta(years=value)
    elif unit == 'm':
        since_date = now - relativedelta(months=value)
    elif unit == 'w':
        since_date = now - relativedelta(weeks=value)
    elif unit == 'd':
        since_date = now - relativedelta(days=value)
    else:
        raise ValueError("Invalid --scrape-range format. Use '1y', '2m', '3w', '4d'.")
    return since_date  # This ensures a valid datetime object is returned

def parse_scrape_range_monthly(scrape_range):
    """Convert scrape_range into a list of months."""
    now = datetime.datetime.utcnow()
    if not scrape_range:
        return [now.strftime("%Y-%m")]  # Default: current month
    unit = scrape_range[-1]
    value = scrape_range[:-1]
    try:
        value = int(value)
    except ValueError:
        raise ValueError(f"Invalid scrape range format: {scrape_range}")
    start_date = now
    if unit == 'y':
        start_date = now - relativedelta(years=value)
    elif unit == 'm':
        start_date = now - relativedelta(months=value)
    elif unit == 'w':
        start_date = now - relativedelta(weeks=value)
    elif unit == 'd':
        start_date = now - relativedelta(days=value)
    else:
        raise ValueError("Invalid --scrape-range format. Use '1y', '2m', '3w', '4d'.")
    # Generate a list of months
    months = []
    current_date = start_date.replace(day=1)  # Start from the first day of the start month
    while current_date <= now:
        months.append(current_date.strftime("%Y-%m"))  # Ensure proper formatting
        current_date += relativedelta(months=1)
    log_message(f"📅 Months to scrape: {months}")
    return months
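
# Illustrative example of the two range helpers above (values are hypothetical
# and depend on the current date): with a "2m" range in February 2025,
#   since_date = parse_scrape_range("2m")          -> a datetime ~2 months back
#   months = parse_scrape_range_monthly("2m")      -> ["2024-12", "2025-01", "2025-02"]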

def save_to_file(data, target, output_format):
    """Saves the aggregated data to a file."""
    date_str = datetime.datetime.now().strftime("%m-%d-%Y")
    filename = f"{target}_{date_str}.{output_format}"
    if output_format == "json":
        with open(filename, "w") as f:
            json.dump(data, f, indent=4)
    elif output_format == "csv":
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["User", "Email", "Total PRs", "Merged PRs", "Closed but Not Merged PRs", "Denied PRs", "Source"])
            for user, stats in data.items():
                writer.writerow([
                    user, stats["email"], stats["total_prs"], stats["merged_prs"],
                    stats["closed_not_merged"], stats["denied_prs"], stats["source"]
                ])
    print(f"✅ Results saved to {filename}")

def combine_github_bitbucket(github_data, bitbucket_data):
    """Combines PR metrics from GitHub and Bitbucket using email as the unique identifier."""
    combined_data = {}
    # Process GitHub data
    for user, stats in github_data.items():
        email = stats["email"] if stats["email"] != "N/A" else f"unknown_github_{user}"
        if email not in combined_data:
            combined_data[email] = {
                "user": user,
                "email": stats["email"],
                "total_prs": stats["total_prs"],
                "merged_prs": stats["merged_prs"],
                "closed_not_merged": stats["closed_not_merged"],
                "denied_prs": stats["denied_prs"],
                "source": "github"
            }
    # Process Bitbucket data
    for user, stats in bitbucket_data.items():
        email = stats["email"] if stats["email"] != "N/A" else f"unknown_bitbucket_{user}"
        if email in combined_data:
            # Combine metrics if the email matches
            combined_data[email]["total_prs"] += stats["total_prs"]
            combined_data[email]["merged_prs"] += stats["merged_prs"]
            combined_data[email]["closed_not_merged"] += stats["closed_not_merged"]
            combined_data[email]["denied_prs"] += stats["denied_prs"]
            combined_data[email]["source"] = "combined"  # Update source to combined
        else:
            # If no match, add the Bitbucket user
            combined_data[email] = {
                "user": user,
                "email": stats["email"],
                "total_prs": stats["total_prs"],
                "merged_prs": stats["merged_prs"],
                "closed_not_merged": stats["closed_not_merged"],
                "denied_prs": stats["denied_prs"],
                "source": "bitbucket"
            }
    return combined_data

def fetch_bitbucket_prs(token, bitbucket_url, workspaces, limit, scrape_range, threads=4):
    """Fetch PR metrics from Bitbucket repositories."""
    headers = {"Authorization": f"Bearer {token}"}
    user_stats = {}
    since_date = parse_scrape_range(scrape_range)

    def get_repositories(workspace):
        """Fetch all repositories for a given workspace."""
        repo_list = []
        start = 0
        while True:
            repo_url = f"{bitbucket_url}/rest/api/latest/projects/{workspace}/repos?limit=100&start={start}"
            response = requests.get(repo_url, headers=headers, verify=False)
            if response.status_code != 200:
                log_message(f"⚠️ Failed to fetch repositories for workspace {workspace}: {response.text}")
                break
            data = response.json()
            repos = data.get("values", [])
            repo_list.extend([f"{workspace}/{repo['slug']}" for repo in repos])
            if data.get("isLastPage", True):
                break
            start = data.get("nextPageStart", start + len(repos))
        return repo_list

    def process_repo(full_repo_name):
        """Fetch PRs for a single repository."""
        try:
            workspace_name, repo_name = full_repo_name.split("/")
            pr_url = f"{bitbucket_url}/rest/api/latest/projects/{workspace_name}/repos/{repo_name}/pull-requests"
            params = {
                "state": "ALL",
                "limit": 100,
                "start": 0
            }
            repo_user_stats = {}
            while True:
                response = requests.get(pr_url, headers=headers, params=params, verify=False)
                if response.status_code != 200:
                    log_message(f"⚠️ Failed to fetch PRs from {repo_name}. Response: {response.text}")
                    break
                data = response.json()
                prs = data.get("values", [])
                for pr in prs:
                    created_at = datetime.datetime.fromtimestamp(pr["createdDate"] / 1000.0)
                    if created_at < since_date:
                        continue  # Skip old PRs
                    user = pr["author"]["user"]["name"]
                    email = pr["author"]["user"].get("emailAddress", "N/A")
                    state = pr["state"].lower()
                    if user not in repo_user_stats:
                        repo_user_stats[user] = {
                            "email": email,
                            "total_prs": 0,
                            "merged_prs": 0,
                            "closed_not_merged": 0,
                            "denied_prs": 0,
                            "source": "bitbucket"
                        }
                    repo_user_stats[user]["total_prs"] += 1
                    if state == "merged":
                        repo_user_stats[user]["merged_prs"] += 1
                    elif state in ["declined", "closed"]:
                        repo_user_stats[user]["denied_prs"] += 1
                        repo_user_stats[user]["closed_not_merged"] += 1
                if data.get("isLastPage", True):
                    break
                params["start"] = data.get("nextPageStart", params["start"] + len(prs))
            return repo_user_stats
        except Exception as e:
            log_message(f"❌ Error processing repo {full_repo_name}: {str(e)}")
            return {}

    # Step 1: Get all repositories from all workspaces
    all_repositories = []
    for workspace in workspaces:
        log_message(f"🔍 Fetching repositories for workspace '{workspace}'...")
        all_repositories.extend(get_repositories(workspace))
    if not all_repositories:
        log_message("⚠️ No repositories found. Exiting.")
        return {}
    log_message(f"📦 Found {len(all_repositories)} repositories to process...")

    # Step 2: Fetch PRs from all repositories in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(process_repo, repo) for repo in all_repositories]
        for future in concurrent.futures.as_completed(futures):
            try:
                repo_stats = future.result()
                for user, stats in repo_stats.items():
                    if user not in user_stats:
                        user_stats[user] = stats
                    else:
                        user_stats[user]["total_prs"] += stats["total_prs"]
                        user_stats[user]["merged_prs"] += stats["merged_prs"]
                        user_stats[user]["denied_prs"] += stats["denied_prs"]
                        user_stats[user]["closed_not_merged"] += stats["closed_not_merged"]
            except Exception as e:
                log_message(f"❌ Error processing repository: {str(e)}")
    return user_stats

def fetch_github_prs(token, orgs, limit, scrape_range, repo_list=None, threads=4):
    """Fetch PR metrics from GitHub repositories for a given scrape range."""
    headers = {"Authorization": f"token {token}"}
    user_stats = {}
    # Convert scrape_range to a proper datetime object
    since_date = parse_scrape_range(scrape_range)
    until_date = datetime.datetime.utcnow()  # PRs up until now
    repositories_to_scrape = repo_list if repo_list else []
    if not repositories_to_scrape:
        log_message("⚠️ No repositories found to scrape.")
        return {}
    log_message(f"🔍 Processing {len(repositories_to_scrape)} repositories since {since_date} with {threads} threads...")

    def process_repo(full_repo_name):
        """Process a single repository and return user stats."""
        repo_user_stats = {}
        try:
            if "/" not in full_repo_name:
                log_message(f"⚠️ Skipping invalid repository format: {full_repo_name}")
                return {}
            org_name, repo_name = full_repo_name.split("/")
            pr_url = f"https://api.github.com/repos/{org_name}/{repo_name}/pulls"
            params = {
                "state": "all",
                "per_page": 100,  # Max allowed by GitHub
                "sort": "created",
                "direction": "desc"
            }
            page = 1
            total_prs = 0
            while True:
                params["page"] = page
                response = requests.get(pr_url, headers=headers, params=params)
                if response.status_code != 200:
                    log_message(f"❌ Failed to fetch PRs from {repo_name}. Response: {response.status_code} - {response.text}")
                    break
                prs = response.json()
                if not prs:
                    break
                log_message(f"📄 Found {len(prs)} PRs in {repo_name} (Page {page})")
                for pr in prs:
                    created_at = datetime.datetime.strptime(pr["created_at"], "%Y-%m-%dT%H:%M:%SZ")
                    if created_at < since_date:
                        log_message(f"⚠️ PR {pr['number']} is older than {since_date}. Stopping pagination.")
                        break  # Stop fetching older PRs
                    if created_at >= until_date:
                        continue  # Skip PRs from the future
                    if total_prs >= limit:
                        log_message(f"⚠️ Reached PR limit of {limit} for {repo_name}")
                        break
                    # Process PR...
                    total_prs += 1
                    user = pr["user"]["login"]
                    email = get_github_user_email(token, user)
                    # Fetch PR details to determine if it was merged
                    pr_details_url = f"https://api.github.com/repos/{org_name}/{repo_name}/pulls/{pr['number']}"
                    pr_details_response = requests.get(pr_details_url, headers=headers)
                    if pr_details_response.status_code != 200:
                        log_message(f"⚠️ Failed to fetch PR details for PR {pr['number']} in {repo_name}. Response: {pr_details_response.status_code}")
                        continue
                    pr_details = pr_details_response.json()
                    is_merged = pr_details.get("merged", False)
                    state = pr["state"]  # 'open' or 'closed'
                    if user not in repo_user_stats:
                        repo_user_stats[user] = {
                            "email": email,
                            "total_prs": 0,
                            "merged_prs": 0,
                            "closed_not_merged": 0,
                            "denied_prs": 0,
                            "source": "github"
                        }
                    repo_user_stats[user]["total_prs"] += 1
                    if is_merged:
                        repo_user_stats[user]["merged_prs"] += 1
                    elif state == "closed" and not is_merged:
                        repo_user_stats[user]["closed_not_merged"] += 1
                    else:
                        repo_user_stats[user]["denied_prs"] += 1  # Open PRs or denied PRs
                    log_message(f"✅ Processed PR {pr['number']} | Merged: {is_merged} | State: {state} | User: {user}")
                if len(prs) < 100 or total_prs >= limit:
                    break
                page += 1
        except Exception as e:
            log_message(f"❌ Error processing repo {full_repo_name}: {str(e)}")
        return repo_user_stats

    # Process repositories in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(process_repo, repo): repo for repo in repositories_to_scrape}
        for future in concurrent.futures.as_completed(futures):
            repo = futures[future]
            try:
                repo_stats = future.result()
                for user, stats in repo_stats.items():
                    if user not in user_stats:
                        user_stats[user] = stats
                    else:
                        user_stats[user]["total_prs"] += stats["total_prs"]
                        user_stats[user]["merged_prs"] += stats["merged_prs"]
                        user_stats[user]["closed_not_merged"] += stats["closed_not_merged"]
                        user_stats[user]["denied_prs"] += stats["denied_prs"]
            except Exception as e:
                log_message(f"❌ Error processing repository {repo}: {str(e)}")
    return user_stats

def save_monthly_to_file(monthly_data, target, output_format):
    """Saves the monthly aggregated data to a file."""
    date_str = datetime.datetime.now().strftime("%m-%d-%Y")
    filename = f"{target}_monthly_{date_str}.{output_format}"
    if output_format == "json":
        with open(filename, "w") as f:
            json.dump(monthly_data, f, indent=4)
    elif output_format == "csv":
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            # Add a "Month" column to the CSV
            writer.writerow(["Month", "User", "Email", "Total PRs", "Merged PRs", "Closed but Not Merged PRs", "Denied PRs", "Source"])
            for month, data in monthly_data.items():
                for user, stats in data.items():
                    writer.writerow([
                        month,  # Add the month column
                        user,
                        stats["email"],
                        stats["total_prs"],
                        stats["merged_prs"],
                        stats["closed_not_merged"],
                        stats["denied_prs"],
                        stats["source"]
                    ])
    print(f"✅ Monthly results saved to {filename}")

def get_all_github_repos(token, orgs):
    """Fetch all repositories for specified GitHub organizations."""
    headers = {"Authorization": f"token {token}"}
    all_repos = []
    for org in orgs:
        page = 1
        while True:
            url = f"https://api.github.com/orgs/{org}/repos?per_page=100&page={page}"
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                log_message(f"❌ Failed to fetch repos for {org}: {response.text}")
                break
            repos = response.json()
            if not repos:
                break
            all_repos.extend([f"{org}/{repo['name']}" for repo in repos])
            page += 1
    return all_repos

def get_all_bitbucket_repos(token, bitbucket_url, workspaces):
    """Fetch all repositories for specified Bitbucket workspaces."""
    headers = {"Authorization": f"Bearer {token}"}
    all_repos = []
    for workspace in workspaces:
        start = 0
        while True:
            url = f"{bitbucket_url}/rest/api/latest/projects/{workspace}/repos?limit=100&start={start}"
            response = requests.get(url, headers=headers, verify=False)
            if response.status_code != 200:
                log_message(f"❌ Failed to fetch repos for {workspace}: {response.text}")
                break
            data = response.json()
            repos = data.get("values", [])
            all_repos.extend([f"{workspace}/{repo['slug']}" for repo in repos])
            if data.get("isLastPage", True):
                break
            start = data.get("nextPageStart", 0)
    return all_repos

def gather_repository_metrics(args):
    """Main function to gather repository metrics based on CLI flags."""
    repo_metrics = {}
    repositories = []
    # Get repository list
    if args.repositories_file:
        repositories = load_repositories(args.repositories_file)
    else:
        if args.target == "github" and args.org:
            repositories = get_all_github_repos(args.github_token, args.org.split(","))
        elif args.target == "bitbucket" and args.workspace:
            repositories = get_all_bitbucket_repos(
                args.bitbucket_token,
                args.bitbucket_url,
                args.workspace.split(",")
            )
    # Process repositories
    for repo in repositories:
        if args.target == "github":
            org, repo_name = repo.split("/")
            pr_url = f"https://api.github.com/repos/{org}/{repo_name}/pulls?state=all"
            response = requests.get(
                pr_url,
                headers={"Authorization": f"token {args.github_token}"}
            )
            if response.status_code == 200:
                prs = response.json()
                open_prs = sum(1 for pr in prs if pr["state"] == "open")
                closed_prs = sum(1 for pr in prs if pr["state"] == "closed")
                repo_metrics[repo] = {"open_prs": open_prs, "closed_prs": closed_prs}
        elif args.target == "bitbucket":
            workspace, repo_name = repo.split("/")
            pr_url = f"{args.bitbucket_url}/rest/api/latest/projects/{workspace}/repos/{repo_name}/pull-requests?state=ALL"
            response = requests.get(
                pr_url,
                headers={"Authorization": f"Bearer {args.bitbucket_token}"},
                verify=False
            )
            if response.status_code == 200:
                prs = response.json().get("values", [])
                open_prs = sum(1 for pr in prs if pr["state"] == "OPEN")
                closed_prs = sum(1 for pr in prs if pr["state"] in ["MERGED", "DECLINED"])
                repo_metrics[repo] = {"open_prs": open_prs, "closed_prs": closed_prs}
    return repo_metrics

def get_repository_list(args):
    """Get list of repositories based on input flags."""
    repositories = []
    if args.repositories_file:
        repositories = load_repositories(args.repositories_file)
        log_message(f"📄 Loaded {len(repositories)} repositories from file")
    elif args.target == "github" and args.org:
        log_message(f"🔍 Discovering repositories in GitHub org(s): {args.org}")
        repositories = get_all_github_repos(args.github_token, args.org.split(","))
    elif args.target == "bitbucket" and args.workspace:
        log_message(f"🔍 Discovering repositories in Bitbucket workspace(s): {args.workspace}")
        repositories = get_all_bitbucket_repos(
            args.bitbucket_token,
            args.bitbucket_url,
            args.workspace.split(",")
        )
    log_message(f"📊 Total repositories to analyze: {len(repositories)}")
    return repositories

def get_github_repo_stats(token, repo_full_name, limit=100):
    """Get PR statistics for a single GitHub repository with pagination."""
    headers = {"Authorization": f"token {token}"}
    org, repo_name = repo_full_name.split("/")
    stats = {"open_prs": 0, "closed_prs": 0}
    total_prs = 0
    page = 1
    while True:
        url = f"https://api.github.com/repos/{org}/{repo_name}/pulls?state=all&per_page=100&page={page}"
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            log_message(f"⚠️ Failed to fetch PRs for {repo_full_name}: {response.text}")
            break
        prs = response.json()
        if not prs:
            break
        for pr in prs:
            if total_prs >= limit:
                break
            if pr["state"] == "open":
                stats["open_prs"] += 1
            else:
                stats["closed_prs"] += 1
            total_prs += 1
        if len(prs) < 100 or total_prs >= limit:
            break
        page += 1
    return repo_full_name, stats

def get_bitbucket_repo_stats(token, base_url, repo_full_name, limit=100):
    """Get PR statistics for a single Bitbucket repository with pagination."""
    headers = {"Authorization": f"Bearer {token}"}
    workspace, repo_name = repo_full_name.split("/")
    stats = {"open_prs": 0, "closed_prs": 0}
    total_prs = 0
    start = 0
    while True:
        url = f"{base_url}/rest/api/latest/projects/{workspace}/repos/{repo_name}/pull-requests?state=ALL&limit=100&start={start}"
        response = requests.get(url, headers=headers, verify=False)
        if response.status_code != 200:
            log_message(f"⚠️ Failed to fetch PRs for {repo_full_name}: {response.text}")
            break
        data = response.json()
        prs = data.get("values", [])
        for pr in prs:
            if total_prs >= limit:
                break
            if pr["state"] == "OPEN":
                stats["open_prs"] += 1
            else:
                stats["closed_prs"] += 1
            total_prs += 1
        if data.get("isLastPage", True) or total_prs >= limit:
            break
        start = data.get("nextPageStart", start + len(prs))
    return repo_full_name, stats

def fetch_repo_metrics(args):
    """Fetch open/closed PR counts for repositories with limit support."""
    repo_metrics = {}
    repositories = get_repository_list(args)
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
        # Map each future back to its repository so failures can be attributed
        futures = {}
        for repo in repositories:
            if args.target == "github":
                futures[executor.submit(
                    get_github_repo_stats,
                    args.github_token,
                    repo,
                    args.limit
                )] = repo
            elif args.target == "bitbucket":
                futures[executor.submit(
                    get_bitbucket_repo_stats,
                    args.bitbucket_token,
                    args.bitbucket_url,
                    repo,
                    args.limit
                )] = repo
        for future in concurrent.futures.as_completed(futures):
            repo = futures[future]
            try:
                repo_name, stats = future.result()
                repo_metrics[repo_name] = stats
                log_message(f"✅ Processed {repo_name} - Open: {stats['open_prs']} | Closed: {stats['closed_prs']} (Limit: {args.limit})")
            except Exception as e:
                log_message(f"❌ Error processing {repo}: {str(e)}")
    return repo_metrics

def save_repo_metrics_to_file(repo_metrics, target, output_format):
    """Save repository metrics to file."""
    date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    filename = f"{target}_repo_metrics_{date_str}.{output_format}"
    if output_format == "json":
        with open(filename, "w") as f:
            json.dump(repo_metrics, f, indent=4)
    elif output_format == "csv":
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["Repository", "Open PRs", "Closed PRs"])
            for repo, metrics in repo_metrics.items():
                writer.writerow([repo, metrics["open_prs"], metrics["closed_prs"]])
    log_message(f"✅ Repository metrics saved to {filename}")

def fetch_monthly_user_data(args, month):
    """Fetch user metrics for a specific month."""
    since_date = parse_scrape_range(args.scrape_range)  # Ensure it returns a valid datetime
    log_message(f"📅 Fetching data for {month} (since_date: {since_date})")
    github_data = {}
    bitbucket_data = {}
    # Load the optional repository list once so it can be passed through as a list
    repositories = load_repositories(args.repositories_file) if args.repositories_file else None
    # Fetch GitHub data
    if args.target in ["github", None] and args.github_token:
        github_data = fetch_github_prs(
            args.github_token,
            args.org.split(",") if args.org else [],
            args.limit,
            args.scrape_range,  # fetch_github_prs expects the range string, not a datetime
            repositories
        )
    # Fetch Bitbucket data
    if args.target in ["bitbucket", None] and args.bitbucket_token and args.bitbucket_url:
        bitbucket_data = fetch_bitbucket_prs(
            args.bitbucket_token,
            args.bitbucket_url,
            args.workspace.split(",") if args.workspace else [],
            args.limit,
            args.scrape_range,  # fetch_bitbucket_prs also expects the range string
            args.threads
        )
    # Combine data if requested
    if args.combined:
        return combine_github_bitbucket(github_data, bitbucket_data)
    else:
        return {**github_data, **bitbucket_data}

def save_monthly_user_data(monthly_data, output_format):
    """Save monthly user metrics to a file."""
    date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    filename = f"user_metrics_monthly_{date_str}.{output_format}"
    if output_format == "json":
        with open(filename, "w") as f:
            json.dump(monthly_data, f, indent=4)
    elif output_format == "csv":
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            # Write header
            writer.writerow(["Month", "User", "Email", "Total PRs", "Merged PRs",
                             "Closed but Not Merged PRs", "Denied PRs", "Source"])
            # Write data
            for month, user_data in monthly_data.items():
                for user, stats in user_data.items():
                    writer.writerow([
                        month,
                        user,
                        stats["email"],
                        stats["total_prs"],
                        stats["merged_prs"],
                        stats["closed_not_merged"],
                        stats["denied_prs"],
                        stats["source"]
                    ])
    log_message(f"✅ Monthly user metrics saved to {filename}")

def fetch_user_data(args):
    """Fetch user metrics for a single time range."""
    scrape_range = args.scrape_range if args.scrape_range else "1m"  # Default to 1 month
    since_date = parse_scrape_range(scrape_range)
    log_message(f"📅 Fetching data since {since_date}")
    # Load repositories from file
    repositories_to_scrape = []
    if args.repositories_file:
        repositories_to_scrape = load_repositories(args.repositories_file)
        if not repositories_to_scrape:
            log_message("⚠️ No valid repositories found in the file.")
            return {}
    github_data = {}
    bitbucket_data = {}
    # Fetch GitHub data
    if args.target in ["github", None] and args.github_token:
        github_data = fetch_github_prs(
            args.github_token,
            args.org.split(",") if args.org else [],
            args.limit,
            args.scrape_range,
            repositories_to_scrape  # Pass the loaded repositories
        )
    # Fetch Bitbucket data
    if args.target in ["bitbucket", None] and args.bitbucket_token and args.bitbucket_url:
        bitbucket_data = fetch_bitbucket_prs(
            args.bitbucket_token,
            args.bitbucket_url,
            args.workspace.split(",") if args.workspace else [],
            args.limit,
            args.scrape_range,
            args.threads
        )
    # Combine data if requested
    if args.combined:
        return combine_github_bitbucket(github_data, bitbucket_data)
    else:
        return {**github_data, **bitbucket_data}

def save_user_data_to_file(user_data, target_name, output_format):
    """Save user metrics to a file."""
    date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    filename = f"{target_name}_user_metrics_{date_str}.{output_format}"
    if output_format == "json":
        with open(filename, "w") as f:
            json.dump(user_data, f, indent=4)
    elif output_format == "csv":
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            # Write header
            writer.writerow(["User", "Email", "Total PRs", "Merged PRs",
                             "Closed but Not Merged PRs", "Denied PRs", "Source"])
            # Write data
            for user, stats in user_data.items():
                writer.writerow([
                    user,
                    stats["email"],
                    stats["total_prs"],
                    stats["merged_prs"],
                    stats["closed_not_merged"],
                    stats["denied_prs"],
                    stats["source"]
                ])
    log_message(f"✅ User metrics saved to {filename}")

def fetch_monthly_data(args):
    """Fetch user metrics month-to-month using the existing fetch_monthly_user_data function."""
    months_to_scrape = parse_scrape_range_monthly(args.scrape_range)
    monthly_data = {}
    for month in months_to_scrape:
        log_message(f"📅 Processing month: {month}")
        monthly_data[month] = fetch_monthly_user_data(args, month)
    return monthly_data

def fetch_monthly_user_stats(args):
    """
    Wrapper to fetch user metrics month-to-month.
    For each month, we temporarily patch the date boundaries so that
    the existing fetch_user_data() functions only process PRs for that month.
    """
    monthly_stats = {}
    # Get list of months (e.g., ["2023-03", "2023-04", ...])
    months = parse_scrape_range_monthly(args.scrape_range)
    # Save the original parse_scrape_range so we can restore it later
    original_parse_scrape_range = parse_scrape_range
    for month in months:
        # Compute the start and end dates for the month
        start_date = datetime.datetime.strptime(month + "-01", "%Y-%m-%d")
        end_date = start_date + relativedelta(months=1)
        log_message(f"📅 Processing month: {month} (from {start_date} to {end_date})")
        # Monkey-patch parse_scrape_range so it always returns start_date
        globals()['parse_scrape_range'] = lambda x: start_date

        # Create a subclass of datetime.datetime that overrides utcnow()
        class FixedDateTime(datetime.datetime):
            @classmethod
            def utcnow(cls):
                return end_date

        # Use unittest.mock.patch to temporarily override datetime.datetime
        # Note: adjust the patch target if your module name differs (e.g. '__main__.datetime.datetime')
        with patch('datetime.datetime', FixedDateTime):
            # Call the existing fetch_user_data function which uses parse_scrape_range()
            data = fetch_user_data(args)
        # Add the month to each user record
        for user in data:
            data[user]["month"] = month
        monthly_stats[month] = data
        # Restore the original parse_scrape_range function for the next iteration
        globals()['parse_scrape_range'] = original_parse_scrape_range
    return monthly_stats

def main():
    try:
        # Argument parsing
        parser = argparse.ArgumentParser(description="Scrape PR metrics from GitHub and/or Bitbucket.")
        # Common arguments
        parser.add_argument("--target", choices=["github", "bitbucket"],
                            help="Specify platform to scrape")
        parser.add_argument("--output", required=True, choices=["json", "csv"],
                            help="Output format")
        # Mode selection arguments
        parser.add_argument("--repo-metrics", action="store_true",
                            help="Generate repository-level metrics (open/closed PR counts)")
        parser.add_argument("--month-to-month", action="store_true",
                            help="Show user statistics month-to-month")
        # User metrics arguments
        parser.add_argument("--combined", action="store_true",
                            help="Combine GitHub & Bitbucket user metrics")
        parser.add_argument("--scrape-range",
                            help="Time range for PRs (e.g., '1y', '2m', '3w', '4d')")
        # Authentication arguments
        parser.add_argument("--github-token",
                            help="GitHub PAT for authentication")
        parser.add_argument("--bitbucket-token",
                            help="Bitbucket API Token")
        parser.add_argument("--bitbucket-url",
                            help="Custom Bitbucket Server URL")
        # Data source arguments
        parser.add_argument("--org",
                            help="Comma-separated GitHub organization names")
        parser.add_argument("--workspace",
                            help="Comma-separated Bitbucket workspace names")
        parser.add_argument("--repositories-file",
                            help="Path to file containing specific repositories to scrape")
        # Performance arguments
        parser.add_argument("--limit", type=int, default=100,
                            help="Max PRs to fetch per repository")
        parser.add_argument("--threads", type=int, default=4,
                            help="Number of concurrent threads")
        parser.add_argument("--check-rate-limit", action="store_true",
                            help="Check GitHub API rate limit before scraping")
        args = parser.parse_args()

        # Validate arguments
        if args.repo_metrics and args.month_to_month:
            log_message("❌ --month-to-month is not compatible with --repo-metrics")
            exit(1)
        if args.check_rate_limit:
            check_github_rate_limit(args.github_token)
            exit(0)

        # Repository metrics mode
        if args.repo_metrics:
            if not args.target:
                log_message("❌ --target is required for repository metrics")
                exit(1)
            log_message("📊 Running repository metrics collection")
            repo_metrics = fetch_repo_metrics(args)
            if repo_metrics:
                save_repo_metrics_to_file(repo_metrics, args.target, args.output)
            else:
                log_message("⚠️ No repository metrics collected")
            exit(0)

        # User metrics mode
        try:
            if args.month_to_month:
                log_message("📅 Running month-to-month user analysis")
                monthly_data = fetch_monthly_user_stats(args)
                # Save using the existing monthly saver, which for CSV writes the month column.
                save_monthly_user_data(monthly_data, args.output)
            else:
                log_message("⏳ Running single-range user analysis")
                user_data = fetch_user_data(args)
                if user_data:
                    target_name = "combined" if args.combined else args.target or "results"
                    save_user_data_to_file(user_data, target_name, args.output)
                else:
                    log_message("⚠️ No user data collected")
        except Exception as e:
            log_message(f"❌ Critical error: {str(e)}")
            exit(1)
        log_message("✅ Operation completed successfully")
    except KeyboardInterrupt:
        log_message("🛑 Script interrupted by user. Exiting gracefully...")
        exit(0)


if __name__ == "__main__":
    main()
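
# ---------------------------------------------------------------------------
# Example invocations (illustrative only): the file name, tokens, org,
# workspace, and repository-list path below are placeholders, not values from
# this gist. Note that the user-metrics modes read repositories from
# --repositories-file, while --repo-metrics can also discover them from
# --org or --workspace.
#
#   # Single-range GitHub user metrics for the last 3 months, written to CSV
#   python3 scrape_pr_metrics.py --target github --github-token <PAT> \
#       --repositories-file repos.txt --scrape-range 3m --output csv
#
#   # Combined GitHub + Bitbucket month-to-month metrics from a repository list
#   python3 scrape_pr_metrics.py --combined --month-to-month --output json \
#       --github-token <PAT> --bitbucket-token <TOKEN> \
#       --bitbucket-url https://bitbucket.example.com --workspace MYPROJ \
#       --repositories-file repos.txt --scrape-range 1y
#
#   # Repository-level open/closed PR counts for a Bitbucket project
#   python3 scrape_pr_metrics.py --repo-metrics --target bitbucket --output csv \
#       --bitbucket-token <TOKEN> --bitbucket-url https://bitbucket.example.com \
#       --workspace MYPROJ
# ---------------------------------------------------------------------------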