Created
February 16, 2026 16:50
-
-
Save MikyPo/4c9602e33e49b30bf83c35da7177b191 to your computer and use it in GitHub Desktop.
Ya-Cloud-Functions_API_YaMetrika_load
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Developed by MikyPo | |
| # More code for DA here: https://dzen.ru/mikypo | |
| # Импорт библиотек | |
| import os | |
| import requests | |
| from datetime import datetime, timedelta, timezone | |
| import time | |
| import json | |
| # URL API Яндекс.Метрики для выгрузки данных в формате CSV | |
| API_URL = 'https://api-metrika.yandex.net/stat/v1/data.csv' | |
| # URL API Яндекс.Диска для получения ссылки на загрузку файла | |
| DISK_UPLOAD_URL = 'https://cloud-api.yandex.net/v1/disk/resources/upload' | |
| LIMIT = 100000 # Количество строк в одном запросе (лимит API Метрики) | |
| MAX_RETRIES = 5 # Максимальное количество попыток повторного запроса при ошибке | |
| UPLOAD_TIMEOUT = 300 # 5 минут на загрузку одного файла (таймаут для больших объёмов) | |
| def fetch_csv_for_date(api_token, counter_id, target_date): | |
| """Получает CSV из Метрики за один день.""" | |
| params = { | |
| 'ids': counter_id, | |
| 'date1': target_date, | |
| 'date2': target_date, | |
| # Выбираемые метрики: визиты и достижения целей | |
| 'metrics': 'ym:s:visits,ym:s:goal_reaches1,ym:s:goal_reaches2,ym:s:goal_reaches3', | |
| # Измерения для детализации: устройство, источник, дата, UTM-метки и т.д. | |
| 'dimensions': 'ym:s:deviceCategory,ym:s:externalRefererDomain,ym:s:date,ym:s:referer,ym:s:UTMSource,ym:s:UTMMedium,ym:s:UTMCampaign,ym:s:UTMContent,ym:s:UTMTerm', | |
| 'limit': LIMIT, | |
| 'accuracy': 'full', # Полная точность данных | |
| 'attribution': 'lastsign', # Модель атрибуции | |
| 'include_undefined': True, | |
| # Фильтр: оставляем только трафик из referrals (переходы с сайтов) | |
| 'filters': "ym:s:lastsignTrafficSource=='referral'" | |
| } | |
| # Заголовок авторизации для доступа к API | |
| headers = {'Authorization': f'OAuth {api_token}'} | |
| all_lines = [] | |
| offset = 1 # Начальное смещение для пагинации | |
| first_page = True | |
| # Цикл пагинации: выгружаем данные частями, если их больше лимита | |
| while True: | |
| params['offset'] = offset | |
| # Механизм повторных попыток (retry logic) для устойчивости к сбоям сети/API | |
| for attempt in range(1, MAX_RETRIES + 1): | |
| try: | |
| resp = requests.get(API_URL, params=params, headers=headers, timeout=30) | |
| if resp.status_code == 200: | |
| break # Успешный ответ, выходим из цикла попыток | |
| print(f" ⚠️ Метрика [{target_date}] offset={offset}, попытка {attempt}: статус {resp.status_code}") | |
| except Exception as e: | |
| print(f" ⚠️ Метрика [{target_date}] ошибка: {e}") | |
| # Экспоненциальная задержка перед следующей попыткой | |
| if attempt < MAX_RETRIES: | |
| time.sleep(2 ** (attempt - 1)) | |
| else: | |
| raise Exception(f"Не удалось получить данные за {target_date} (offset={offset})") | |
| text = resp.text | |
| if not text.strip(): | |
| break | |
| lines = text.splitlines() | |
| if not lines: | |
| break | |
| # Добавляем строки: для первой страницы берём все (включая заголовок), для последующих — без заголовка | |
| if first_page: | |
| all_lines.extend(lines) | |
| first_page = False | |
| else: | |
| all_lines.extend(lines[1:] if len(lines) > 1 else lines) | |
| # Проверка условия окончания пагинации: если строк меньше лимита, значит данные кончились | |
| if len(lines) <= 1 or (not first_page and len(lines) - 1 < LIMIT): | |
| break | |
| offset += LIMIT # Увеличиваем смещение для следующей страницы | |
| # Если данных нет (только заголовок или пустота), возвращаем None | |
| if len(all_lines) <= 1: | |
| return None | |
| return "\n".join(all_lines) | |
| def upload_csv_to_disk(csv_content, token_disk, filename): | |
| """Загружает CSV-файл на Яндекс.Диск с защитой от HTML-ошибок.""" | |
| # Шаг 1: Получаем временную ссылку для загрузки (upload URL) | |
| for attempt in range(1, MAX_RETRIES + 1): | |
| try: | |
| resp = requests.get( | |
| DISK_UPLOAD_URL, | |
| params={'path': f'app:/{filename}', 'overwrite': 'true'}, | |
| headers={'Authorization': f'OAuth {token_disk}'}, | |
| timeout=10 | |
| ) | |
| if resp.status_code == 200: | |
| # 🔒 Защита от HTML-ответов (например, "service unavailable" вместо JSON) | |
| content_type = resp.headers.get('content-type', '').lower() | |
| if 'application/json' in content_type: | |
| try: | |
| data = resp.json() | |
| upload_url = data.get('href') | |
| if upload_url: | |
| break # Ссылка получена успешно | |
| except (ValueError, json.JSONDecodeError): | |
| print(f" ⚠️ Диск [{filename}]: ответ не в формате JSON") | |
| else: | |
| print(f" ⚠️ Диск [{filename}]: получен не JSON (Content-Type: {content_type})") | |
| snippet = resp.text[:300].replace('\n', ' ').strip() | |
| print(f" Тело ответа: {snippet}...") | |
| else: | |
| print(f" ⚠️ Диск [{filename}]: статус {resp.status_code}") | |
| except Exception as e: | |
| print(f" ⚠️ Диск [{filename}]: ошибка получения URL: {e}") | |
| if attempt < MAX_RETRIES: | |
| time.sleep(2 ** (attempt - 1)) | |
| else: | |
| raise Exception(f"Не удалось получить upload URL для {filename}") | |
| # Шаг 2: Загружаем файл по полученной ссылке методом PUT | |
| for attempt in range(1, MAX_RETRIES + 1): | |
| try: | |
| put_resp = requests.put( | |
| upload_url, | |
| data=csv_content.encode('utf-8'), | |
| timeout=UPLOAD_TIMEOUT | |
| ) | |
| if put_resp.status_code == 201: | |
| return True # Файл успешно создан | |
| except requests.exceptions.ReadTimeout: | |
| print(f" ⚠️ Диск [{filename}]: таймаут загрузки (попытка {attempt})") | |
| except Exception as e: | |
| print(f" ⚠️ Диск [{filename}]: ошибка загрузки: {e}") | |
| if attempt < MAX_RETRIES: | |
| time.sleep(5) | |
| else: | |
| raise Exception(f"Не удалось загрузить файл {filename} после {MAX_RETRIES} попыток") | |
| return False | |
| # Точка входа: index.handler | |
| def handler(event, context): | |
| # Получаем секретные переменные окружения (токены и ID) | |
| api_token = os.getenv('API_TOKEN') | |
| counter_id = os.getenv('COUNTER_ID') | |
| token_disk = os.getenv('TOKEN_DISK') | |
| # Проверка наличия всех необходимых конфигураций | |
| if not all([api_token, counter_id, token_disk]): | |
| return { | |
| 'statusCode': 400, | |
| 'body': '❌ Отсутствуют переменные: API_TOKEN, COUNTER_ID, TOKEN_DISK' | |
| } | |
| # Определяем период: прошлая неделя (Пн–Вс) | |
| today = datetime.utcnow().date() | |
| monday_this_week = today - timedelta(days=today.weekday()) | |
| start_date = monday_this_week - timedelta(days=7) # Пн прошлой недели | |
| end_date = start_date + timedelta(days=6) # Вс | |
| print(f"📅 Обрабатываю период: {start_date} – {end_date}") | |
| uploaded_files = [] | |
| failed_days = [] | |
| no_data_days = [] | |
| # Основной цикл обработки каждого дня в выбранном периоде | |
| current = start_date | |
| while current <= end_date: | |
| date_str = current.strftime('%Y-%m-%d') | |
| print(f"📥 Обрабатываю день: {date_str}") | |
| try: | |
| # Выгрузка данных из Метрики | |
| csv_data = fetch_csv_for_date(api_token, counter_id, date_str) | |
| # Проверка на отсутствие данных за день | |
| if csv_data is None or len(csv_data.strip()) == 0: | |
| print(f" ℹ️ Нет данных за {date_str}") | |
| no_data_days.append(date_str) | |
| current += timedelta(days=1) | |
| continue | |
| # Формируем время в UTC+3 (Москва) для именования файлов | |
| moscow_time = datetime.now(timezone(timedelta(hours=3))) | |
| current_ts = moscow_time.strftime("%Y-%m-%d_%H-%M-%S") | |
| filename = f"metrika_referral_{date_str}_saved_{current_ts}.csv" | |
| # Загрузка файла на Диск | |
| if upload_csv_to_disk(csv_data, token_disk, filename): | |
| uploaded_files.append(filename) | |
| print(f" ✅ Успешно сохранён: {filename}") | |
| else: | |
| print(f" ❌ Не удалось сохранить: {filename}") | |
| failed_days.append(date_str) | |
| except Exception as e: | |
| error_msg = str(e) | |
| print(f" ❌ Ошибка при обработке {date_str}: {error_msg}") | |
| failed_days.append(date_str) | |
| current += timedelta(days=1) | |
| # Формируем итоговый отчёт о выполнении функции | |
| total_days = (end_date - start_date).days + 1 | |
| success_count = len(uploaded_files) | |
| failed_count = len(failed_days) | |
| no_data_count = len(no_data_days) | |
| message_lines = [ | |
| f"✅ Обработано дней: {total_days}", | |
| f"✅ Успешно сохранено: {success_count}", | |
| f"⚠️ Ошибки: {failed_count}", | |
| f"ℹ️ Без данных: {no_data_count}" | |
| ] | |
| if uploaded_files: | |
| message_lines.append(f"📁 Файлы: {', '.join(uploaded_files)}") | |
| if failed_days: | |
| message_lines.append(f"❌ Неудачные дни: {', '.join(failed_days)}") | |
| full_message = "\n".join(message_lines) | |
| print(full_message) | |
| # Возвращаем ответ в формате API Gateway | |
| return { | |
| 'statusCode': 200, | |
| 'body': full_message | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment