Created
January 29, 2024 06:48
-
-
Save SUT0L/bf5f6587b9da5ac4dfb163183afceab1 to your computer and use it in GitHub Desktop.
Get all emails from a user's GitHub account across all commits
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import aiohttp | |
| import re | |
| import argparse | |
| import time | |
| from aiohttp import ClientSession | |
class RateLimiter:
    """Token-bucket limiter that throttles calls to ``max_rate`` per second.

    The bucket starts full with ``max_rate`` tokens, refills continuously at
    ``max_rate`` tokens per second, and never holds more than ``max_rate``.
    """

    def __init__(self, max_rate):
        """Create a limiter allowing ``max_rate`` acquisitions per second."""
        self.max_rate = max_rate
        self.tokens = max_rate
        # Monotonic clock: wall-clock adjustments must neither grant nor
        # revoke tokens.
        self.last_check = time.monotonic()

    def acquire(self):
        """Consume one token, sleeping first if the bucket is empty.

        NOTE: the wait uses the blocking ``time.sleep``; a caller running
        inside an asyncio event loop stalls that loop for the duration.
        """
        now = time.monotonic()
        elapsed = now - self.last_check
        self.last_check = now
        # Refill for the elapsed time, capped at the bucket capacity.
        self.tokens = min(self.max_rate, self.tokens + elapsed * self.max_rate)
        if self.tokens >= 1:
            self.tokens -= 1
            return
        # Wait exactly long enough for the missing fraction of a token.
        time.sleep((1 - self.tokens) / self.max_rate)
        # The time just slept paid for the token being consumed; move the
        # refill reference past it so the next call does not credit it again.
        self.last_check = time.monotonic()
        self.tokens = 0
# Module-wide limiter: 900 requests per minute, expressed as a per-second rate.
rate_limiter = RateLimiter(max_rate=900 / 60)
# Matches e-mail-like tokens: local part, "@", dotted domain, and a final label
# of at least two letters.  The final class is plain letters only: inside a
# character class "|" is a literal pipe, not alternation.
email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
# Suffixes used to discard matches that end like a file name rather than an
# e-mail address.
excluded_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.exe', '.dmg', '.iso'}
async def fetch(session, url, headers=None):
    """GET ``url`` and return its decoded JSON body, or ``None`` if refused.

    Args:
        session: aiohttp client session used to issue the request.
        url: Absolute URL to request.
        headers: Optional mapping of request headers.

    Returns:
        The parsed JSON payload, or ``None`` when the server rejects the
        request because the rate limit is exhausted or with 403 Forbidden.

    Raises:
        aiohttp.ClientResponseError: for any other non-success status.
    """
    # Throttle locally before touching the network.
    rate_limiter.acquire()
    try:
        async with session.get(url, headers=headers) as response:
            remaining = response.headers.get('X-RateLimit-Remaining', '').strip()
            # Only a *rejected* response with an empty budget means we are
            # rate limited; a successful response that merely reports zero
            # remaining requests still carries valid data.
            if response.status in (403, 429) and remaining == '0':
                print("API rate limit exceeded. You cannot make any more requests at this time.")
                return None
            response.raise_for_status()
            return await response.json()
    except aiohttp.ClientResponseError as e:
        if e.status == 403:
            print("403 Forbidden - API key may be required or invalid.")
            return None
        raise
async def get_user_repos(session, username, include_forks, headers=None):
    """Return the names of ``username``'s repositories, or ``None`` on failure.

    Forked repositories are dropped unless ``include_forks`` is true.
    ``None`` is returned when ``fetch`` yields no data.

    NOTE(review): no pagination parameters are sent, so presumably only the
    first page of the API's results is seen here; confirm against the GitHub
    REST documentation.
    """
    print(f"Fetching repositories for user: {username}")
    listing = await fetch(session, f"https://api.github.com/users/{username}/repos", headers)
    if listing is None:
        print("No data fetched, possibly due to rate limit. Exiting.")
        return None
    kept = listing if include_forks else [entry for entry in listing if not entry['fork']]
    print(f"Found {len(kept)} repositories for user {username} (include_forks={include_forks})")
    names = []
    for entry in kept:
        names.append(entry['name'])
    return names
async def get_repo_branches(session, username, repo_name, headers=None):
    """Return the branch names of ``username/repo_name``.

    Args:
        session: aiohttp client session passed through to ``fetch``.
        username: Owner of the repository.
        repo_name: Repository name.
        headers: Optional request headers (e.g. authorization).

    Returns:
        A list of branch names; empty when ``fetch`` returned no data.
    """
    print(f"Fetching branches for repository: {repo_name}")
    url = f"https://api.github.com/repos/{username}/{repo_name}/branches"
    branches = await fetch(session, url, headers)
    if branches is None:
        # fetch() reports a refused request as None; treat the repository as
        # having no branches instead of crashing on len(None).
        print(f"No branch data fetched for repository {repo_name}; skipping it.")
        return []
    print(f"Found {len(branches)} branches for repository {repo_name}")
    return [branch['name'] for branch in branches]
async def get_commits(session, username, repo_name, branch_name, headers=None):
    """Return the web URLs of commits reachable from ``branch_name``.

    Args:
        session: aiohttp client session passed through to ``fetch``.
        username: Owner of the repository.
        repo_name: Repository name.
        branch_name: Branch whose history is listed (sent as ``sha``).
        headers: Optional request headers (e.g. authorization).

    Returns:
        A list of each commit's ``html_url``; empty when ``fetch`` returned
        no data.
    """
    from urllib.parse import quote

    print(f"Fetching commits for branch: {branch_name} in repository: {repo_name}")
    # Branch names may contain characters such as "#", "&" or "+" that would
    # otherwise be parsed as URL syntax, so percent-encode the value.
    encoded_branch = quote(branch_name, safe='')
    url = f"https://api.github.com/repos/{username}/{repo_name}/commits?sha={encoded_branch}"
    commits = await fetch(session, url, headers)
    if commits is None:
        # fetch() reports a refused request as None; skip the branch instead
        # of crashing on len(None).
        print(f"No commit data fetched for branch {branch_name} in repository {repo_name}; skipping it.")
        return []
    print(f"Found {len(commits)} commits for branch {branch_name} in repository {repo_name}")
    return [commit['html_url'] for commit in commits]
async def get_emails_from_commit(commit_url, seen_emails):
    """Download a commit's ``.patch`` text and extract e-mail addresses.

    Args:
        commit_url: Web URL of the commit; ``.patch`` is appended to it.
        seen_emails: Set of addresses already reported. It is updated in
            place with every address found in this commit.

    Returns:
        A set of ``(email, commit_url)`` tuples for addresses that were not
        in ``seen_emails`` before this call; empty if the patch could not be
        downloaded.
    """
    print(f"Fetching commit data from URL: {commit_url}")
    url = commit_url + ".patch"
    async with aiohttp.ClientSession() as session:
        # "async with" releases the connection as soon as the body is read.
        async with session.get(url) as response:
            if response.status != 200:
                # An error page is not a patch; scanning it would report
                # addresses that never appeared in the commit.
                print(f"Skipping commit {commit_url}: HTTP {response.status}")
                return set()
            # Patches may contain bytes that are not valid in the declared
            # charset; replace them rather than abort the whole run.
            text = await response.text(errors='replace')
    potential_emails = set(email_pattern.findall(text))
    # str.endswith accepts a tuple; compare case-insensitively so that
    # "icon@2x.PNG" is rejected just like "icon@2x.png".
    suffixes = tuple(excluded_extensions)
    emails = {email for email in potential_emails if not email.lower().endswith(suffixes)}
    new_emails = {(email, commit_url) for email in emails if email not in seen_emails}
    seen_emails.update(emails)
    print(f"Found {len(new_emails)} new emails in commit: {commit_url}")
    return new_emails
async def main(username, include_forks, api_key=None):
    """Collect and print e-mail addresses found in a user's commits.

    Lists the user's repositories, then every branch of each repository, then
    the commits of each branch, and finally scans each commit's patch text.

    Args:
        username: GitHub account to inspect.
        include_forks: Whether forked repositories are scanned as well.
        api_key: Optional token sent in the ``Authorization`` header.
    """
    headers = {"Authorization": f"token {api_key}"} if api_key else {}
    seen_emails = set()
    async with ClientSession() as session:
        repos = await get_user_repos(session, username, include_forks, headers)
        if repos is None:
            return
        all_branches = await asyncio.gather(
            *(get_repo_branches(session, username, repo, headers) for repo in repos)
        )
        commit_tasks = [
            get_commits(session, username, repo, branch, headers)
            for repo, branches in zip(repos, all_branches)
            for branch in branches
        ]
        commits_per_branch = await asyncio.gather(*commit_tasks)
        # Branches share history, so the same commit URL shows up once per
        # branch; drop repeats (keeping first-seen order) so each patch is
        # downloaded only once.
        all_commits = list(dict.fromkeys(
            commit_url for sublist in commits_per_branch for commit_url in sublist
        ))
        email_tasks = [get_emails_from_commit(commit_url, seen_emails) for commit_url in all_commits]
        all_emails = await asyncio.gather(*email_tasks)
        unique_emails = {email for emails in all_emails for email in emails}
        print(f"Total unique emails found: {len(unique_emails)}")
        # Each entry is an (email, commit_url) tuple.
        for email in unique_emails:
            print(email)
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, warn when no API key is
    # supplied, then run the asynchronous scan to completion.
    cli = argparse.ArgumentParser(
        description="Fetch GitHub user's repositories, branches, commits, and extract emails.",
    )
    cli.add_argument('username', help='GitHub username to fetch data for')
    cli.add_argument(
        '--include-forks',
        help='Include forked repositories in the search',
        action='store_true',
    )
    cli.add_argument('--api-key', default=None, help='GitHub API key for authentication')
    options = cli.parse_args()
    if options.api_key is None:
        print("We recommend, you provide an API key so you don't get rate limited. You can get one from https://github.com/settings/tokens?type=beta")
        # Pause briefly after printing the recommendation.
        time.sleep(2)
    asyncio.run(main(options.username, options.include_forks, options.api_key))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Awesome. Found some emails in my commits from years ago that should not be there.
Squash all commits,
`git commit --amend --reset-author --no-edit` then `git push --force-with-lease`