Scrape Metrics on Pull Requests on GitHub and/or Bitbucket
#!/usr/bin/env python3
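# Example invocations (the script filename, tokens, org/workspace names, and
# file paths below are placeholders):
#
#   python3 pr_metrics.py --target github --github-token <GITHUB_PAT> \
#       --repositories-file repos.txt --scrape-range 3m --output csv --threads 8
#
#   python3 pr_metrics.py --target bitbucket --bitbucket-token <API_TOKEN> \
#       --bitbucket-url https://bitbucket.example.com --workspace PROJ \
#       --scrape-range 1y --output json
#
#   python3 pr_metrics.py --target github --github-token <GITHUB_PAT> \
#       --org my-org --repo-metrics --output csv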
import argparse
import requests
import json
import csv
import concurrent.futures
from unittest.mock import patch
import datetime
from dateutil.relativedelta import relativedelta
import urllib3
# Suppress only SSL warnings, keep others
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
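# NOTE: the Bitbucket Server requests below are made with verify=False (e.g. for
# instances behind self-signed certificates), which is why the insecure-request
# warning is silenced above.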
LOG_FILE = "scrape_log.txt"
def log_message(message):
"""Logs messages to a file and prints it to the console."""
with open(LOG_FILE, "a") as log:
log.write(f"{datetime.datetime.now()} - {message}\n")
print(message) # Print to console as well
def get_github_user_email(token, username):
"""Fetch the email of a GitHub user if it's publicly available."""
headers = {"Authorization": f"token {token}"}
user_url = f"https://api.github.com/users/{username}"
response = requests.get(user_url, headers=headers)
if response.status_code == 200:
email = response.json().get("email", "N/A")
if email:
return email
return "N/A"
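# NOTE: fetch_github_prs() calls get_github_user_email() once per pull request,
# so every PR costs an additional API request against the rate limit, and only
# publicly visible emails are returned.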
def check_github_rate_limit(token):
"""Check GitHub API rate limit before running a scrape."""
headers = {"Authorization": f"token {token}"}
url = "https://api.github.com/rate_limit"
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
remaining = data["rate"]["remaining"]
reset_time = datetime.datetime.utcfromtimestamp(data["rate"]["reset"]).strftime('%Y-%m-%d %H:%M:%S UTC')
            log_message(f"📊 GitHub API Rate Limit: {remaining} requests remaining. Resets at: {reset_time}")
if remaining < 10:
log_message("⚠️ API rate limit is critically low. Exiting to prevent failures.")
exit(1)
else:
log_message(f"❌ Failed to fetch rate limit. Response: {response.text}")
except Exception as e:
log_message(f"❌ Error checking GitHub rate limit: {e}")
exit(1)
def load_repositories(file_path):
"""Load repositories from a file, skipping invalid lines."""
repositories = []
try:
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if line and "/" in line: # Ensure line contains org/repo format
repositories.append(line)
else:
log_message(f"⚠️ Skipping invalid repository line: {line}")
        log_message(f"📂 Loaded {len(repositories)} valid repositories from {file_path}")
return repositories
except Exception as e:
log_message(f"❌ Error reading repository file: {e}")
return []
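# load_repositories() expects one "org/repo" (GitHub) or "PROJECT/repo"
# (Bitbucket Server) entry per line, e.g.:
#
#   my-org/service-api
#   my-org/web-frontend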
def parse_scrape_range(scrape_range):
"""Convert scrape_range (e.g., '1y', '2m', '3w', '4d') into a datetime object."""
now = datetime.datetime.utcnow()
if not scrape_range:
return now - relativedelta(months=1) # Default: last 1 month
unit = scrape_range[-1]
value = scrape_range[:-1]
try:
value = int(value)
except ValueError:
raise ValueError(f"Invalid scrape range format: {scrape_range}")
if unit == 'y':
since_date = now - relativedelta(years=value)
elif unit == 'm':
since_date = now - relativedelta(months=value)
elif unit == 'w':
since_date = now - relativedelta(weeks=value)
elif unit == 'd':
since_date = now - relativedelta(days=value)
else:
raise ValueError("Invalid --scrape-range format. Use '1y', '2m', '3w', '4d'.")
return since_date # This ensures a valid datetime object is returned
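# Example: parse_scrape_range("3w") returns utcnow() minus three weeks; an empty
# value falls back to one month back.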
def parse_scrape_range_monthly(scrape_range):
"""Convert scrape_range into a list of months."""
now = datetime.datetime.utcnow()
if not scrape_range:
return [now.strftime("%Y-%m")] # Default: current month
unit = scrape_range[-1]
value = scrape_range[:-1]
try:
value = int(value)
except ValueError:
raise ValueError(f"Invalid scrape range format: {scrape_range}")
start_date = now
if unit == 'y':
start_date = now - relativedelta(years=value)
elif unit == 'm':
start_date = now - relativedelta(months=value)
elif unit == 'w':
start_date = now - relativedelta(weeks=value)
elif unit == 'd':
start_date = now - relativedelta(days=value)
else:
raise ValueError("Invalid --scrape-range format. Use '1y', '2m', '3w', '4d'.")
# Generate a list of months
months = []
current_date = start_date.replace(day=1) # Start from the first day of the start month
while current_date <= now:
months.append(current_date.strftime("%Y-%m")) # Ensure proper formatting
current_date += relativedelta(months=1)
    log_message(f"📅 Months to scrape: {months}")
return months
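# Example: parse_scrape_range_monthly("2m") with a current date in April 2025
# returns ["2025-02", "2025-03", "2025-04"].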
def save_to_file(data, target, output_format):
"""Saves the aggregated data to a file."""
date_str = datetime.datetime.now().strftime("%m-%d-%Y")
filename = f"{target}_{date_str}.{output_format}"
if output_format == "json":
with open(filename, "w") as f:
json.dump(data, f, indent=4)
elif output_format == "csv":
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["User", "Email", "Total PRs", "Merged PRs", "Closed but Not Merged PRs", "Denied PRs", "Source"])
for user, stats in data.items():
writer.writerow([
user, stats["email"], stats["total_prs"], stats["merged_prs"],
stats["closed_not_merged"], stats["denied_prs"], stats["source"]
])
    print(f"✅ Results saved to {filename}")
def combine_github_bitbucket(github_data, bitbucket_data):
"""Combines PR metrics from GitHub and Bitbucket using email as the unique identifier."""
combined_data = {}
# Process GitHub data
for user, stats in github_data.items():
email = stats["email"] if stats["email"] != "N/A" else f"unknown_github_{user}"
if email not in combined_data:
combined_data[email] = {
"user": user,
"email": stats["email"],
"total_prs": stats["total_prs"],
"merged_prs": stats["merged_prs"],
"closed_not_merged": stats["closed_not_merged"],
"denied_prs": stats["denied_prs"],
"source": "github"
}
# Process Bitbucket data
for user, stats in bitbucket_data.items():
email = stats["email"] if stats["email"] != "N/A" else f"unknown_bitbucket_{user}"
if email in combined_data:
# Combine metrics if the email matches
combined_data[email]["total_prs"] += stats["total_prs"]
combined_data[email]["merged_prs"] += stats["merged_prs"]
combined_data[email]["closed_not_merged"] += stats["closed_not_merged"]
combined_data[email]["denied_prs"] += stats["denied_prs"]
combined_data[email]["source"] = "combined" # Update source to combined
else:
# If no match, add the Bitbucket user
combined_data[email] = {
"user": user,
"email": stats["email"],
"total_prs": stats["total_prs"],
"merged_prs": stats["merged_prs"],
"closed_not_merged": stats["closed_not_merged"],
"denied_prs": stats["denied_prs"],
"source": "bitbucket"
}
return combined_data
def fetch_bitbucket_prs(token, bitbucket_url, workspaces, limit, scrape_range, threads=4):
"""Fetch PR metrics from Bitbucket repositories."""
headers = {"Authorization": f"Bearer {token}"}
user_stats = {}
since_date = parse_scrape_range(scrape_range)
def get_repositories(workspace):
"""Fetch all repositories for a given workspace."""
repo_list = []
start = 0
while True:
repo_url = f"{bitbucket_url}/rest/api/latest/projects/{workspace}/repos?limit=100&start={start}"
response = requests.get(repo_url, headers=headers, verify=False)
if response.status_code != 200:
log_message(f"⚠️ Failed to fetch repositories for workspace {workspace}: {response.text}")
break
data = response.json()
repos = data.get("values", [])
repo_list.extend([f"{workspace}/{repo['slug']}" for repo in repos])
if data.get("isLastPage", True):
break
start = data.get("nextPageStart", start + len(repos))
return repo_list
def process_repo(full_repo_name):
"""Fetch PRs for a single repository."""
try:
workspace_name, repo_name = full_repo_name.split("/")
pr_url = f"{bitbucket_url}/rest/api/latest/projects/{workspace_name}/repos/{repo_name}/pull-requests"
params = {
"state": "ALL",
"limit": 100,
"start": 0
}
repo_user_stats = {}
while True:
response = requests.get(pr_url, headers=headers, params=params, verify=False)
if response.status_code != 200:
log_message(f"⚠️ Failed to fetch PRs from {repo_name}. Response: {response.text}")
break
data = response.json()
prs = data.get("values", [])
for pr in prs:
created_at = datetime.datetime.fromtimestamp(pr["createdDate"] / 1000.0)
if created_at < since_date:
continue # Skip old PRs
user = pr["author"]["user"]["name"]
email = pr["author"]["user"].get("emailAddress", "N/A")
state = pr["state"].lower()
if user not in repo_user_stats:
repo_user_stats[user] = {
"email": email,
"total_prs": 0,
"merged_prs": 0,
"closed_not_merged": 0,
"denied_prs": 0,
"source": "bitbucket"
}
repo_user_stats[user]["total_prs"] += 1
if state == "merged":
repo_user_stats[user]["merged_prs"] += 1
elif state in ["declined", "closed"]:
repo_user_stats[user]["denied_prs"] += 1
repo_user_stats[user]["closed_not_merged"] += 1
if data.get("isLastPage", True):
break
params["start"] = data.get("nextPageStart", params["start"] + len(prs))
return repo_user_stats
except Exception as e:
log_message(f"❌ Error processing repo {full_repo_name}: {str(e)}")
return {}
# Step 1: Get all repositories from all workspaces
all_repositories = []
for workspace in workspaces:
        log_message(f"🔍 Fetching repositories for workspace '{workspace}'...")
all_repositories.extend(get_repositories(workspace))
if not all_repositories:
log_message("⚠️ No repositories found. Exiting.")
return {}
    log_message(f"📌 Found {len(all_repositories)} repositories to process...")
# Step 2: Fetch PRs from all repositories in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(process_repo, repo) for repo in all_repositories]
for future in concurrent.futures.as_completed(futures):
try:
repo_stats = future.result()
for user, stats in repo_stats.items():
if user not in user_stats:
user_stats[user] = stats
else:
user_stats[user]["total_prs"] += stats["total_prs"]
user_stats[user]["merged_prs"] += stats["merged_prs"]
user_stats[user]["denied_prs"] += stats["denied_prs"]
user_stats[user]["closed_not_merged"] += stats["closed_not_merged"]
except Exception as e:
log_message(f"❌ Error processing repository: {str(e)}")
return user_stats
def fetch_github_prs(token, orgs, limit, scrape_range, repo_list=None, threads=4):
"""Fetch PR metrics from GitHub repositories for a given scrape range."""
headers = {"Authorization": f"token {token}"}
user_stats = {}
# Convert scrape_range to a proper datetime object
since_date = parse_scrape_range(scrape_range)
until_date = datetime.datetime.utcnow() # PRs up until now
repositories_to_scrape = repo_list if repo_list else []
if not repositories_to_scrape:
log_message("⚠️ No repositories found to scrape.")
return {}
    log_message(f"📌 Processing {len(repositories_to_scrape)} repositories since {since_date} with {threads} threads...")
def process_repo(full_repo_name):
"""Process a single repository and return user stats."""
repo_user_stats = {}
try:
if "/" not in full_repo_name:
log_message(f"⚠️ Skipping invalid repository format: {full_repo_name}")
return {}
org_name, repo_name = full_repo_name.split("/")
pr_url = f"https://api.github.com/repos/{org_name}/{repo_name}/pulls"
params = {
"state": "all",
"per_page": 100, # Max allowed by GitHub
"sort": "created",
"direction": "desc"
}
            page = 1
            total_prs = 0
            reached_old_prs = False  # set once pagination reaches PRs older than since_date
while True:
params["page"] = page
response = requests.get(pr_url, headers=headers, params=params)
if response.status_code != 200:
log_message(f"❌ Failed to fetch PRs from {repo_name}. Response: {response.status_code} - {response.text}")
break
prs = response.json()
if not prs:
break
                log_message(f"📊 Found {len(prs)} PRs in {repo_name} (Page {page})")
for pr in prs:
                    created_at = datetime.datetime.strptime(pr["created_at"], "%Y-%m-%dT%H:%M:%SZ")
                    if created_at < since_date:
                        log_message(f"⚠️ PR {pr['number']} is older than {since_date}. Stopping pagination.")
                        reached_old_prs = True
                        break  # PRs are sorted newest-first, so the rest of this page is older too
if created_at >= until_date:
continue # Skip PRs from the future
if total_prs >= limit:
log_message(f"⚠️ Reached PR limit of {limit} for {repo_name}")
break
# Process PR...
total_prs += 1
user = pr["user"]["login"]
email = get_github_user_email(token, user)
# Fetch PR details to determine if it was merged
pr_details_url = f"https://api.github.com/repos/{org_name}/{repo_name}/pulls/{pr['number']}"
pr_details_response = requests.get(pr_details_url, headers=headers)
if pr_details_response.status_code != 200:
log_message(f"⚠️ Failed to fetch PR details for PR {pr['number']} in {repo_name}. Response: {pr_details_response.status_code}")
continue
pr_details = pr_details_response.json()
is_merged = pr_details.get("merged", False)
state = pr["state"] # 'open' or 'closed'
if user not in repo_user_stats:
repo_user_stats[user] = {
"email": email,
"total_prs": 0,
"merged_prs": 0,
"closed_not_merged": 0,
"denied_prs": 0,
"source": "github"
}
repo_user_stats[user]["total_prs"] += 1
if is_merged:
repo_user_stats[user]["merged_prs"] += 1
elif state == "closed" and not is_merged:
repo_user_stats[user]["closed_not_merged"] += 1
else:
repo_user_stats[user]["denied_prs"] += 1 # Open PRs or denied PRs
                    log_message(f"✅ Processed PR {pr['number']} | Merged: {is_merged} | State: {state} | User: {user}")
                if reached_old_prs or len(prs) < 100 or total_prs >= limit:
                    break
                page += 1
except Exception as e:
log_message(f"❌ Error processing repo {full_repo_name}: {str(e)}")
return repo_user_stats
# Process repositories in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
futures = {executor.submit(process_repo, repo): repo for repo in repositories_to_scrape}
for future in concurrent.futures.as_completed(futures):
repo = futures[future]
try:
repo_stats = future.result()
for user, stats in repo_stats.items():
if user not in user_stats:
user_stats[user] = stats
else:
user_stats[user]["total_prs"] += stats["total_prs"]
user_stats[user]["merged_prs"] += stats["merged_prs"]
user_stats[user]["closed_not_merged"] += stats["closed_not_merged"]
user_stats[user]["denied_prs"] += stats["denied_prs"]
except Exception as e:
log_message(f"❌ Error processing repository {repo}: {str(e)}")
return user_stats
def save_monthly_to_file(monthly_data, target, output_format):
"""Saves the monthly aggregated data to a file."""
date_str = datetime.datetime.now().strftime("%m-%d-%Y")
filename = f"{target}_monthly_{date_str}.{output_format}"
if output_format == "json":
with open(filename, "w") as f:
json.dump(monthly_data, f, indent=4)
elif output_format == "csv":
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
# Add a "Month" column to the CSV
writer.writerow(["Month", "User", "Email", "Total PRs", "Merged PRs", "Closed but Not Merged PRs", "Denied PRs", "Source"])
for month, data in monthly_data.items():
for user, stats in data.items():
writer.writerow([
month, # Add the month column
user,
stats["email"],
stats["total_prs"],
stats["merged_prs"],
stats["closed_not_merged"],
stats["denied_prs"],
stats["source"]
])
    print(f"✅ Monthly results saved to {filename}")
def get_all_github_repos(token, orgs):
"""Fetch all repositories for specified GitHub organizations."""
headers = {"Authorization": f"token {token}"}
all_repos = []
for org in orgs:
page = 1
while True:
url = f"https://api.github.com/orgs/{org}/repos?per_page=100&page={page}"
response = requests.get(url, headers=headers)
if response.status_code != 200:
log_message(f"❌ Failed to fetch repos for {org}: {response.text}")
break
repos = response.json()
if not repos:
break
all_repos.extend([f"{org}/{repo['name']}" for repo in repos])
page += 1
return all_repos
def get_all_bitbucket_repos(token, bitbucket_url, workspaces):
"""Fetch all repositories for specified Bitbucket workspaces."""
headers = {"Authorization": f"Bearer {token}"}
all_repos = []
for workspace in workspaces:
start = 0
while True:
url = f"{bitbucket_url}/rest/api/latest/projects/{workspace}/repos?limit=100&start={start}"
response = requests.get(url, headers=headers, verify=False)
if response.status_code != 200:
log_message(f"❌ Failed to fetch repos for {workspace}: {response.text}")
break
data = response.json()
repos = data.get("values", [])
all_repos.extend([f"{workspace}/{repo['slug']}" for repo in repos])
if data.get("isLastPage", True):
break
start = data.get("nextPageStart", 0)
return all_repos
def gather_repository_metrics(args):
"""Main function to gather repository metrics based on CLI flags."""
repo_metrics = {}
repositories = []
# Get repository list
if args.repositories_file:
repositories = load_repositories(args.repositories_file)
else:
if args.target == "github" and args.org:
repositories = get_all_github_repos(args.github_token, args.org.split(","))
elif args.target == "bitbucket" and args.workspace:
repositories = get_all_bitbucket_repos(
args.bitbucket_token,
args.bitbucket_url,
args.workspace.split(",")
)
# Process repositories
for repo in repositories:
if args.target == "github":
org, repo_name = repo.split("/")
pr_url = f"https://api.github.com/repos/{org}/{repo_name}/pulls?state=all"
response = requests.get(
pr_url,
headers={"Authorization": f"token {args.github_token}"}
)
if response.status_code == 200:
prs = response.json()
open_prs = sum(1 for pr in prs if pr["state"] == "open")
closed_prs = sum(1 for pr in prs if pr["state"] == "closed")
repo_metrics[repo] = {"open_prs": open_prs, "closed_prs": closed_prs}
elif args.target == "bitbucket":
workspace, repo_name = repo.split("/")
pr_url = f"{args.bitbucket_url}/rest/api/latest/projects/{workspace}/repos/{repo_name}/pull-requests?state=ALL"
response = requests.get(
pr_url,
headers={"Authorization": f"Bearer {args.bitbucket_token}"},
verify=False
)
if response.status_code == 200:
prs = response.json().get("values", [])
open_prs = sum(1 for pr in prs if pr["state"] == "OPEN")
closed_prs = sum(1 for pr in prs if pr["state"] in ["MERGED", "DECLINED"])
repo_metrics[repo] = {"open_prs": open_prs, "closed_prs": closed_prs}
return repo_metrics
def get_repository_list(args):
"""Get list of repositories based on input flags."""
repositories = []
if args.repositories_file:
repositories = load_repositories(args.repositories_file)
        log_message(f"📂 Loaded {len(repositories)} repositories from file")
    elif args.target == "github" and args.org:
        log_message(f"🔍 Discovering repositories in GitHub org(s): {args.org}")
        repositories = get_all_github_repos(args.github_token, args.org.split(","))
    elif args.target == "bitbucket" and args.workspace:
        log_message(f"🔍 Discovering repositories in Bitbucket workspace(s): {args.workspace}")
        repositories = get_all_bitbucket_repos(
            args.bitbucket_token,
            args.bitbucket_url,
            args.workspace.split(",")
        )
    log_message(f"📊 Total repositories to analyze: {len(repositories)}")
return repositories
def get_github_repo_stats(token, repo_full_name, limit=100):
"""Get PR statistics for a single GitHub repository with pagination."""
headers = {"Authorization": f"token {token}"}
org, repo_name = repo_full_name.split("/")
stats = {"open_prs": 0, "closed_prs": 0}
total_prs = 0
page = 1
while True:
url = f"https://api.github.com/repos/{org}/{repo_name}/pulls?state=all&per_page=100&page={page}"
response = requests.get(url, headers=headers)
if response.status_code != 200:
log_message(f"⚠️ Failed to fetch PRs for {repo_full_name}: {response.text}")
break
prs = response.json()
if not prs:
break
for pr in prs:
if total_prs >= limit:
break
if pr["state"] == "open":
stats["open_prs"] += 1
else:
stats["closed_prs"] += 1
total_prs += 1
if len(prs) < 100 or total_prs >= limit:
break
page += 1
return repo_full_name, stats
def get_bitbucket_repo_stats(token, base_url, repo_full_name, limit=100):
"""Get PR statistics for a single Bitbucket repository with pagination."""
headers = {"Authorization": f"Bearer {token}"}
workspace, repo_name = repo_full_name.split("/")
stats = {"open_prs": 0, "closed_prs": 0}
total_prs = 0
start = 0
while True:
url = f"{base_url}/rest/api/latest/projects/{workspace}/repos/{repo_name}/pull-requests?state=ALL&limit=100&start={start}"
response = requests.get(url, headers=headers, verify=False)
if response.status_code != 200:
log_message(f"⚠️ Failed to fetch PRs for {repo_full_name}: {response.text}")
break
data = response.json()
prs = data.get("values", [])
for pr in prs:
if total_prs >= limit:
break
if pr["state"] == "OPEN":
stats["open_prs"] += 1
else:
stats["closed_prs"] += 1
total_prs += 1
if data.get("isLastPage", True) or total_prs >= limit:
break
start = data.get("nextPageStart", start + len(prs))
return repo_full_name, stats
def fetch_repo_metrics(args):
"""Fetch open/closed PR counts for repositories with limit support."""
repo_metrics = {}
repositories = get_repository_list(args)
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = []
for repo in repositories:
if args.target == "github":
futures.append(executor.submit(
get_github_repo_stats,
args.github_token,
repo,
args.limit
))
elif args.target == "bitbucket":
futures.append(executor.submit(
get_bitbucket_repo_stats,
args.bitbucket_token,
args.bitbucket_url,
repo,
args.limit
))
        for future in concurrent.futures.as_completed(futures):
            try:
                repo_name, stats = future.result()
                repo_metrics[repo_name] = stats
                log_message(f"✅ Processed {repo_name} - Open: {stats['open_prs']} | Closed: {stats['closed_prs']} (Limit: {args.limit})")
            except Exception as e:
                log_message(f"❌ Error processing repository: {str(e)}")
return repo_metrics
def save_repo_metrics_to_file(repo_metrics, target, output_format):
"""Save repository metrics to file."""
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
filename = f"{target}_repo_metrics_{date_str}.{output_format}"
if output_format == "json":
with open(filename, "w") as f:
json.dump(repo_metrics, f, indent=4)
elif output_format == "csv":
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Repository", "Open PRs", "Closed PRs"])
for repo, metrics in repo_metrics.items():
writer.writerow([repo, metrics["open_prs"], metrics["closed_prs"]])
    log_message(f"✅ Repository metrics saved to {filename}")
def fetch_monthly_user_data(args, month):
"""Fetch user metrics for a specific month."""
since_date = parse_scrape_range(args.scrape_range) # Ensure it returns a valid datetime
    log_message(f"🔍 Fetching data for {month} (since_date: {since_date})")
github_data = {}
bitbucket_data = {}
# Fetch GitHub data
if args.target in ["github", None] and args.github_token:
github_data = fetch_github_prs(
args.github_token,
args.org.split(",") if args.org else [],
args.limit,
            args.scrape_range,
            load_repositories(args.repositories_file) if args.repositories_file else None
)
# Fetch Bitbucket data
if args.target in ["bitbucket", None] and args.bitbucket_token and args.bitbucket_url:
bitbucket_data = fetch_bitbucket_prs(
args.bitbucket_token,
args.bitbucket_url,
args.workspace.split(",") if args.workspace else [],
args.limit,
            args.scrape_range,
args.threads
)
# Combine data if requested
if args.combined:
return combine_github_bitbucket(github_data, bitbucket_data)
else:
return {**github_data, **bitbucket_data}
def save_monthly_user_data(monthly_data, output_format):
"""Save monthly user metrics to a file."""
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
filename = f"user_metrics_monthly_{date_str}.{output_format}"
if output_format == "json":
with open(filename, "w") as f:
json.dump(monthly_data, f, indent=4)
elif output_format == "csv":
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
# Write header
writer.writerow(["Month", "User", "Email", "Total PRs", "Merged PRs",
"Closed but Not Merged PRs", "Denied PRs", "Source"])
# Write data
for month, user_data in monthly_data.items():
for user, stats in user_data.items():
writer.writerow([
month,
user,
stats["email"],
stats["total_prs"],
stats["merged_prs"],
stats["closed_not_merged"],
stats["denied_prs"],
stats["source"]
])
    log_message(f"✅ Monthly user metrics saved to {filename}")
def fetch_user_data(args):
"""Fetch user metrics for a single time range."""
scrape_range = args.scrape_range if args.scrape_range else "1m" # Default to 1 month
since_date = parse_scrape_range(scrape_range)
    log_message(f"🔍 Fetching data since {since_date}")
    # Build the repository list: from a file if given, otherwise by discovering
    # repositories in the specified GitHub organization(s)
    repositories_to_scrape = []
    if args.repositories_file:
        repositories_to_scrape = load_repositories(args.repositories_file)
        if not repositories_to_scrape:
            log_message("⚠️ No valid repositories found in the file.")
            return {}
    elif args.target == "github" and args.github_token and args.org:
        repositories_to_scrape = get_all_github_repos(args.github_token, args.org.split(","))
github_data = {}
bitbucket_data = {}
# Fetch GitHub data
if args.target in ["github", None] and args.github_token:
github_data = fetch_github_prs(
args.github_token,
args.org.split(",") if args.org else [],
args.limit,
args.scrape_range,
repositories_to_scrape # Pass the loaded repositories
)
# Fetch Bitbucket data
if args.target in ["bitbucket", None] and args.bitbucket_token and args.bitbucket_url:
bitbucket_data = fetch_bitbucket_prs(
args.bitbucket_token,
args.bitbucket_url,
args.workspace.split(",") if args.workspace else [],
args.limit,
args.scrape_range,
args.threads
)
# Combine data if requested
if args.combined:
return combine_github_bitbucket(github_data, bitbucket_data)
else:
return {**github_data, **bitbucket_data}
def save_user_data_to_file(user_data, target_name, output_format):
"""Save user metrics to a file."""
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
filename = f"{target_name}_user_metrics_{date_str}.{output_format}"
if output_format == "json":
with open(filename, "w") as f:
json.dump(user_data, f, indent=4)
elif output_format == "csv":
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
# Write header
writer.writerow(["User", "Email", "Total PRs", "Merged PRs",
"Closed but Not Merged PRs", "Denied PRs", "Source"])
# Write data
for user, stats in user_data.items():
writer.writerow([
user,
stats["email"],
stats["total_prs"],
stats["merged_prs"],
stats["closed_not_merged"],
stats["denied_prs"],
stats["source"]
])
    log_message(f"✅ User metrics saved to {filename}")
def fetch_monthly_data(args):
"""Fetch user metrics month-to-month using the existing fetch_monthly_user_data function."""
months_to_scrape = parse_scrape_range_monthly(args.scrape_range)
monthly_data = {}
for month in months_to_scrape:
        log_message(f"🔍 Processing month: {month}")
monthly_data[month] = fetch_monthly_user_data(args, month)
return monthly_data
def fetch_monthly_user_stats(args):
"""
Wrapper to fetch user metrics month-to-month.
For each month, we temporarily patch the date boundaries so that
the existing fetch_user_data() functions only process PRs for that month.
"""
monthly_stats = {}
# Get list of months (e.g., ["2023-03", "2023-04", ...])
months = parse_scrape_range_monthly(args.scrape_range)
# Save the original parse_scrape_range so we can restore it later
original_parse_scrape_range = parse_scrape_range
for month in months:
# Compute the start and end dates for the month
start_date = datetime.datetime.strptime(month + "-01", "%Y-%m-%d")
end_date = start_date + relativedelta(months=1)
        log_message(f"🔍 Processing month: {month} (from {start_date} to {end_date})")
# Monkey-patch parse_scrape_range so it always returns start_date
globals()['parse_scrape_range'] = lambda x: start_date
# Create a subclass of datetime.datetime that overrides utcnow()
class FixedDateTime(datetime.datetime):
@classmethod
def utcnow(cls):
return end_date
# Use unittest.mock.patch to temporarily override datetime.datetime
# Note: adjust the patch target if your module name differs (e.g. '__main__.datetime.datetime')
with patch('datetime.datetime', FixedDateTime):
# Call the existing fetch_user_data function which uses parse_scrape_range()
data = fetch_user_data(args)
# Add the month to each user record
for user in data:
data[user]["month"] = month
monthly_stats[month] = data
# Restore the original parse_scrape_range function for the next iteration
globals()['parse_scrape_range'] = original_parse_scrape_range
return monthly_stats
def main():
try:
# Argument parsing
parser = argparse.ArgumentParser(description="Scrape PR metrics from GitHub and/or Bitbucket.")
# Common arguments
parser.add_argument("--target", choices=["github", "bitbucket"],
help="Specify platform to scrape")
parser.add_argument("--output", required=True, choices=["json", "csv"],
help="Output format")
# Mode selection arguments
parser.add_argument("--repo-metrics", action="store_true",
help="Generate repository-level metrics (open/closed PR counts)")
parser.add_argument("--month-to-month", action="store_true",
help="Show user statistics month-to-month")
# User metrics arguments
parser.add_argument("--combined", action="store_true",
help="Combine GitHub & Bitbucket user metrics")
parser.add_argument("--scrape-range",
help="Time range for PRs (e.g., '1y', '2m', '3w', '4d')")
# Authentication arguments
parser.add_argument("--github-token",
help="GitHub PAT for authentication")
parser.add_argument("--bitbucket-token",
help="Bitbucket API Token")
parser.add_argument("--bitbucket-url",
help="Custom Bitbucket Server URL")
# Data source arguments
parser.add_argument("--org",
help="Comma-separated GitHub organization names")
parser.add_argument("--workspace",
help="Comma-separated Bitbucket workspace names")
parser.add_argument("--repositories-file",
help="Path to file containing specific repositories to scrape")
# Performance arguments
parser.add_argument("--limit", type=int, default=100,
help="Max PRs to fetch per repository")
parser.add_argument("--threads", type=int, default=4,
help="Number of concurrent threads")
parser.add_argument("--check-rate-limit", action="store_true",
help="Check GitHub API rate limit before scraping")
args = parser.parse_args()
# Validate arguments
if args.repo_metrics and args.month_to_month:
log_message("❌ --month-to-month is not compatible with --repo-metrics")
exit(1)
if args.check_rate_limit:
check_github_rate_limit(args.github_token)
exit(0)
# Repository metrics mode
if args.repo_metrics:
if not args.target:
log_message("❌ --target is required for repository metrics")
exit(1)
            log_message("📊 Running repository metrics collection")
repo_metrics = fetch_repo_metrics(args)
if repo_metrics:
save_repo_metrics_to_file(repo_metrics, args.target, args.output)
else:
log_message("⚠️ No repository metrics collected")
exit(0)
# User metrics mode
try:
            if args.month_to_month:
                log_message("📆 Running month-to-month user analysis")
                monthly_data = fetch_monthly_user_stats(args)
                # save_monthly_user_data writes one row per user per month (the CSV includes a Month column)
                save_monthly_user_data(monthly_data, args.output)
else:
log_message("⏳ Running single-range user analysis")
user_data = fetch_user_data(args)
if user_data:
target_name = "combined" if args.combined else args.target or "results"
save_user_data_to_file(user_data, target_name, args.output)
else:
log_message("⚠️ No user data collected")
except Exception as e:
log_message(f"❌ Critical error: {str(e)}")
exit(1)
        log_message("✅ Operation completed successfully")
    except KeyboardInterrupt:
        log_message("🛑 Script interrupted by user. Exiting gracefully...")
exit(0)
if __name__ == "__main__":
main()