Created
August 14, 2025 21:31
-
-
Save zone559/050082ccfc1e2dfe79746736e29ccd2e to your computer and use it in GitHub Desktop.
for gallery-dl
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| # Copyright 2023 YourName | |
| # | |
| # This program is free software; you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License version 2 as | |
| # published by the Free Software Foundation. | |
| """Extractor for https://www.snapchat.com/""" | |
| from .common import Extractor, Message | |
| from .. import text, util | |
| import re | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from concurrent.futures import ThreadPoolExecutor | |
| import yt_dlp | |
# Raise urllib3's logger threshold to WARNING so its lower-level
# (debug/info) records are not emitted while this module is loaded.
_urllib3_log = logging.getLogger("urllib3")
_urllib3_log.setLevel(logging.WARNING)
class SnapchatExtractor(Extractor):
    """Extractor for Snapchat user stories and spotlights.

    Fetches the public ``/add/<username>`` page, reads the JSON document
    embedded in it, and yields download messages for story snaps, curated
    highlight snaps and spotlight thumbnails.  Spotlight videos are not
    yielded as messages: they are downloaded directly through yt-dlp on a
    small thread pool owned by the extractor instance.
    """

    category = "snapchat"
    subcategory = "user"
    directory_fmt = ("{category}", "{username}")
    filename_fmt = "{snap_id}.{extension}"
    archive_fmt = "{snap_id}"
    root = "https://www.snapchat.com"
    request_interval = 1.0
    cookiedomain = ".snapchat.com"
    pattern = r"(?:https?://)?(?:www\.)?snapchat\.com/add/([^/?#]+)"
    test = {
        "https://www.snapchat.com/add/username": {
            "url": "https://www.snapchat.com/add/username",
            "keyword": {"username": "username"},
        },
    }

    # Compiled once for the class instead of on every page parse.
    _json_script_re = re.compile(
        r'<script[^>]*type="application/json"[^>]*>(.*?)</script>',
        re.DOTALL,
    )

    def __init__(self, match):
        """Store the lower-cased username captured by ``pattern``."""
        Extractor.__init__(self, match)
        self.username = match.group(1).lower()
        # Worker pool used only for the yt-dlp spotlight downloads.
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.headers = {
            'User-Agent': (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/58.0.3029.110 Safari/537.3'
            ),
        }

    def items(self):
        """Yield the directory message and one URL message per media file.

        Processing order: story snaps, curated highlights, spotlights.
        A failed page fetch or unparsable page is logged and ends the
        generator without yielding anything.
        """
        url = f"{self.root}/add/{self.username}"
        try:
            try:
                page = self.request(url, headers=self.headers).text
            except Exception as e:
                self.log.error("Failed to fetch page: %s", str(e))
                return

            data = self._parse_page_data(page)
            if not data:
                return

            # `or {}` / `or []` also cover keys that exist with a JSON
            # null value, which a plain .get(key, default) would not.
            page_props = (data.get("props") or {}).get("pageProps") or {}

            yield Message.Directory, {"username": self.username}

            # Regular story snaps
            story = page_props.get("story") or {}
            for index, snap in enumerate(story.get("snapList") or []):
                yield from self._process_snap(snap, index)

            # Curated highlights
            for highlight in page_props.get("curatedHighlights") or []:
                snaps = highlight.get("snapList") or []
                for index, snap in enumerate(snaps):
                    yield from self._process_snap(snap, index, "highlight_")

            # Spotlights
            for highlight in page_props.get("spotlightHighlights") or []:
                yield from self._process_spotlight(highlight)
        finally:
            # Always wait for queued spotlight downloads and release the
            # pool, including when the consumer stops iterating early or
            # an exception propagates out of the loop above.
            self.executor.shutdown(wait=True)

    def _parse_page_data(self, page):
        """Return the first embedded ``application/json`` script as a dict.

        Returns None (after logging an error) when no such script tag is
        found, when its content is not valid JSON, or when the decoded
        document is not a JSON object.
        """
        match = self._json_script_re.search(page)
        if not match:
            self.log.error("No JSON data found in the HTML")
            return None
        try:
            data = json.loads(match.group(1).strip())
        except json.JSONDecodeError as e:
            self.log.error("Failed to decode JSON: %s", str(e))
            return None
        if not isinstance(data, dict):
            # Callers use .get() on the result; reject lists/scalars here.
            self.log.error("Unexpected JSON structure in the HTML")
            return None
        return data

    @staticmethod
    def _wrapped_value(obj, key):
        """Return ``obj[key]["value"]`` or None if any level is missing."""
        wrapper = obj.get(key) if isinstance(obj, dict) else None
        return wrapper.get("value") if isinstance(wrapper, dict) else None

    def _process_snap(self, snap, index=None, prefix=""):
        """Yield URL messages for a single snap (image or video).

        snap    -- one entry of a ``snapList``
        index   -- position in the list; used only to build a placeholder
                   ID when the snap has neither an ID nor a media URL
        prefix  -- non-empty for highlight snaps; stored as the boolean
                   ``highlight`` metadata field

        Images (media type 0) yield one message.  Videos (media type 1)
        yield the video and, if a preview URL exists, a separate thumbnail
        whose ``snap_id`` carries a ``_thumb`` suffix.
        """
        snap_media_type = snap.get("snapMediaType")
        snap_urls = snap.get("snapUrls") or {}
        media_url = snap_urls.get("mediaUrl")
        preview_url = self._wrapped_value(snap_urls, "mediaPreviewUrl")

        snap_id = (
            self._wrapped_value(snap, "snapId") or
            self._wrapped_value(snap, "storyId") or
            (media_url.split("/")[-1].split(".")[0]
             if media_url else f"snap_{index}")
        )
        if not snap_id:
            # Reached when the media URL has an empty final path segment.
            return

        data = {
            "snap_id": snap_id,
            "username": self.username,
            "date": self._parse_timestamp(snap.get("timestamp")),
            "highlight": bool(prefix),
        }

        # Image (type 0)
        if snap_media_type == 0 and media_url:
            data["extension"] = "jpg"
            # The preview is also an image, so it is a usable substitute.
            # The primary URL itself is not repeated as its own fallback.
            data["_fallback"] = self._fallback_urls(
                preview_url if preview_url != media_url else None)
            yield Message.Url, media_url, data

        # Video (type 1)
        elif snap_media_type == 1:
            # No fallback here: the only alternative URL is a still image,
            # which must not be stored under the video's name.
            if media_url:
                data["extension"] = "mp4"
                yield Message.Url, media_url, data
            if preview_url:
                thumb_data = data.copy()
                # filename_fmt and archive_fmt are built from snap_id, so
                # the thumbnail needs its own ID to get a distinct file
                # name and archive entry (same scheme as spotlights).
                thumb_data["snap_id"] = f"{snap_id}_thumb"
                thumb_data["filename"] = f"{snap_id}_thumb"
                thumb_data["extension"] = "jpg"
                yield Message.Url, preview_url, thumb_data

    def _process_spotlight(self, highlight):
        """Queue a spotlight video download and yield its thumbnail.

        The video is handed to the thread pool (see _download_spotlight);
        only the thumbnail, when present, is yielded as a URL message.
        Entries without a story ID are skipped.
        """
        snap_id = self._wrapped_value(highlight, "storyId")
        if not snap_id:
            return

        # Submit spotlight download to thread pool
        self.executor.submit(self._download_spotlight, snap_id)

        thumb_url = self._wrapped_value(highlight, "thumbnailUrl")
        if thumb_url:
            thumb_data = {
                "snap_id": f"{snap_id}_thumb",
                "username": self.username,
                "extension": "jpg",
                "highlight": True,
            }
            yield Message.Url, thumb_url, thumb_data

    def _download_spotlight(self, snap_id):
        """Download one spotlight video with yt-dlp.

        The file is written to ``<username>/<id>.<ext>`` relative to the
        current working directory.  Returns the ``url`` field of yt-dlp's
        info dict, or None when extraction fails or returns nothing.
        """
        spotlight_url = f"https://www.snapchat.com/spotlight/{snap_id}"
        # '%' introduces a field in yt-dlp output templates; double it so
        # a username containing '%' is used literally as the directory.
        directory = self.username.replace("%", "%%")
        ydl_opts = {
            'outtmpl': '{}/%(id)s.%(ext)s'.format(directory),
            'quiet': True,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(spotlight_url, download=True)
        except Exception as e:
            # Runs on a worker thread whose Future is never inspected, so
            # log here; otherwise the failure would go unnoticed.
            self.log.error(
                "Error downloading spotlight %s: %s", snap_id, str(e))
            return None
        return info.get('url') if info else None

    def _parse_timestamp(self, ts_dict):
        """Convert ``{"value": <milliseconds>}`` to a naive local datetime.

        The value is divided by 1000 before conversion and may be a number
        or a numeric string.  Returns None when the argument is missing,
        malformed, or outside the range the platform can represent.
        """
        if not isinstance(ts_dict, dict) or "value" not in ts_dict:
            return None
        try:
            return datetime.fromtimestamp(float(ts_dict["value"]) / 1000)
        except (ValueError, TypeError, OverflowError, OSError):
            # fromtimestamp raises OverflowError/OSError for out-of-range
            # values; float() raises ValueError/TypeError for bad input.
            return None

    def _fallback_urls(self, *urls):
        """Return the given URLs as a list with empty/None entries removed."""
        return [url for url in urls if url]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment