Skip to content

Instantly share code, notes, and snippets.

@zone559
Created August 14, 2025 21:31
Show Gist options
  • Select an option

  • Save zone559/050082ccfc1e2dfe79746736e29ccd2e to your computer and use it in GitHub Desktop.

Select an option

Save zone559/050082ccfc1e2dfe79746736e29ccd2e to your computer and use it in GitHub Desktop.
for gallery-dl
# -*- coding: utf-8 -*-
# Copyright 2023 YourName
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractor for https://www.snapchat.com/"""
from .common import Extractor, Message
from .. import text, util
import re
import json
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import yt_dlp
# Raise the threshold of the "urllib3" logger to WARNING so that records
# logged through it below that level (DEBUG, INFO) are dropped.
# NOTE: this is a module-level statement, so it takes effect on import.
logging.getLogger("urllib3").setLevel(logging.WARNING)
class SnapchatExtractor(Extractor):
    """Extractor for Snapchat user stories and spotlights.

    Fetches ``https://www.snapchat.com/add/<username>``, reads the JSON
    document embedded in the first ``<script type="application/json">``
    element, and emits download messages for:

    * the snaps of the user's current story (``story.snapList``),
    * the snaps of every curated highlight (``curatedHighlights``),
    * the thumbnails of spotlight highlights (``spotlightHighlights``);
      the spotlight videos themselves are downloaded by yt-dlp on a
      background thread pool rather than being yielded as messages.
    """

    category = "snapchat"
    subcategory = "user"
    directory_fmt = ("{category}", "{username}")
    filename_fmt = "{snap_id}.{extension}"
    archive_fmt = "{snap_id}"
    root = "https://www.snapchat.com"
    request_interval = 1.0
    cookiedomain = ".snapchat.com"
    pattern = r"(?:https?://)?(?:www\.)?snapchat\.com/add/([^/?#]+)"
    test = {
        "https://www.snapchat.com/add/username": {
            "url": "https://www.snapchat.com/add/username",
            "keyword": {"username": "username"},
        },
    }

    def __init__(self, match):
        """Store the lower-cased username captured by ``pattern``.

        Also creates the thread pool used for spotlight downloads and the
        request headers sent when fetching the profile page.
        """
        Extractor.__init__(self, match)
        self.username = match.group(1).lower()
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
        }

    def items(self):
        """Yield a directory message followed by one URL message per file.

        Nothing is yielded when the page cannot be fetched or contains no
        parsable JSON; the failure is logged instead. The thread pool is
        always shut down (waiting for pending spotlight downloads), even
        when the generator returns early or is closed by its consumer.
        """
        url = f"{self.root}/add/{self.username}"
        try:
            try:
                page = self.request(url, headers=self.headers).text
            except Exception as e:
                self.log.error("Failed to fetch page: %s", str(e))
                return

            data = self._parse_page_data(page)
            if not data:
                return

            # ``or`` guards against keys that are present but null, which
            # a plain ``.get(key, default)`` would pass through as None.
            page_props = (data.get("props") or {}).get("pageProps") or {}

            yield Message.Directory, {"username": self.username}

            # Process regular story snaps
            story = page_props.get("story") or {}
            for index, snap in enumerate(story.get("snapList") or []):
                yield from self._process_snap(snap, index)

            # Process curated highlights
            for highlight in page_props.get("curatedHighlights") or []:
                snaps = (highlight or {}).get("snapList") or []
                for index, snap in enumerate(snaps):
                    yield from self._process_snap(snap, index, "highlight_")

            # Process spotlights
            for highlight in page_props.get("spotlightHighlights") or []:
                yield from self._process_spotlight(highlight or {})
        finally:
            # Wait for all spotlight download threads to complete.
            self.executor.shutdown(wait=True)

    def _parse_page_data(self, page):
        """Return the decoded JSON embedded in ``page``, or None.

        Only the first ``<script ... type="application/json" ...>`` element
        is considered. A missing element or invalid JSON is logged and
        reported as None.
        """
        script_pattern = re.compile(r'<script[^>]*type="application/json"[^>]*>(.*?)</script>', re.DOTALL)
        match = script_pattern.search(page)
        if not match:
            self.log.error("No JSON data found in the HTML")
            return None
        try:
            return json.loads(match.group(1).strip())
        except json.JSONDecodeError as e:
            self.log.error("Failed to decode JSON: %s", str(e))
            return None

    @staticmethod
    def _wrapped_value(container, key):
        """Return ``container[key]["value"]``, or None if any part is
        missing or null."""
        return (container.get(key) or {}).get("value")

    def _process_snap(self, snap, index=None, prefix=""):
        """Yield URL messages for a single snap (image or video).

        ``index`` is only used to build a placeholder ID when the snap has
        neither an ID field nor a media URL. A non-empty ``prefix`` marks
        the snap as belonging to a highlight (``"highlight": True``).
        Snaps whose media type is neither 0 nor 1 yield nothing.
        """
        snap_media_type = snap.get("snapMediaType")
        snap_urls = snap.get("snapUrls") or {}
        media_url = snap_urls.get("mediaUrl")
        preview_url = self._wrapped_value(snap_urls, "mediaPreviewUrl")

        # Prefer an explicit ID; otherwise derive one from the last path
        # segment of the media URL, and as a last resort from the index.
        snap_id = (
            self._wrapped_value(snap, "snapId") or
            self._wrapped_value(snap, "storyId") or
            (media_url.split("/")[-1].split(".")[0] if media_url else f"snap_{index}")
        )
        if not snap_id:
            return

        data = {
            "snap_id": snap_id,
            "username": self.username,
            "date": self._parse_timestamp(snap.get("timestamp")),
            "highlight": bool(prefix),
            "_fallback": self._fallback_urls(media_url, preview_url),
        }

        # Image (type 0)
        if snap_media_type == 0 and media_url:
            data["extension"] = "jpg"
            yield Message.Url, media_url, data
        # Video (type 1)
        elif snap_media_type == 1:
            if media_url:
                data["extension"] = "mp4"
                yield Message.Url, media_url, data
            if preview_url:
                # The thumbnail gets its own ID so that it does not share
                # the video's archive key (archive_fmt is "{snap_id}") and
                # so the "_thumb" suffix actually reaches the file name
                # (filename_fmt does not use "filename").
                thumb_id = f"{snap_id}_thumb"
                thumb_data = data.copy()
                thumb_data["snap_id"] = thumb_id
                thumb_data["filename"] = thumb_id
                thumb_data["extension"] = "jpg"
                # Never fall back from a failed thumbnail to the video URL.
                thumb_data["_fallback"] = []
                yield Message.Url, preview_url, thumb_data

    def _process_spotlight(self, highlight):
        """Queue a spotlight video download and yield its thumbnail.

        The video is handed to ``_download_spotlight`` on the thread pool;
        only the thumbnail (when present) is yielded as a URL message.
        Highlights without a ``storyId`` value are skipped.
        """
        snap_id = self._wrapped_value(highlight, "storyId")
        if not snap_id:
            return

        thumb_url = self._wrapped_value(highlight, "thumbnailUrl")

        # Submit spotlight download to thread pool
        self.executor.submit(self._download_spotlight, snap_id)

        # Process thumbnail if available
        if thumb_url:
            thumb_data = {
                "snap_id": f"{snap_id}_thumb",
                "username": self.username,
                "extension": "jpg",
                "highlight": True,
            }
            yield Message.Url, thumb_url, thumb_data

    def _download_spotlight(self, snap_id):
        """Download a spotlight video with yt-dlp.

        The output template places the file in a directory named after the
        username, relative to wherever yt-dlp resolves relative paths
        (presumably the current working directory -- TODO confirm).
        Returns the ``url`` entry of yt-dlp's info dict, or None when no
        info was returned or an error occurred (errors are logged).
        """
        spotlight_url = f"https://www.snapchat.com/spotlight/{snap_id}"
        try:
            ydl_opts = {
                'outtmpl': '{}/%(id)s.%(ext)s'.format(self.username),
                'quiet': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(spotlight_url, download=True)
                if info:
                    return info.get('url')
        except Exception as e:
            # Runs on a worker thread whose future is discarded, so this
            # log line is the only place a failure becomes visible.
            self.log.error("Error downloading spotlight %s: %s", snap_id, str(e))
        return None

    def _parse_timestamp(self, ts_dict):
        """Convert ``{"value": <milliseconds>}`` to a naive local datetime.

        The value may be a number or a numeric string. Returns None when
        ``ts_dict`` is not a dict, has no ``"value"`` key, or the value
        cannot be converted.
        """
        if not isinstance(ts_dict, dict) or "value" not in ts_dict:
            return None
        try:
            # float() accepts both numbers and numeric strings.
            return datetime.fromtimestamp(float(ts_dict["value"]) / 1000)
        except (ValueError, TypeError, OverflowError, OSError):
            # OverflowError/OSError: timestamp outside the platform range.
            return None

    def _fallback_urls(self, *urls):
        """Return the truthy entries of ``urls`` as a list, in order."""
        return [url for url in urls if url]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment