Skip to content

Instantly share code, notes, and snippets.

@MikyPo
Created February 16, 2026 16:50
Show Gist options
  • Select an option

  • Save MikyPo/4c9602e33e49b30bf83c35da7177b191 to your computer and use it in GitHub Desktop.

Select an option

Save MikyPo/4c9602e33e49b30bf83c35da7177b191 to your computer and use it in GitHub Desktop.
Ya-Cloud-Functions_API_YaMetrika_load
# Developed by MikyPo
# More code for DA here: https://dzen.ru/mikypo
# Импорт библиотек
import os
import requests
from datetime import datetime, timedelta, timezone
import time
import json
# URL API Яндекс.Метрики для выгрузки данных в формате CSV
API_URL = 'https://api-metrika.yandex.net/stat/v1/data.csv'
# URL API Яндекс.Диска для получения ссылки на загрузку файла
DISK_UPLOAD_URL = 'https://cloud-api.yandex.net/v1/disk/resources/upload'
LIMIT = 100000 # Количество строк в одном запросе (лимит API Метрики)
MAX_RETRIES = 5 # Максимальное количество попыток повторного запроса при ошибке
UPLOAD_TIMEOUT = 300 # 5 минут на загрузку одного файла (таймаут для больших объёмов)
def fetch_csv_for_date(api_token, counter_id, target_date):
"""Получает CSV из Метрики за один день."""
params = {
'ids': counter_id,
'date1': target_date,
'date2': target_date,
# Выбираемые метрики: визиты и достижения целей
'metrics': 'ym:s:visits,ym:s:goal_reaches1,ym:s:goal_reaches2,ym:s:goal_reaches3',
# Измерения для детализации: устройство, источник, дата, UTM-метки и т.д.
'dimensions': 'ym:s:deviceCategory,ym:s:externalRefererDomain,ym:s:date,ym:s:referer,ym:s:UTMSource,ym:s:UTMMedium,ym:s:UTMCampaign,ym:s:UTMContent,ym:s:UTMTerm',
'limit': LIMIT,
'accuracy': 'full', # Полная точность данных
'attribution': 'lastsign', # Модель атрибуции
'include_undefined': True,
# Фильтр: оставляем только трафик из referrals (переходы с сайтов)
'filters': "ym:s:lastsignTrafficSource=='referral'"
}
# Заголовок авторизации для доступа к API
headers = {'Authorization': f'OAuth {api_token}'}
all_lines = []
offset = 1 # Начальное смещение для пагинации
first_page = True
# Цикл пагинации: выгружаем данные частями, если их больше лимита
while True:
params['offset'] = offset
# Механизм повторных попыток (retry logic) для устойчивости к сбоям сети/API
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.get(API_URL, params=params, headers=headers, timeout=30)
if resp.status_code == 200:
break # Успешный ответ, выходим из цикла попыток
print(f" ⚠️ Метрика [{target_date}] offset={offset}, попытка {attempt}: статус {resp.status_code}")
except Exception as e:
print(f" ⚠️ Метрика [{target_date}] ошибка: {e}")
# Экспоненциальная задержка перед следующей попыткой
if attempt < MAX_RETRIES:
time.sleep(2 ** (attempt - 1))
else:
raise Exception(f"Не удалось получить данные за {target_date} (offset={offset})")
text = resp.text
if not text.strip():
break
lines = text.splitlines()
if not lines:
break
# Добавляем строки: для первой страницы берём все (включая заголовок), для последующих — без заголовка
if first_page:
all_lines.extend(lines)
first_page = False
else:
all_lines.extend(lines[1:] if len(lines) > 1 else lines)
# Проверка условия окончания пагинации: если строк меньше лимита, значит данные кончились
if len(lines) <= 1 or (not first_page and len(lines) - 1 < LIMIT):
break
offset += LIMIT # Увеличиваем смещение для следующей страницы
# Если данных нет (только заголовок или пустота), возвращаем None
if len(all_lines) <= 1:
return None
return "\n".join(all_lines)
def upload_csv_to_disk(csv_content, token_disk, filename):
"""Загружает CSV-файл на Яндекс.Диск с защитой от HTML-ошибок."""
# Шаг 1: Получаем временную ссылку для загрузки (upload URL)
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.get(
DISK_UPLOAD_URL,
params={'path': f'app:/{filename}', 'overwrite': 'true'},
headers={'Authorization': f'OAuth {token_disk}'},
timeout=10
)
if resp.status_code == 200:
# 🔒 Защита от HTML-ответов (например, "service unavailable" вместо JSON)
content_type = resp.headers.get('content-type', '').lower()
if 'application/json' in content_type:
try:
data = resp.json()
upload_url = data.get('href')
if upload_url:
break # Ссылка получена успешно
except (ValueError, json.JSONDecodeError):
print(f" ⚠️ Диск [{filename}]: ответ не в формате JSON")
else:
print(f" ⚠️ Диск [{filename}]: получен не JSON (Content-Type: {content_type})")
snippet = resp.text[:300].replace('\n', ' ').strip()
print(f" Тело ответа: {snippet}...")
else:
print(f" ⚠️ Диск [{filename}]: статус {resp.status_code}")
except Exception as e:
print(f" ⚠️ Диск [{filename}]: ошибка получения URL: {e}")
if attempt < MAX_RETRIES:
time.sleep(2 ** (attempt - 1))
else:
raise Exception(f"Не удалось получить upload URL для {filename}")
# Шаг 2: Загружаем файл по полученной ссылке методом PUT
for attempt in range(1, MAX_RETRIES + 1):
try:
put_resp = requests.put(
upload_url,
data=csv_content.encode('utf-8'),
timeout=UPLOAD_TIMEOUT
)
if put_resp.status_code == 201:
return True # Файл успешно создан
except requests.exceptions.ReadTimeout:
print(f" ⚠️ Диск [{filename}]: таймаут загрузки (попытка {attempt})")
except Exception as e:
print(f" ⚠️ Диск [{filename}]: ошибка загрузки: {e}")
if attempt < MAX_RETRIES:
time.sleep(5)
else:
raise Exception(f"Не удалось загрузить файл {filename} после {MAX_RETRIES} попыток")
return False
# Точка входа: index.handler
def handler(event, context):
# Получаем секретные переменные окружения (токены и ID)
api_token = os.getenv('API_TOKEN')
counter_id = os.getenv('COUNTER_ID')
token_disk = os.getenv('TOKEN_DISK')
# Проверка наличия всех необходимых конфигураций
if not all([api_token, counter_id, token_disk]):
return {
'statusCode': 400,
'body': '❌ Отсутствуют переменные: API_TOKEN, COUNTER_ID, TOKEN_DISK'
}
# Определяем период: прошлая неделя (Пн–Вс)
today = datetime.utcnow().date()
monday_this_week = today - timedelta(days=today.weekday())
start_date = monday_this_week - timedelta(days=7) # Пн прошлой недели
end_date = start_date + timedelta(days=6) # Вс
print(f"📅 Обрабатываю период: {start_date} – {end_date}")
uploaded_files = []
failed_days = []
no_data_days = []
# Основной цикл обработки каждого дня в выбранном периоде
current = start_date
while current <= end_date:
date_str = current.strftime('%Y-%m-%d')
print(f"📥 Обрабатываю день: {date_str}")
try:
# Выгрузка данных из Метрики
csv_data = fetch_csv_for_date(api_token, counter_id, date_str)
# Проверка на отсутствие данных за день
if csv_data is None or len(csv_data.strip()) == 0:
print(f" ℹ️ Нет данных за {date_str}")
no_data_days.append(date_str)
current += timedelta(days=1)
continue
# Формируем время в UTC+3 (Москва) для именования файлов
moscow_time = datetime.now(timezone(timedelta(hours=3)))
current_ts = moscow_time.strftime("%Y-%m-%d_%H-%M-%S")
filename = f"metrika_referral_{date_str}_saved_{current_ts}.csv"
# Загрузка файла на Диск
if upload_csv_to_disk(csv_data, token_disk, filename):
uploaded_files.append(filename)
print(f" ✅ Успешно сохранён: {filename}")
else:
print(f" ❌ Не удалось сохранить: {filename}")
failed_days.append(date_str)
except Exception as e:
error_msg = str(e)
print(f" ❌ Ошибка при обработке {date_str}: {error_msg}")
failed_days.append(date_str)
current += timedelta(days=1)
# Формируем итоговый отчёт о выполнении функции
total_days = (end_date - start_date).days + 1
success_count = len(uploaded_files)
failed_count = len(failed_days)
no_data_count = len(no_data_days)
message_lines = [
f"✅ Обработано дней: {total_days}",
f"✅ Успешно сохранено: {success_count}",
f"⚠️ Ошибки: {failed_count}",
f"ℹ️ Без данных: {no_data_count}"
]
if uploaded_files:
message_lines.append(f"📁 Файлы: {', '.join(uploaded_files)}")
if failed_days:
message_lines.append(f"❌ Неудачные дни: {', '.join(failed_days)}")
full_message = "\n".join(message_lines)
print(full_message)
# Возвращаем ответ в формате API Gateway
return {
'statusCode': 200,
'body': full_message
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment