Skip to content

Instantly share code, notes, and snippets.

@SUT0L
Created January 29, 2024 06:48
Show Gist options
  • Select an option

  • Save SUT0L/bf5f6587b9da5ac4dfb163183afceab1 to your computer and use it in GitHub Desktop.

Select an option

Save SUT0L/bf5f6587b9da5ac4dfb163183afceab1 to your computer and use it in GitHub Desktop.
Get all emails from a user's GitHub account across all commits
import asyncio
import aiohttp
import re
import argparse
import time
from aiohttp import ClientSession
class RateLimiter:
    """Simple token-bucket rate limiter.

    Tokens accrue at ``max_rate`` per second up to a burst capacity of
    ``max_rate``; each ``acquire()`` consumes one token, sleeping until one
    is available.

    NOTE(review): ``acquire`` uses blocking ``time.sleep``, so it stalls the
    asyncio event loop when called from a coroutine — tolerable for a small
    script, but an asyncio-native limiter would use ``asyncio.sleep``.
    """

    def __init__(self, max_rate):
        # max_rate: sustained requests per second (also the burst capacity).
        self.max_rate = max_rate
        self.tokens = max_rate          # start with a full bucket
        self.last_check = time.time()   # timestamp of the last refill

    def acquire(self):
        """Block until one token is available, then consume it."""
        now = time.time()
        # Credit tokens for the time elapsed since the last refill.
        self.tokens += (now - self.last_check) * self.max_rate
        self.last_check = now
        if self.tokens > self.max_rate:
            self.tokens = self.max_rate  # cap the burst
        if self.tokens < 1:
            # Sleep exactly long enough for one token to accrue, consume it.
            # Resetting last_check AFTER the sleep fixes a bug in the
            # original: the sleep interval was counted again as refill time
            # on the next call, effectively doubling the permitted rate.
            time.sleep((1 - self.tokens) / self.max_rate)
            self.last_check = time.time()
            self.tokens = 0
        else:
            self.tokens -= 1
# Module-wide limiter shared by all API requests: 900 requests per minute,
# expressed here as requests per second for the token bucket.
rate_limiter = RateLimiter(900/60)
# Pattern for plausible email addresses found in patch text. The original
# TLD class was [A-Z|a-z], which also matched a literal '|' (e.g. "a@b.c|d");
# it is now [A-Za-z], so only alphabetic TLDs of length >= 2 match.
email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
# File suffixes that commonly follow an '@'-like token in patches
# (e.g. "image@2x.png") and must not be reported as email addresses.
excluded_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.pdf', '.doc', '.docx',
                       '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.exe',
                       '.dmg', '.iso'}
async def fetch(session, url, headers=None):
    """GET ``url`` and return the decoded JSON body.

    Returns None when GitHub reports the quota is exhausted
    (``X-RateLimit-Remaining`` == 0) or responds 403; any other HTTP error
    is re-raised. The original wrapped this in a ``while True`` that could
    never iterate (every path returns or raises), removed here.
    """
    # Throttle locally before hitting the API. NOTE(review): this calls
    # blocking time.sleep from async code, stalling the event loop while
    # waiting for a token.
    rate_limiter.acquire()
    try:
        async with session.get(url, headers=headers) as response:
            remaining = int(response.headers.get('X-RateLimit-Remaining', 1))
            if remaining == 0:
                print("API rate limit exceeded. You cannot make any more requests at this time.")
                return None
            response.raise_for_status()
            return await response.json()
    # aiohttp.ClientResponseError is the documented public name
    # (aiohttp.client_exceptions is an internal module).
    except aiohttp.ClientResponseError as e:
        if e.status == 403:
            print("403 Forbidden - API key may be required or invalid.")
            return None
        raise
async def get_user_repos(session, username, include_forks, headers=None):
    """Return the names of ``username``'s repositories, or None if the
    listing could not be fetched (rate limit / auth failure).

    Forked repositories are excluded unless ``include_forks`` is true.
    """
    print(f"Fetching repositories for user: {username}")
    # per_page=100 is the REST API maximum; the original default returned
    # only the first 30 repositories. NOTE(review): users with >100 repos
    # still need Link-header pagination, which is not implemented here.
    url = f"https://api.github.com/users/{username}/repos?per_page=100"
    repos = await fetch(session, url, headers)
    if repos is None:
        print("No data fetched, possibly due to rate limit. Exiting.")
        return None
    if not include_forks:
        repos = [repo for repo in repos if not repo['fork']]
    print(f"Found {len(repos)} repositories for user {username} (include_forks={include_forks})")
    return [repo['name'] for repo in repos]
async def get_repo_branches(session, username, repo_name, headers=None):
    """Return the branch names of ``username/repo_name``; [] on fetch failure."""
    print(f"Fetching branches for repository: {repo_name}")
    url = f"https://api.github.com/repos/{username}/{repo_name}/branches"
    branches = await fetch(session, url, headers)
    if branches is None:
        # fetch() returns None on rate limiting / 403; the original crashed
        # here with TypeError on len(None).
        return []
    print(f"Found {len(branches)} branches for repository {repo_name}")
    return [branch['name'] for branch in branches]
async def get_commits(session, username, repo_name, branch_name, headers=None):
    """Return the HTML URLs of commits on one branch; [] on fetch failure."""
    print(f"Fetching commits for branch: {branch_name} in repository: {repo_name}")
    url = f"https://api.github.com/repos/{username}/{repo_name}/commits?sha={branch_name}"
    commits = await fetch(session, url, headers)
    if commits is None:
        # fetch() returns None on rate limiting / 403; the original crashed
        # here with TypeError on len(None).
        return []
    print(f"Found {len(commits)} commits for branch {branch_name} in repository {repo_name}")
    return [commit['html_url'] for commit in commits]
async def get_emails_from_commit(commit_url, seen_emails):
    """Download a commit's ``.patch`` and return ``{(email, commit_url), ...}``
    for every address not already present in ``seen_emails``.

    ``seen_emails`` is updated in place so concurrent tasks do not report
    the same address twice.
    """
    print(f"Fetching commit data from URL: {commit_url}")
    url = commit_url + ".patch"
    # NOTE(review): a fresh session per commit defeats connection reuse; the
    # signature does not receive the shared session, so one is kept here,
    # but the response is now released via a context manager (the original
    # never released it).
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            text = await response.text()
    potential_emails = set(email_pattern.findall(text))
    emails = {email for email in potential_emails
              if not any(email.endswith(ext) for ext in excluded_extensions)}
    new_emails = {(email, commit_url) for email in emails if email not in seen_emails}
    seen_emails.update(emails)
    print(f"Found {len(new_emails)} new emails in commit: {commit_url}")
    return new_emails
async def main(username, include_forks, api_key=None):
    """Walk every branch of every repository of *username* and print each
    unique (email, commit_url) pair found in the commit patches."""
    headers = {"Authorization": f"token {api_key}"} if api_key else {}
    seen_emails = set()
    async with ClientSession() as session:
        repos = await get_user_repos(session, username, include_forks, headers)
        if repos is None:
            return
        # One branch-listing task per repository, run concurrently.
        all_branches = await asyncio.gather(
            *(get_repo_branches(session, username, repo, headers) for repo in repos)
        )
        # One commit-listing task per (repository, branch) pair.
        commit_jobs = [
            get_commits(session, username, repo, branch, headers)
            for repo, branches in zip(repos, all_branches)
            for branch in branches
        ]
        commit_urls = [url for batch in await asyncio.gather(*commit_jobs) for url in batch]
        # Scan each commit's patch for previously unseen addresses.
        email_sets = await asyncio.gather(
            *(get_emails_from_commit(url, seen_emails) for url in commit_urls)
        )
        unique_emails = {pair for found in email_sets for pair in found}
        print(f"Total unique emails found: {len(unique_emails)}")
        for email in unique_emails:
            print(email)
if __name__ == "__main__":
    # Command-line entry point: positional username, optional fork inclusion
    # and API key.
    cli = argparse.ArgumentParser(
        description="Fetch GitHub user's repositories, branches, commits, and extract emails."
    )
    cli.add_argument('username', help='GitHub username to fetch data for')
    cli.add_argument('--include-forks', action='store_true',
                     help='Include forked repositories in the search')
    cli.add_argument('--api-key', default=None,
                     help='GitHub API key for authentication')
    args = cli.parse_args()
    if args.api_key is None:
        # Unauthenticated requests fall under GitHub's much lower anonymous quota.
        print('We recommend, you provide an API key so you don\'t get rate limited. You can get one from https://github.com/settings/tokens?type=beta')
        time.sleep(2)
    asyncio.run(main(args.username, args.include_forks, args.api_key))
@DatDraggy
Copy link

Awesome. Found some emails in my commits from years ago that should not be there.
Squash all commits, then run git commit --amend --reset-author --no-edit, followed by git push --force-with-lease

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment