A script to download daily data from polygon.io at the end of the day
# This script downloads daily data at EOD. It should be run 30 minutes after the market close.
# You also need a paid subscription to polygon.io for the script to work. It takes about 75 secs
# to get data for 10k+ symbols (stocks + ETFs).
# Once the daily data is downloaded, it generates weekly and monthly data automatically. This data
# should match the weekly and monthly data from Polygon.
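# Example cron entry to run this 30 minutes after the 16:00 ET close (assumes the machine is
# in US/Pacific, so 13:30 PT; the script filename is illustrative, adjust to your setup):
#   30 13 * * 1-5  ~/Trading/python-venv/bin/python ~/Trading/polygon_eod_download.py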
import datetime
import json
import os
import pytz
import requests
import sys
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import polygon
from polygon.rest import models
HOME_DIR = os.path.expanduser('~')
PYTHON_BIN_PATH = os.path.join(HOME_DIR, "Trading", "python-venv", "bin", "python")

polygon_client = polygon.RESTClient("XXXX")  # Polygon API key

symbol_stats = {}
out_data_base_dir = os.path.join(HOME_DIR, "Trading/Trading-WorkingData/PolygonData/Latest")
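# Output layout produced by this script (example ticker, for illustration):
#   ~/Trading/Trading-WorkingData/PolygonData/Latest/daily/AAPL.csv
#   ~/Trading/Trading-WorkingData/PolygonData/Latest/weekly/AAPL.csv
#   ~/Trading/Trading-WorkingData/PolygonData/Latest/monthly/AAPL.csv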
#=======================================================================================================================
def get_price_data(ticker, timeframe, out_data_dir, from_date=None, to_date=None):
    """
    Get aggregates for a given ticker between from_date and to_date and save them as a CSV file.
    :param ticker: symbol to download
    :param timeframe: "Nm" (N minutes), "daily", "weekly", or "monthly"
    :param out_data_dir: directory where the <ticker>.csv file is written
    :param from_date: YYYY-MM-DD (defaults to num_days before today, based on the timeframe)
    :param to_date: YYYY-MM-DD (defaults to today)
    :return: the list of aggregates returned by Polygon
    """
    tz_market = pytz.timezone('US/Eastern')
    tz_local = pytz.timezone('US/Pacific')
    start_time = datetime.datetime.now()

    # Determine multiplier and timespan based on timeframe
    dt_len = 19
    if timeframe.endswith("m"):
        multiplier = int(timeframe[:-1])
        timespan = "minute"
        if multiplier == 60:
            multiplier = 1
            timespan = "hour"
        num_days = 30
    elif timeframe == "daily":
        multiplier = 1
        timespan = "day"
        dt_len = 10
        num_days = 365*5
    elif timeframe == "weekly":
        multiplier = 1
        timespan = "week"
        dt_len = 10
        num_days = 365*5
    elif timeframe == "monthly":
        multiplier = 1
        timespan = "month"
        dt_len = 10
        num_days = 365*5
    else:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    now = datetime.datetime.now(datetime.UTC)
    if from_date is None:
        from_date = (now - datetime.timedelta(days=num_days)).strftime("%Y-%m-%d")
    if to_date is None:
        to_date = now.strftime("%Y-%m-%d")

    # Call the polygon API. The default page size (5000) can truncate 30 days of minute bars,
    # so request the API's maximum of 50000 results explicitly.
    print(f"Getting {timeframe} data for {ticker} from {from_date} to {to_date}. Multiplier={multiplier}, Timespan={timespan}")
    aggs = polygon_client.get_aggs(ticker, multiplier, timespan, from_date, to_date, limit=50000)

    # Save the results
    tz_result = tz_local  # For intraday data, we want timestamps in the local timezone
    if timeframe in ["daily", "weekly", "monthly"]:
        tz_result = tz_market  # For daily or longer, we want the market timezone (Polygon uses it to decide the date)
    out_filepath = os.path.join(out_data_dir, f"{ticker}.csv")
    with open(out_filepath, "w") as f:
        if timeframe in ["daily", "weekly", "monthly"]:
            f.write("date,open,high,low,close,volume\n")
        else:
            f.write("datetime,open,high,low,close,volume\n")
        dates = []
        for agg in aggs:
            # Agg(open=220.82, high=221.27, low=216.71, close=220.91, volume=67179965.0, vwap=219.3818, timestamp=1725854400000, transactions=945464, otc=None)
            dt_str = datetime.datetime.fromtimestamp(agg.timestamp / 1000, tz=tz_result).strftime("%Y-%m-%dT%H:%M:%S")
            dt_str = dt_str[:dt_len]
            dates.append(dt_str)
            f.write(f"{dt_str},{agg.open},{agg.high},{agg.low},{agg.close},{agg.volume}\n")

    # Stats (guard against tickers that returned no bars; create the stats entry if the
    # function is called outside get_price_data_parallel)
    end_time = datetime.datetime.now()
    time_taken = end_time - start_time
    stats = symbol_stats.setdefault(ticker, {"symbol": ticker})
    stats["time_taken"] = str(time_taken.total_seconds())
    stats["num_bars"] = len(aggs)
    stats["first_date"] = dates[0] if dates else ""
    stats["last_date"] = dates[-1] if dates else ""
    return aggs
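# Example standalone call (hypothetical ticker and output dir, for illustration only):
#   os.makedirs("/tmp/polygon/daily", exist_ok=True)
#   get_price_data("AAPL", "daily", "/tmp/polygon/daily", from_date="2024-01-01", to_date="2024-12-31")
# writes /tmp/polygon/daily/AAPL.csv with one date,open,high,low,close,volume row per trading day.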
#=======================================================================================================================
def get_price_data_parallel(symbols, timeframe, out_data_dir):
    # Initialize global stats (keys must match what get_price_data writes)
    for symbol in symbols:
        symbol_stats[symbol] = {
            "symbol": symbol,
            "time_taken": 0,
            "num_bars": 0,
            "first_date": "",
            "last_date": "",
        }

    max_workers = 200  # Adjust based on your system and API rate limits
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_symbol = {executor.submit(get_price_data, symbol, timeframe, out_data_dir): symbol for symbol in symbols}
        for future in as_completed(future_to_symbol):
            symbol = future_to_symbol[future]
            try:
                data = future.result()
                print(f"Completed data retrieval for {symbol}, {len(data)} records.")
            except Exception as exc:
                print(f"{symbol} generated an exception: {exc}")

    # Print summary stats
    print("\nSummary Stats:")
    print(f"{'#':>5} {'Symbol':<10} {'Time Taken':>15} {'Num Bars':>10} {'First Date':>15} {'Last Date':>15}")
    for i, (symbol, stats) in enumerate(symbol_stats.items()):
        print(f"{i:>5} {stats['symbol']:<10} {stats['time_taken']:>15} {stats['num_bars']:>10} {stats['first_date']:>15} {stats['last_date']:>15}")
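# Note: with max_workers=200 this fires a large burst of concurrent requests. Polygon's free
# tier is limited to 5 calls/min, so a paid plan is required here; if the API still returns
# HTTP 429s on your plan, reduce max_workers.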
#=======================================================================================================================
def get_symbol_list(today_str):
    # Get grouped daily aggs for a specific date
    grouped = polygon_client.get_grouped_daily_aggs(today_str)
    # print(grouped)
    # Print as indented JSON for better readability
    # for res in grouped:
    #     print(json.dumps(asdict(res), indent=4))
    # for i, t in enumerate(grouped):
    #     dt = datetime.datetime.fromtimestamp(t.timestamp / 1000, tz=pytz.timezone('US/Eastern')).strftime("%Y-%m-%d")
    #     print(f"{i:5} {dt} {t.ticker:6} {t.open:10} {t.high:10} {t.low:10} {t.close:10} {t.volume:10} {t.vwap:10} {t.transactions:10} {t.otc}")
    symbols = [t.ticker for t in grouped]
    return symbols
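# Note: the grouped-daily endpoint returns one OHLC bar per ticker that traded on the given
# date, so the tickers in that response double as the list of active symbols for the day.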
#=======================================================================================================================
def download_daily_data_from_polygon():
    # Today's date in YYYY-MM-DD format (market timezone)
    today_str = datetime.datetime.now(pytz.timezone('US/Eastern')).strftime("%Y-%m-%d")
    print(f"Downloading daily data for {today_str}")
    start_time = datetime.datetime.now()

    # Get the list of symbols to process, sorted alphabetically
    symbols = get_symbol_list(today_str)
    symbols.sort()
    #symbols = symbols[:10]  # For testing, limit to first 10 symbols
    print(f"Number of symbols to process: {len(symbols)}")
    #print(symbols)

    # Create the dirs
    timeframe = "daily"
    os.makedirs(out_data_base_dir, exist_ok=True)
    out_data_dir = os.path.join(out_data_base_dir, timeframe)
    os.makedirs(out_data_dir, exist_ok=True)

    # Download daily data for all symbols in parallel
    get_price_data_parallel(symbols, timeframe, out_data_dir)

    # Done
    print("\n✅ Finished downloading daily data.")

    # Total time taken
    end_time = datetime.datetime.now()
    time_taken = end_time - start_time
    print(f"Total time taken: {time_taken}")
#=======================================================================================================================
def generate_resampled_data_from_daily(timeframe, df_resample_rule):
    """
    Generate resampled data (weekly, monthly, quarterly, yearly) from daily data
    :param timeframe: "weekly", "monthly", "quarterly", "yearly"
    :param df_resample_rule: pandas resample rule: "W-SUN", "MS", "QS", "YS"
    :return:
    """
    start_time = datetime.datetime.now()

    # Input directory with daily stock files
    input_dir = os.path.join(out_data_base_dir, "daily")
    # Output directory for resampled stock files
    output_dir = os.path.join(out_data_base_dir, timeframe)
    # Create output dir if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    # Clear existing files in output_dir
    # for f in os.listdir(output_dir):
    #     os.remove(os.path.join(output_dir, f))

    # Get list of CSV files and sort alphabetically
    files = sorted([f for f in os.listdir(input_dir) if f.endswith(".csv")])

    # Loop over each CSV file in the daily directory
    print(f"Generating {timeframe} data from daily data for {len(files)} symbols:")
    last_ticker_letter = ""
    for index, filename in enumerate(files):
        ticker = filename.replace(".csv", "")
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename)

        # Read daily data
        df = pd.read_csv(input_path)

        # Ensure datetime index
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
        df.set_index("date", inplace=True)

        # Resample to the target frequency
        resampled = pd.DataFrame()
        resampled["open"] = df["open"].resample(df_resample_rule, label="left", closed="left").first()
        resampled["high"] = df["high"].resample(df_resample_rule, label="left", closed="left").max()
        resampled["low"] = df["low"].resample(df_resample_rule, label="left", closed="left").min()
        resampled["close"] = df["close"].resample(df_resample_rule, label="left", closed="left").last()
        resampled["volume"] = df["volume"].resample(df_resample_rule, label="left", closed="left").sum()

        # Drop periods where there was no trading data
        resampled.dropna(inplace=True)

        # Reset index so "date" is a column again
        resampled.reset_index(inplace=True)

        # Format date as YYYY-MM-DD (Polygon style)
        resampled["date"] = resampled["date"].dt.strftime("%Y-%m-%d")

        # Save resampled data
        resampled.to_csv(output_path, index=False)
        #print(f"Converted {ticker} → {timeframe} data saved at {output_path}")

        # Progress indicator: a dot every 100 files, plus the ticker's first letter when it changes
        if (index + 1) % 100 == 0:
            print(".", end="", flush=True)
        if last_ticker_letter != ticker[0]:
            last_ticker_letter = ticker[0]
            print(f"{last_ticker_letter}", end="", flush=True)

    # Done
    print(f"\n✅ Finished generating {timeframe} data for {len(files)} symbols.")

    # Total time taken
    end_time = datetime.datetime.now()
    time_taken = end_time - start_time
    print(f"Time taken: {time_taken}\n")
#=======================================================================================================================
def generate_weekly_data_from_daily():
    generate_resampled_data_from_daily("weekly", "W-SUN")
#=======================================================================================================================
def generate_monthly_data_from_daily():
    generate_resampled_data_from_daily("monthly", "MS")
#=======================================================================================================================
def generate_quarterly_data_from_daily():
    generate_resampled_data_from_daily("quarterly", "QS")
#=======================================================================================================================
def generate_yearly_data_from_daily():
    generate_resampled_data_from_daily("yearly", "YS")
#=======================================================================================================================
def main():
    start_time = datetime.datetime.now()
    print(f"# Started the SCRIPT at {start_time}\n")

    # Steps
    download_daily_data_from_polygon()
    generate_weekly_data_from_daily()
    generate_monthly_data_from_daily()
    # generate_quarterly_data_from_daily()
    # generate_yearly_data_from_daily()

    # Done
    end_time = datetime.datetime.now()
    print(f"\n# SCRIPT: Started:    {start_time}")
    print(f"# SCRIPT: Finished:   {end_time}")
    print(f"# SCRIPT: Time Taken: {end_time - start_time}")
#=======================================================================================================================
if __name__ == "__main__":
    main()