Last active
March 11, 2026 01:00
-
-
Save dlip/d0c389bdf10a44eb8ece71814cce10c7 to your computer and use it in GitHub Desktop.
Downloads top videos from channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import subprocess | |
| from googleapiclient.discovery import build | |
| from isodate import parse_duration | |
# Selection thresholds: a video needs at least MIN_VIEWS views to qualify, and
# only the TOP_VIDEOS_PER_CHANNEL most-viewed qualifying videos are kept.
MIN_VIEWS = 5_000
TOP_VIDEOS_PER_CHANNEL = 20

# Upper bound on the video height (in pixels) used in the yt-dlp format filter.
MAX_HEIGHT = 720

# Accepted video length in seconds; both bounds are inclusive.
MIN_DURATION_SECONDS = 2 * 60  # 2 minutes
MAX_DURATION_SECONDS = 20 * 60  # 20 minutes

# Input files, given as relative paths.
CHANNELS_FILE = "channels.txt"
APIKEY_FILE = "apikey.txt"
| # ---------------- Utilities ---------------- # | |
def sanitize(name):
    r"""Return *name* reduced to something usable as a single directory name.

    The characters ``\ / : * ? " < > |`` are removed and surrounding
    whitespace is stripped. If nothing usable is left (an empty string), or
    the result is ``.`` or ``..``, the placeholder ``"_"`` is returned instead.

    Args:
        name: Arbitrary text, e.g. a channel name.

    Returns:
        A non-empty string containing none of the removed characters.
    """
    forbidden = r'\/:*?"<>|'
    cleaned = "".join(ch for ch in name if ch not in forbidden).strip()
    # "" cannot be created as a directory, and "." / ".." would silently point
    # at the current or parent directory rather than a per-channel folder.
    if cleaned in ("", ".", ".."):
        return "_"
    return cleaned
def read_api_key():
    """Return the API key stored on the first line of ``APIKEY_FILE``.

    Leading and trailing whitespace on that line is ignored; any further
    lines in the file are not read.

    Returns:
        The stripped first line of the file.

    Raises:
        FileNotFoundError: If the key file does not exist.
        RuntimeError: If the first line is empty or whitespace only.
    """
    try:
        # "utf-8-sig" reads plain UTF-8 and also drops a leading byte-order
        # mark, which would otherwise become part of the key.
        with open(APIKEY_FILE, "r", encoding="utf-8-sig") as f:
            first_line = f.readline().strip()
    except FileNotFoundError:
        raise FileNotFoundError("apikey.txt not found") from None
    if not first_line:
        raise RuntimeError("apikey.txt first line is empty")
    return first_line
| # ---------------- YouTube Logic ---------------- # | |
def lookup_channel_id(youtube, channel_name):
    """Search for *channel_name* and return the channel ID of the first hit.

    Args:
        youtube: API client exposing ``search().list(...).execute()``.
        channel_name: Free-text channel name used as the search query.

    Returns:
        The ``channelId`` from the first result's snippet, or ``None`` when
        the search returns no items.
    """
    request = youtube.search().list(
        q=channel_name, type="channel", part="snippet", maxResults=1
    )
    matches = request.execute().get("items", [])
    return matches[0]["snippet"]["channelId"] if matches else None
def resolve_all_channels(youtube):
    """Resolve every entry in ``CHANNELS_FILE`` to a ``(channel_id, name)`` pair.

    Each non-blank line is either ``channel_id|channel_name`` (already
    resolved) or a bare channel name, which is looked up through
    ``lookup_channel_id``. Resolved entries are written back to the file in
    ``channel_id|channel_name`` form so later runs can skip the lookup.

    Progress is saved even when resolution stops early: entries resolved
    before the failure are written in resolved form and the remaining lines
    are kept as they were, so a rerun only repeats the lookups still needed.

    Args:
        youtube: API client passed through to ``lookup_channel_id``.

    Returns:
        A list of ``(channel_id, channel_name)`` tuples in file order.

    Raises:
        FileNotFoundError: If the channels file does not exist.
        RuntimeError: If a channel name cannot be resolved to an ID.
    """
    if not os.path.exists(CHANNELS_FILE):
        raise FileNotFoundError("channels.txt not found")

    # "utf-8-sig" also accepts files saved with a byte-order mark, which would
    # otherwise be glued onto the first channel entry.
    with open(CHANNELS_FILE, "r", encoding="utf-8-sig") as f:
        lines = [line.strip() for line in f if line.strip()]

    resolved = []
    updated_lines = []
    print("Resolving channel IDs...")
    try:
        for line in lines:
            channel_id, separator, channel_name = line.partition("|")
            if separator:
                # Tolerate hand-edited entries such as "ID | Name".
                channel_id = channel_id.strip()
                channel_name = channel_name.strip()
            else:
                channel_id, channel_name = "", line

            # An entry with no ID (bare name, or "|Name") still needs a lookup.
            if not channel_id:
                channel_id = lookup_channel_id(youtube, channel_name)
                if not channel_id:
                    raise RuntimeError(
                        f"ERROR: Could not resolve channel ID for '{channel_name}'"
                    )
                print(f"Resolved {channel_name} → {channel_id}")

            resolved.append((channel_id, channel_name))
            updated_lines.append(f"{channel_id}|{channel_name}")
    finally:
        # One entry is appended to updated_lines per processed line, so
        # everything from that index onward is still unprocessed and is kept
        # verbatim. This runs on failure too, preserving finished lookups.
        new_lines = updated_lines + lines[len(updated_lines):]
        if new_lines != lines:
            with open(CHANNELS_FILE, "w", encoding="utf-8") as f:
                f.writelines(entry + "\n" for entry in new_lines)

    print("All channel IDs resolved.\n")
    return resolved
def get_uploads_playlist(youtube, channel_id):
    """Return the ID of the uploads playlist for *channel_id*.

    Args:
        youtube: API client exposing ``channels().list(...).execute()``.
        channel_id: ID of the channel to look up.

    Returns:
        The ``uploads`` entry of the channel's ``relatedPlaylists``.

    Raises:
        RuntimeError: If the response contains no channel for *channel_id*.
    """
    response = youtube.channels().list(part="contentDetails", id=channel_id).execute()
    # A response with no items would otherwise surface as an opaque
    # KeyError/IndexError; report which channel ID was the problem instead.
    items = response.get("items") or []
    if not items:
        raise RuntimeError(f"ERROR: No channel found for ID '{channel_id}'")
    return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]
def video_is_available(video):
    """Return True if *video* looks downloadable from its API metadata.

    A video is rejected when any of the following holds:
      * its privacy status is not ``"public"``;
      * its upload status is not ``"processed"``;
      * it is currently a live broadcast;
      * its region restriction carries a ``blocked`` list.

    Args:
        video: A video resource dict with (optional) ``status``, ``snippet``
            and ``contentDetails`` parts.

    Returns:
        ``True`` when none of the rejection rules apply, else ``False``.
    """
    status = video.get("status", {})
    is_public = status.get("privacyStatus") == "public"
    is_processed = status.get("uploadStatus") == "processed"
    if not (is_public and is_processed):
        return False

    if video.get("snippet", {}).get("liveBroadcastContent") == "live":
        return False

    # Any "blocked" list disqualifies the video, whichever regions it names.
    restriction = video.get("contentDetails", {}).get("regionRestriction")
    return not (restriction and "blocked" in restriction)
def fetch_top_videos(youtube, uploads_playlist):
    """Return the most-viewed qualifying videos from an uploads playlist.

    The playlist is read page by page (50 entries per request); each page's
    video IDs are then fetched in a single ``videos().list`` call. A video
    qualifies when ``video_is_available`` accepts it, its duration lies within
    ``MIN_DURATION_SECONDS``..``MAX_DURATION_SECONDS`` (inclusive) and it has
    at least ``MIN_VIEWS`` views.

    Args:
        youtube: API client exposing ``playlistItems()`` and ``videos()``.
        uploads_playlist: ID of the playlist to scan.

    Returns:
        Up to ``TOP_VIDEOS_PER_CHANNEL`` dicts with the keys ``title``,
        ``video_id``, ``url`` and ``view_count``, sorted by ``view_count``
        in descending order.
    """
    qualifying = []
    page_token = None

    while True:
        page = (
            youtube.playlistItems()
            .list(
                part="contentDetails",
                playlistId=uploads_playlist,
                maxResults=50,
                pageToken=page_token,
            )
            .execute()
        )
        ids = [entry["contentDetails"]["videoId"] for entry in page["items"]]
        if not ids:
            break

        details = (
            youtube.videos()
            .list(part="snippet,statistics,contentDetails,status", id=",".join(ids))
            .execute()
        )
        for item in details["items"]:
            if not video_is_available(item):
                continue

            seconds = parse_duration(item["contentDetails"]["duration"]).total_seconds()
            # A missing viewCount is treated as zero views.
            view_count = int(item["statistics"].get("viewCount", 0))

            if not MIN_DURATION_SECONDS <= seconds <= MAX_DURATION_SECONDS:
                continue
            if view_count < MIN_VIEWS:
                continue

            qualifying.append(
                {
                    "title": item["snippet"]["title"],
                    "video_id": item["id"],
                    "url": f"https://www.youtube.com/watch?v={item['id']}",
                    "view_count": view_count,
                }
            )

        page_token = page.get("nextPageToken")
        if not page_token:
            break

    ranked = sorted(qualifying, key=lambda entry: entry["view_count"], reverse=True)
    return ranked[:TOP_VIDEOS_PER_CHANNEL]
def _downloaded_video_id(filename):
    """Return the video ID of a finished download named *filename*, else None."""
    if "[" not in filename or "]" not in filename:
        return None
    # Downloads are named "<title> [<id>].<ext>", so the ID sits between the
    # last "[" and the "]" that follows it.
    video_id, closing, tail = filename.split("[")[-1].partition("]")
    if not closing or len(video_id) != 11:
        return None
    # A finished file has a single extension after the ID. yt-dlp leftovers
    # such as ".mp4.part", ".mp4.ytdl" or ".f137.mp4" have more than one dot
    # and must not count, or an interrupted download would never be resumed.
    if tail.count(".") > 1:
        return None
    return video_id


def download_videos(channel_name, videos):
    """Download *videos* with yt-dlp into a directory named after the channel.

    The directory name is ``sanitize(channel_name)`` and is created if
    missing. Videos whose ID already appears in a finished file there are
    skipped; all others are downloaded one at a time, limited to
    ``MAX_HEIGHT`` pixels in height and merged into MP4.

    Args:
        channel_name: Channel name used to derive the target directory.
        videos: Iterable of dicts with ``video_id``, ``title`` and ``url``.

    Returns:
        None. Progress and failures are reported on standard output.
    """
    safe_name = sanitize(channel_name)
    os.makedirs(safe_name, exist_ok=True)

    # Build the set of already-downloaded video IDs once.
    existing_ids = {
        vid for vid in map(_downloaded_video_id, os.listdir(safe_name)) if vid
    }

    # yt-dlp treats "%" in the -o value as template syntax, so a literal "%"
    # in the directory name has to be doubled.
    output_template = os.path.join(
        safe_name.replace("%", "%%"), "%(title)s [%(id)s].%(ext)s"
    )
    format_selector = (
        f"bestvideo[height<={MAX_HEIGHT}]+bestaudio/best[height<={MAX_HEIGHT}]"
    )

    for video in videos:
        video_id = video["video_id"]
        if video_id in existing_ids:
            print(f"Skipping (already downloaded): {video['title']}")
            continue

        print(f"Downloading: {video['title']}")
        result = subprocess.run(
            [
                "yt-dlp",
                "-f",
                format_selector,
                "--merge-output-format",
                "mp4",
                "--continue",
                "--no-playlist",
                "--ignore-errors",
                "-o",
                output_template,
                video["url"],
            ]
        )
        # Only a clean exit marks the video as present; a failed download
        # stays eligible for another attempt.
        if result.returncode == 0:
            existing_ids.add(video_id)
        else:
            print(
                f"Download failed (yt-dlp exit code {result.returncode}): "
                f"{video['title']}"
            )
| # ---------------- Main ---------------- # | |
def main():
    """Run the downloader for every channel listed in the channels file.

    Builds the YouTube API client from the stored key, resolves the channel
    list, then for each channel finds its uploads playlist, selects the top
    qualifying videos and downloads them.
    """
    youtube = build("youtube", "v3", developerKey=read_api_key())

    for channel_id, channel_name in resolve_all_channels(youtube):
        print(f"\n=== Processing: {channel_name} ===")
        playlist_id = get_uploads_playlist(youtube, channel_id)
        top_videos = fetch_top_videos(youtube, playlist_id)
        print(f"Found {len(top_videos)} qualifying videos")
        if not top_videos:
            continue
        download_videos(channel_name, top_videos)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
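Setup:
1. Create a YouTube Data API v3 key and save it as the first line of apikey.txt.
2. Install Python, the google-api-python-client and isodate packages, and yt-dlp.
3. Create channels.txt with one channel name per line.
4. Run:
   python kids_youtube_downloader.py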