Created
August 7, 2024 07:00
-
-
Save ashavijit/f0c20ce31d78d4d83f0f9d63acb6ce87 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import re | |
| import pandas as pd | |
| from pymongo import MongoClient, errors | |
| from datetime import datetime, timedelta | |
| import threading | |
| import logging | |
| import os | |
| import pymongo | |
# Module-wide logging: timestamped, level-tagged messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Shared MongoDB connection, reused by every function below.
client = MongoClient("mongodb://localhost:27017/")
# Root folder holding one sub-folder per month of per-day CSV files.
base_folder = "C:/Users/aviji/Downloads/data/2023"
def round_to_nearest50(x):
    """Round *x* to the nearest multiple of 50; a remainder of exactly 25 rounds up."""
    remainder = x % 50
    floor_multiple = x - remainder
    if remainder < 25:
        return round(floor_multiple)
    return round(floor_multiple + 50)
def next_thursday(date_str):
    """Return the Thursday on or after *date_str* (DD/MM/YYYY) formatted as 'DDMON'.

    If the date is itself a Thursday it is returned unchanged (the weekly
    expiry day), e.g. "06/04/2023" -> "06APR".
    """
    date = datetime.strptime(date_str, "%d/%m/%Y")
    # (3 - weekday) % 7 is 0 on a Thursday and otherwise the number of days
    # remaining until the next one, so the original's special-case branch and
    # its unreachable "days == 0 -> 7" reset are unnecessary.
    days_until_thursday = (3 - date.weekday()) % 7
    nearest_thursday = date + timedelta(days=days_until_thursday)
    return nearest_thursday.strftime("%d%b").upper()
def extract_year(date_str):
    """Return the last two digits of the year in a DD/MM/YYYY date string."""
    parsed = datetime.strptime(date_str, "%d/%m/%Y")
    return str(parsed.year)[2:]
def process_and_insert_data(csv_path, db_name, collection_name):
    """Read a day's CSV and insert one MongoDB document per distinct future ATM.

    Only rows whose Ticker is the NIFTY current-month future ("NIFTY-I.NFO")
    are considered.  Each row's Open is rounded to the nearest 50 to get the
    future ATM strike, and the matching weekly CE/PE option symbols are
    derived.  A document is inserted only when no document with the same
    (FutureATM, CEstring) pair already exists; an existing document with a
    mismatched PEstring is logged as a warning and skipped.
    """
    db = client[db_name]
    collection = db[collection_name]
    try:
        # Stream the CSV in chunks so arbitrarily large files fit in memory.
        for chunk in pd.read_csv(csv_path, chunksize=1000):
            for _, row in chunk.iterrows():
                if row["Ticker"] != "NIFTY-I.NFO":
                    continue
                ticker_base = row["Ticker"].split("-")[0]
                future_atm = round_to_nearest50(row["Open"])
                next_thurs = next_thursday(row["Date"])
                year = extract_year(row["Date"])
                # Weekly option symbols, e.g. NIFTY06APR2317500CE / ...PE.
                ce_string = f"{ticker_base}{next_thurs}{year}{int(future_atm)}CE"
                pe_string = f"{ticker_base}{next_thurs}{year}{int(future_atm)}PE"
                # (Removed dead local: the original read row["Time"] into a
                # variable that was never used.)
                existing_document = collection.find_one(
                    {"FutureATM": future_atm, "CEstring": ce_string})
                if existing_document:
                    # Strike already recorded: only sanity-check consistency.
                    if existing_document["PEstring"] != pe_string:
                        logging.warning(f"Inconsistent PEstring for CEstring: {ce_string}")
                    continue
                collection.insert_one({
                    "Index": ticker_base,
                    "FutureATM": future_atm,
                    "NextThursday": next_thurs,
                    "Year": year,
                    "CEstring": ce_string,
                    "PEstring": pe_string,
                    "SpotATMData": [],  # filled later by calculate_and_store_spot_atm
                })
                logging.info(f"Inserted new document for CEstring: {ce_string}")
        logging.info(f"Processed and inserted data from {csv_path}")
    except errors.PyMongoError as e:
        logging.error(f"Error inserting data into MongoDB: {e}")
    except Exception as e:
        logging.error(f"Error processing data: {e}")
def calculate_and_store_spot_atm(ceString, peString, csv_path, db_name, collection_name):
    """Compute per-timestamp synthetic spot ATM values and store them on one document.

    For every timestamp at which both the CE and PE options traded, the
    synthetic spot is (CE open - PE open) + future ATM strike; it is also
    rounded to the nearest 50.  The resulting, chronologically sorted list
    replaces the document's SpotATMData array (upserted).
    """
    db = client[db_name]
    collection = db[collection_name]
    ce_data, pe_data = {}, {}
    ce_low, ce_high = {}, {}
    pe_low, pe_high = {}, {}
    try:
        for chunk in pd.read_csv(csv_path, chunksize=1000):
            for _, row in chunk.iterrows():
                ticker = row["Ticker"].strip()
                timestamp = f'{row["Date"].strip()} {row["Time"].strip()}'
                if ticker == f"{ceString}.NFO":
                    ce_data[timestamp] = row["Open"]
                    ce_low[timestamp] = row["Low"]
                    ce_high[timestamp] = row["High"]
                if ticker == f"{peString}.NFO":
                    pe_data[timestamp] = row["Open"]
                    pe_low[timestamp] = row["Low"]
                    pe_high[timestamp] = row["High"]
        # Only timestamps where both legs traded can produce a spot value.
        common_timestamps = set(ce_data.keys()) & set(pe_data.keys())
        ce_document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not ce_document:
            logging.warning(f"No document found for CEstring: {ceString} and PEstring: {peString}")
            return
        future_atm = ce_document.get("FutureATM", 0)
        spot_atm_data = []
        for timestamp in common_timestamps:
            spot_atm = (ce_data[timestamp] - pe_data[timestamp]) + (future_atm if future_atm else 0)
            date, time = timestamp.split()
            spot_atm_data.append({
                "Date": date,
                "Time": time,
                "spotAtm": round_to_nearest50(spot_atm),
                "spot": spot_atm,
                "CE_Low": ce_low[timestamp],
                "CE_High": ce_high[timestamp],
                "PE_Low": pe_low[timestamp],
                "PE_High": pe_high[timestamp],
                "_id": timestamp,  # "DD/MM/YYYY HH:MM:SS" keeps array entries unique
            })
        # BUGFIX: sort on the full date+time, not on the time of day alone,
        # so a CSV spanning more than one date cannot interleave its minutes.
        # (Date format DD/MM/YYYY matches the strptime used elsewhere here.)
        spot_atm_data.sort(
            key=lambda x: datetime.strptime(f'{x["Date"]} {x["Time"]}', "%d/%m/%Y %H:%M:%S"))
        collection.update_one(
            {"CEstring": ceString, "PEstring": peString},
            {"$set": {"SpotATMData": spot_atm_data}},
            upsert=True,
        )
        logging.info(f"Stored ATM data for CEstring: {ceString} and PEstring: {peString} in MongoDB")
    except Exception as e:
        logging.error(f"Error calculating and storing spot ATM: {e}")
def process_all_data(csv_path, db_name, collection_name):
    """Recompute SpotATMData for every (CEstring, PEstring) pair in the collection.

    Iterates the distinct CE/PE pairs already inserted by
    process_and_insert_data and runs calculate_and_store_spot_atm for each.
    """
    db = client[db_name]
    collection = db[collection_name]
    try:
        # Project only the two fields we need instead of pulling whole
        # documents (whose SpotATMData arrays can be large).  Dict keys keep
        # first-seen order while de-duplicating repeated pairs.
        ce_pe_pairs = {}
        for document in collection.find({}, {"CEstring": 1, "PEstring": 1}):
            ceString = document.get("CEstring")
            peString = document.get("PEstring")
            if ceString and peString:
                ce_pe_pairs[(ceString, peString)] = True
        for ceString, peString in ce_pe_pairs:
            logging.info(f"Processing CE: {ceString} and PE: {peString}")
            calculate_and_store_spot_atm(ceString, peString, csv_path, db_name, collection_name)
            # Post-condition check purely for diagnostics.
            updated_document = collection.find_one({"CEstring": ceString})
            if updated_document:
                logging.info(f"Updated document for CE: {ceString}")
            else:
                logging.warning(f"No document found for CE: {ceString} after update")
    except Exception as e:
        logging.error(f"Error processing all data: {e}")
def atm_selection(ceString, peString, db_name, collection_name, strikes_away=10, strike_diff=50):
    """Select CE/PE strike prices *strikes_away* strikes above and below the peak spot ATM.

    With the defaults this is the highest recorded spotAtm +/- 500 (10 strikes
    of 50 points), matching the original hard-coded behaviour.  Returns a
    (ce_strike, pe_strike) tuple, or None when no document/data is found.
    """
    db = client[db_name]
    collection = db[collection_name]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            logging.warning(f"No document found for CE: {ceString} and PE: {peString}")
            return None
        # Drop entries without a spotAtm value so max() cannot fail on None
        # (the original sorted the raw list, which raises TypeError on None).
        spot_atm_values = [
            entry.get("spotAtm")
            for entry in document.get("SpotATMData", [])
            if entry.get("spotAtm") is not None
        ]
        if not spot_atm_values:
            logging.warning(f"No spot ATM data found for CE: {ceString} and PE: {peString}")
            return None
        # max() replaces the original sort-then-take-last (O(n) vs O(n log n)).
        spot_atm = max(spot_atm_values)
        offset = strikes_away * strike_diff
        return spot_atm + offset, spot_atm - offset
    except Exception as e:
        logging.error(f"Error selecting ATM strike prices: {e}")
def main():
    """Walk base_folder/<month>/*.csv and run both processing stages on each file.

    One database per month (lower-cased folder name) and one collection per
    file (the date portion of the file name).
    """
    months = sorted(
        month for month in os.listdir(base_folder)
        if os.path.isdir(os.path.join(base_folder, month))
    )
    for month in months:
        folder_path = os.path.join(base_folder, month)
        db_name = month.lower()
        csv_files = sorted(f for f in os.listdir(folder_path) if f.endswith(".csv"))
        for file in csv_files:
            csv_path = os.path.join(folder_path, file)
            # e.g. "nifty_03042023.csv" -> collection "03042023"
            collection_name = file.split("_")[-1].split(".")[0]
            logging.info(f"Processing file: {csv_path} for month: {month}")
            # The original spawned a thread and immediately join()ed it for
            # each stage, which is strictly sequential; plain calls are
            # equivalent and simpler.
            process_and_insert_data(csv_path, db_name, collection_name)
            process_all_data(csv_path, db_name, collection_name)
| # def find_premium_closeto5(db_name, collection_name, db_name2, collection_name2, ceString, peString): | |
| # """Find the closest premium to 5 for each CE and PE string and update the document in MongoDB.""" | |
| # db = client[db_name] | |
| # collection = db[collection_name] | |
| # db2 = client[db_name2] | |
| # collection2 = db2[collection_name2] | |
| # try: | |
| # # Find the document with the specified CEstring and PEstring | |
| # document = collection.find_one({"CEstring": ceString, "PEstring": peString}) | |
| # if not document: | |
| # return None | |
| # # Search strings for CE and PE in collection2 | |
| # search_string_list = [f"{ceString}.NFO", f"{peString}.NFO"] | |
| # closest_ids = {"CE": None, "PE": None} | |
| # for string in search_string_list: | |
| # data = list(collection2.find({"strike": string})) | |
| # if not data: | |
| # continue | |
| # close_prices = [(item["Close"], item["Time"]) for item in data[0]["data"]] | |
| # print(close_prices,"close_prices") | |
| # close_prices.sort() # | |
| # # Perform binary search to find the closest value to 5 | |
| # prices = [item[0] for item in close_prices] | |
| # idx = binary_search_closest(prices, 5) | |
| # closest_id = close_prices[idx][1] | |
| # # Update the corresponding key in closest_ids | |
| # if string == search_string_list[0]: # CE string | |
| # closest_ids["CE"] = closest_id | |
| # else: # PE string | |
| # closest_ids["PE"] = closest_id | |
| # # Update the document with the closest premium _id references | |
| # if closest_ids["CE"] or closest_ids["PE"]: | |
| # update = {"$set": {"closestPremium5": closest_ids}} | |
| # collection.update_one({"_id": document["_id"]}, update) | |
| # except Exception as e: | |
| # print(f"An error occurred: {e}") | |
| """ | |
| 1. FIND the CE and PE strings in the collection2 as String format and store in search_string_list | |
| 2. For each string in search_string_list, for each timestamp in the spotATMData, find the closest premium to 5 for both CE and PE strings. | |
| 3. Update the document in the collection with the closest premium "timestamp" as "_refId" for both CE and PE strings. | |
| 4. update all the elements in the collection SpotATMData with the closest premium to 5 for both CE and PE strings. | |
| """ | |
def _closest_tick_time(ticks, target):
    """Return the "Time" of the tick whose "Close" is nearest *target* (None if empty).

    Ties keep the earliest tick, matching the original strict-< comparison.
    """
    closest_time = None
    closest_diff = float("inf")
    for tick in ticks:
        diff = abs(tick["Close"] - target)
        if diff < closest_diff:
            closest_diff = diff
            closest_time = tick["Time"]
    return closest_time


def find_premium_close_to_5(db_name, collection_name, db_name2, collection_name2, ceString, peString):
    """Attach to every SpotATMData entry the CE/PE tick times whose Close is nearest 5.

    Looks up the CE/PE tick series in *collection2* (documents keyed by
    "strike", each holding a "data" array of ticks), finds the single tick
    closest to a premium of 5 for each leg, and stamps that pair of times as
    "premium5" on every SpotATMData entry of the matching document.
    Returns None when the document or either tick series is missing.
    """
    db = client[db_name]
    collection = db[collection_name]
    db2 = client[db_name2]
    collection2 = db2[collection_name2]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            return None
        data_ce = collection2.find_one({"strike": f"{ceString}.NFO"})
        data_pe = collection2.find_one({"strike": f"{peString}.NFO"})
        if not data_ce or not data_pe or "data" not in data_ce or "data" not in data_pe:
            return None
        # PERF BUGFIX: the nearest-to-5 tick does not depend on the entry, so
        # compute it once instead of rescanning both tick series for every
        # SpotATMData entry (O(n+m) instead of the original O(n*m)).
        closest_ce_time = _closest_tick_time(data_ce["data"], 5)
        closest_pe_time = _closest_tick_time(data_pe["data"], 5)
        spot_atm_data = document.get("SpotATMData", [])
        for entry in spot_atm_data:
            entry["premium5"] = [{"CE_refId": closest_ce_time, "PE_refId": closest_pe_time}]
        collection.update_one({"_id": document["_id"]}, {"$set": {"SpotATMData": spot_atm_data}})
    except Exception as e:
        # logging instead of print, for consistency with the rest of the file.
        logging.error(f"An error occurred: {e}")
def binary_search_closest(arr, target) -> int:
    """Return the index of the element of sorted *arr* closest to *target*.

    An exact match returns its index immediately; otherwise the nearer of the
    two bracketing neighbours is chosen, with ties going to the smaller index.
    Assumes a non-empty array sorted in ascending order.
    """
    lo, hi = 0, len(arr) - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        if arr[mid] == target:
            return mid
        if arr[mid] < target:
            lo = mid + 1
        else:
            hi = mid - 1
    # Loop exit guarantees hi == lo - 1: target lies between arr[hi] and arr[lo].
    if hi < 0:
        return lo  # target below every element -> first index
    if lo >= len(arr):
        return hi  # target above every element -> last index
    return lo if abs(arr[lo] - target) < abs(arr[hi] - target) else hi
def nearest_strike(x, strike_diff, range_) -> list:
    """Return the strike ladder around *x*: the two bracketing multiples of
    *strike_diff* plus *range_ - 1* further strikes on each side.

    NOTE(review): the original mutated its running strike while still adding
    i * strike_diff each pass, producing triangular-number jumps
    (base, base+d, base+4d, base+10d, ...), which cannot be an intended strike
    ladder.  This version steps one strike_diff per level symmetrically, which
    matches the "N strikes away on both sides" wording used elsewhere in this
    file — confirm the intended ladder with the author.
    """
    lower_base = x - (x % strike_diff)      # nearest multiple at or below x
    upper_base = lower_base + strike_diff   # nearest multiple above x
    strikes = [lower_base, upper_base]
    for level in range(1, range_):
        strikes.append(lower_base - level * strike_diff)  # downside strike
        strikes.append(upper_base + level * strike_diff)  # upside strike
    return strikes
def find_nearest_elements(db_name, collection_name, db_name2, collection_name2, ceString, peString):
    """Annotate each SpotATMData entry with the CE/PE ticks whose Close is
    nearest to that entry's CE/PE Low and High values.

    Tick series come from *collection2* (documents keyed by "strike", each
    holding a "data" array).  Returns None when the document or either tick
    series is missing.
    """
    db = client[db_name]
    collection = db[collection_name]
    db2 = client[db_name2]
    collection2 = db2[collection_name2]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            return None
        data_ce = collection2.find_one({"strike": f"{ceString}.NFO"})
        data_pe = collection2.find_one({"strike": f"{peString}.NFO"})
        if not data_ce or not data_pe or "data" not in data_ce or "data" not in data_pe:
            return None
        spot_atm_data = document.get("SpotATMData", [])
        for entry in spot_atm_data:
            # These targets vary per entry, so (unlike the premium-5 search)
            # the per-entry scans cannot be hoisted out of the loop.
            entry["CE_Nearest_Low"] = find_nearest(data_ce["data"], entry.get("CE_Low"))
            entry["CE_Nearest_High"] = find_nearest(data_ce["data"], entry.get("CE_High"))
            entry["PE_Nearest_Low"] = find_nearest(data_pe["data"], entry.get("PE_Low"))
            entry["PE_Nearest_High"] = find_nearest(data_pe["data"], entry.get("PE_High"))
        collection.update_one({"_id": document["_id"]}, {"$set": {"SpotATMData": spot_atm_data}})
    except Exception as e:
        # logging instead of print, for consistency with the rest of the file.
        logging.error(f"An error occurred: {e}")
def find_nearest(data_array, target_value):
    """Return the tick dict whose "Close" is nearest to *target_value*.

    The original docstring wrongly described this as "premium close to 5";
    it finds the nearest Close to any target.  Ties keep the earliest
    element.  Returns None for an empty list or a None target (the original
    raised TypeError on None, silently swallowed by callers' broad excepts).
    """
    if not data_array or target_value is None:
        return None
    closest = data_array[0]
    for item in data_array:
        if abs(item["Close"] - target_value) < abs(closest["Close"] - target_value):
            closest = item
    return closest
if __name__ == "__main__":
    # main()
    # One-off run against the April data; call main() instead for the full
    # 2023 sweep over base_folder.
    find_premium_close_to_5("april", "03042023", "April2023", "03042023", "NIFTY06APR2317500CE", "NIFTY06APR2317500PE")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment