Skip to content

Instantly share code, notes, and snippets.

@ashavijit
Created August 7, 2024 07:00
Show Gist options
  • Select an option

  • Save ashavijit/f0c20ce31d78d4d83f0f9d63acb6ce87 to your computer and use it in GitHub Desktop.

Select an option

Save ashavijit/f0c20ce31d78d4d83f0f9d63acb6ce87 to your computer and use it in GitHub Desktop.
import re
import pandas as pd
from pymongo import MongoClient, errors
from datetime import datetime, timedelta
import threading
import logging
import os
import pymongo
# Log INFO and above, prefixing every message with a timestamp and its level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Single MongoDB client shared by every function in this module.
client = MongoClient("mongodb://localhost:27017/")
# Root folder scanned by main(): each sub-directory is handled as one month of CSV files.
base_folder = "C:/Users/aviji/Downloads/data/2023"
def round_to_nearest50(x):
    """Snap ``x`` to the closest multiple of 50.

    A value exactly halfway between two multiples (remainder 25) goes to the
    higher one. The result is passed through ``round`` so float inputs come
    back as whole numbers.
    """
    offset = x % 50
    floor_multiple = x - offset
    # Below the midpoint keep the lower multiple, otherwise step up by 50.
    return round(floor_multiple if offset < 25 else floor_multiple + 50)
def next_thursday(date_str):
    """Return the first Thursday on or after a ``dd/mm/YYYY`` date as ``DDMON``.

    A date that already is a Thursday maps to itself. The result is the
    zero-padded day followed by the upper-cased abbreviated month name,
    for example ``"06APR"``.
    """
    parsed = datetime.strptime(date_str, "%d/%m/%Y")
    # weekday() counts Monday as 0, so Thursday is 3. The modulo gives 0 on a
    # Thursday and the forward distance (1-6 days) on every other weekday.
    offset_days = (3 - parsed.weekday()) % 7
    thursday = parsed + timedelta(offset_days)
    return thursday.strftime("%d%b").upper()
def extract_year(date_str):
    """Return the year of a ``dd/mm/YYYY`` date without its first two digits.

    For a four-digit year this is the two-digit suffix, e.g. ``"23"`` for 2023.
    """
    year_text = f"{datetime.strptime(date_str, '%d/%m/%Y').year}"
    return year_text[2:]
def _thursday_on_or_after(date_str):
    """Return the first Thursday on or after a ``dd/mm/YYYY`` date as a datetime."""
    day = datetime.strptime(date_str, "%d/%m/%Y")
    # weekday() counts Monday as 0, so Thursday is 3.
    return day + timedelta(days=(3 - day.weekday()) % 7)


def process_and_insert_data(csv_path, db_name, collection_name):
    """Create one MongoDB document per distinct future-ATM strike found in a CSV.

    The CSV is read in chunks of 1000 rows. For every row whose ``Ticker`` is
    ``"NIFTY-I.NFO"`` the ``Open`` price is rounded to the nearest 50 and
    combined with the first Thursday on or after the row's ``Date`` to build
    the ``CEstring`` / ``PEstring`` names. A document is inserted (with an
    empty ``SpotATMData`` list) only when no document with the same
    ``FutureATM`` and ``CEstring`` exists yet.

    Args:
        csv_path: Path of the CSV file to read; needs ``Ticker``, ``Open``
            and ``Date`` (``dd/mm/YYYY``) columns.
        db_name: MongoDB database name.
        collection_name: MongoDB collection name.

    Returns:
        None. Errors are logged, not raised.
    """
    db = client[db_name]
    collection = db[collection_name]
    try:
        for chunk in pd.read_csv(csv_path, chunksize=1000):
            # Only rows for this ticker are used, so filter before the
            # (slow) per-row iteration instead of testing every row.
            future_rows = chunk[chunk["Ticker"] == "NIFTY-I.NFO"]
            for _, row in future_rows.iterrows():
                ticker_base = row["Ticker"].split("-")[0]
                future_atm = round_to_nearest50(row["Open"])
                # Take both the day/month and the year from the Thursday
                # itself: a late-December row whose Thursday falls in January
                # must carry the January year, not the trade date's year.
                expiry = _thursday_on_or_after(row["Date"])
                next_thurs = expiry.strftime("%d%b").upper()
                year = expiry.strftime("%y")
                ce_string = f"{ticker_base}{next_thurs}{year}{int(future_atm)}CE"
                pe_string = f"{ticker_base}{next_thurs}{year}{int(future_atm)}PE"
                existing_document = collection.find_one(
                    {"FutureATM": future_atm, "CEstring": ce_string}
                )
                if existing_document:
                    if existing_document["PEstring"] != pe_string:
                        logging.warning(f"Inconsistent PEstring for CEstring: {ce_string}")
                    continue
                obj_for_mongo = {
                    "Index": ticker_base,
                    "FutureATM": future_atm,
                    "NextThursday": next_thurs,
                    "Year": year,
                    "CEstring": ce_string,
                    "PEstring": pe_string,
                    "SpotATMData": [],
                }
                collection.insert_one(obj_for_mongo)
                logging.info(f"Inserted new document for CEstring: {ce_string}")
        logging.info(f"Processed and inserted data from {csv_path}")
    except errors.PyMongoError as e:
        logging.error(f"Error inserting data into MongoDB: {e}")
    except Exception as e:
        # Best-effort import: a bad file is reported and skipped, not fatal.
        logging.error(f"Error processing data: {e}")
def calculate_and_store_spot_atm(ceString, peString, csv_path, db_name, collection_name):
    """Derive per-timestamp spot ATM values for a CE/PE pair and store them.

    The CSV is scanned for rows whose ticker is ``<ceString>.NFO`` or
    ``<peString>.NFO``. For every "Date Time" stamp present for both, the
    spot is ``CE open - PE open + FutureATM`` (``FutureATM`` is read from the
    pair's existing document). The entries, sorted by time of day, replace
    the ``SpotATMData`` array of that document.

    Args:
        ceString: Call option name without the ``.NFO`` suffix.
        peString: Put option name without the ``.NFO`` suffix.
        csv_path: CSV with ``Ticker``, ``Date``, ``Time``, ``Open``, ``Low``
            and ``High`` columns.
        db_name: MongoDB database name.
        collection_name: MongoDB collection name.

    Returns:
        None. A missing document is logged as a warning; any error is logged.
    """
    collection = client[db_name][collection_name]
    # Each map goes from "Date Time" to that row's (Open, Low, High).
    ce_quotes = {}
    pe_quotes = {}
    try:
        ce_ticker = f"{ceString}.NFO"
        pe_ticker = f"{peString}.NFO"
        for chunk in pd.read_csv(csv_path, chunksize=1000):
            for _, row in chunk.iterrows():
                ticker = row["Ticker"].strip()
                timestamp = f"{row['Date'].strip()} {row['Time'].strip()}"
                if ticker == ce_ticker:
                    ce_quotes[timestamp] = (row["Open"], row["Low"], row["High"])
                if ticker == pe_ticker:
                    pe_quotes[timestamp] = (row["Open"], row["Low"], row["High"])

        pair_document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not pair_document:
            logging.warning(f"No document found for CEstring: {ceString} and PEstring: {peString}")
            return
        future_atm = pair_document.get("FutureATM", 0)

        spot_atm_data = []
        # Only timestamps quoted for both legs can produce a spot value.
        for timestamp in set(ce_quotes) & set(pe_quotes):
            ce_open, ce_low_value, ce_high_value = ce_quotes[timestamp]
            pe_open, pe_low_value, pe_high_value = pe_quotes[timestamp]
            # A stored FutureATM of None (or 0) contributes nothing.
            spot_atm = (ce_open - pe_open) + (future_atm or 0)
            date, time = timestamp.split()
            spot_atm_data.append({
                "Date": date,
                "Time": time,
                "spotAtm": round_to_nearest50(spot_atm),
                "spot": spot_atm,
                "CE_Low": ce_low_value,
                "CE_High": ce_high_value,
                "PE_Low": pe_low_value,
                "PE_High": pe_high_value,
                "_id": timestamp,
            })
        # Ordering uses the time of day only; the date part is not considered.
        spot_atm_data.sort(key=lambda item: datetime.strptime(item["Time"], "%H:%M:%S"))
        collection.update_one(
            {"CEstring": ceString, "PEstring": peString},
            {"$set": {"SpotATMData": spot_atm_data}},
            upsert=True,
        )
        logging.info(f"Stored ATM data for CEstring: {ceString} and PEstring: {peString} in MongoDB")
    except Exception as e:
        logging.error(f"Error calculating and storing spot ATM: {e}")
def process_all_data(csv_path, db_name, collection_name):
    """Recompute ``SpotATMData`` for every CE/PE pair stored in a collection.

    All documents are read first to collect the distinct
    ``(CEstring, PEstring)`` pairs; documents missing either name are
    ignored. ``calculate_and_store_spot_atm`` is then run once per pair and
    the presence of the document afterwards is logged.

    Args:
        csv_path: CSV file handed to ``calculate_and_store_spot_atm``.
        db_name: MongoDB database name.
        collection_name: MongoDB collection name.

    Returns:
        None. Any error is logged, not raised.
    """
    collection = client[db_name][collection_name]
    try:
        # Dict keys de-duplicate the pairs while keeping first-seen order.
        pairs = {}
        for stored in collection.find():
            pair = (stored.get("CEstring"), stored.get("PEstring"))
            if all(pair):
                pairs.setdefault(pair, True)

        for ceString, peString in pairs:
            logging.info(f"Processing CE: {ceString} and PE: {peString}")
            calculate_and_store_spot_atm(ceString, peString, csv_path, db_name, collection_name)
            if collection.find_one({"CEstring": ceString}):
                logging.info(f"Updated document for CE: {ceString}")
            else:
                logging.warning(f"No document found for CE: {ceString} after update")
    except Exception as e:
        logging.error(f"Error processing all data: {e}")
def atm_selection(ceString, peString, db_name, collection_name):
    """Pick CE and PE strikes 500 points either side of the highest spot ATM.

    The largest ``spotAtm`` value in the pair's ``SpotATMData`` is taken as
    the reference; 500 points is ten strikes when strikes are spaced 50 apart.

    Args:
        ceString: Call option name identifying the document.
        peString: Put option name identifying the document.
        db_name: MongoDB database name.
        collection_name: MongoDB collection name.

    Returns:
        ``(ce_strike, pe_strike)`` = ``(reference + 500, reference - 500)``,
        or ``None`` when the document or its spot data is missing or an
        error was logged.
    """
    collection = client[db_name][collection_name]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            logging.warning(f"No document found for CE: {ceString} and PE: {peString}")
            return None

        spot_atm_data = document.get("SpotATMData", [])
        if not spot_atm_data:
            logging.warning(f"No spot ATM data found for CE: {ceString} and PE: {peString}")
            return None

        # Last element of the ascending order is the highest spot ATM.
        reference = sorted(item.get("spotAtm") for item in spot_atm_data)[-1]
        return reference + 500, reference - 500
    except Exception as e:
        logging.error(f"Error selecting ATM strike prices: {e}")
    return None
def main():
    """Run the two-stage import for every CSV under each month folder.

    Every sub-directory of ``base_folder`` is one month; its lower-cased name
    is the database name. For each ``.csv`` file (in sorted order) the
    collection name is the part of the file name after the last underscore,
    without the extension.
    """
    month_dirs = sorted(
        entry for entry in os.listdir(base_folder)
        if os.path.isdir(os.path.join(base_folder, entry))
    )
    for month in month_dirs:
        folder_path = os.path.join(base_folder, month)
        db_name = month.lower()
        csv_names = sorted(name for name in os.listdir(folder_path) if name.endswith(".csv"))
        for csv_name in csv_names:
            csv_path = os.path.join(folder_path, csv_name)
            collection_name = csv_name.split("_")[-1].split(".")[0]
            logging.info(f"Processing file: {csv_path} for month: {month}")
            # Each stage runs in its own thread that is joined right away, so
            # the stages still execute strictly one after the other.
            for stage in (process_and_insert_data, process_all_data):
                worker = threading.Thread(
                    target=stage, args=(csv_path, db_name, collection_name))
                worker.start()
                worker.join()
# def find_premium_closeto5(db_name, collection_name, db_name2, collection_name2, ceString, peString):
# """Find the closest premium to 5 for each CE and PE string and update the document in MongoDB."""
# db = client[db_name]
# collection = db[collection_name]
# db2 = client[db_name2]
# collection2 = db2[collection_name2]
# try:
# # Find the document with the specified CEstring and PEstring
# document = collection.find_one({"CEstring": ceString, "PEstring": peString})
# if not document:
# return None
# # Search strings for CE and PE in collection2
# search_string_list = [f"{ceString}.NFO", f"{peString}.NFO"]
# closest_ids = {"CE": None, "PE": None}
# for string in search_string_list:
# data = list(collection2.find({"strike": string}))
# if not data:
# continue
# close_prices = [(item["Close"], item["Time"]) for item in data[0]["data"]]
# print(close_prices,"close_prices")
# close_prices.sort() #
# # Perform binary search to find the closest value to 5
# prices = [item[0] for item in close_prices]
# idx = binary_search_closest(prices, 5)
# closest_id = close_prices[idx][1]
# # Update the corresponding key in closest_ids
# if string == search_string_list[0]: # CE string
# closest_ids["CE"] = closest_id
# else: # PE string
# closest_ids["PE"] = closest_id
# # Update the document with the closest premium _id references
# if closest_ids["CE"] or closest_ids["PE"]:
# update = {"$set": {"closestPremium5": closest_ids}}
# collection.update_one({"_id": document["_id"]}, update)
# except Exception as e:
# print(f"An error occurred: {e}")
"""
1. Find the CE and PE strings in collection2 (in string format) and store them in search_string_list.
2. For each string in search_string_list and for each timestamp in SpotATMData, find the premium closest to 5 for both the CE and PE strings.
3. Update the document in the collection, storing the "timestamp" of the closest premium as "_refId" for both the CE and PE strings.
4. Update every element of SpotATMData in the collection with the premium closest to 5 for both the CE and PE strings.
"""
def _time_of_close_nearest(records, target):
    """Return the "Time" of the first record whose "Close" is nearest to target.

    Returns None when ``records`` is empty.
    """
    best_gap = float('inf')
    best_time = None
    for record in records:
        gap = abs(record["Close"] - target)
        # Strict comparison keeps the earliest record on ties.
        if gap < best_gap:
            best_gap = gap
            best_time = record["Time"]
    return best_time


def find_premium_close_to_5(db_name, collection_name, db_name2, collection_name2, ceString, peString):
    """Tag every SpotATMData entry with the times at which CE/PE closed nearest to 5.

    The CE and PE price series are read from ``collection2`` (documents whose
    ``strike`` is ``<ceString>.NFO`` / ``<peString>.NFO``, each holding a
    ``data`` list of records with ``Close`` and ``Time``). The time of the
    record whose ``Close`` is nearest to 5 is stored on each entry as
    ``premium5 = [{"CE_refId": ..., "PE_refId": ...}]`` and the updated
    ``SpotATMData`` array is written back to the pair's document.

    Args:
        db_name: Database holding the CE/PE pair document.
        collection_name: Collection holding the CE/PE pair document.
        db_name2: Database holding the per-strike price series.
        collection_name2: Collection holding the per-strike price series.
        ceString: Call option name without the ``.NFO`` suffix.
        peString: Put option name without the ``.NFO`` suffix.

    Returns:
        None. Nothing is written when the pair document or either price
        series is missing; errors are logged, not raised.
    """
    collection = client[db_name][collection_name]
    collection2 = client[db_name2][collection_name2]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            return None
        data_ce = collection2.find_one({"strike": f"{ceString}.NFO"})
        data_pe = collection2.find_one({"strike": f"{peString}.NFO"})
        if not data_ce or not data_pe or "data" not in data_ce or "data" not in data_pe:
            return None

        spot_atm_data = document.get("SpotATMData", [])
        updated_spot_atm_data = []
        if spot_atm_data:
            # The nearest-to-5 lookup depends only on the two price series,
            # not on the entry, so scan each series once instead of once per
            # entry.
            closest_ce_time = _time_of_close_nearest(data_ce["data"], 5)
            closest_pe_time = _time_of_close_nearest(data_pe["data"], 5)
            for entry in spot_atm_data:
                entry["premium5"] = [{"CE_refId": closest_ce_time, "PE_refId": closest_pe_time}]
                updated_spot_atm_data.append(entry)
        collection.update_one({"_id": document["_id"]}, {"$set": {"SpotATMData": updated_spot_atm_data}})
    except Exception as e:
        # Report through logging like the other functions in this module.
        logging.error(f"An error occurred: {e}")
def binary_search_closest(arr, target) -> int:
    """Return the index of the element of sorted ``arr`` closest to ``target``.

    An exact match returns the index the search lands on. Otherwise the
    nearer of the two neighbouring elements wins, with a tie going to the
    smaller one. For an empty ``arr`` the result is 0, which is not a valid
    index, so callers must check for emptiness themselves.
    """
    lo, hi = 0, len(arr) - 1
    while lo <= hi:
        middle = (lo + hi) // 2
        probe = arr[middle]
        if probe == target:
            return middle
        if probe < target:
            lo = middle + 1
        else:
            hi = middle - 1

    # No exact hit: hi == lo - 1, and the two indices bracket the target
    # unless one of them fell off an end of the array.
    if hi < 0:
        return lo
    if lo >= len(arr):
        return hi
    gap_above = abs(arr[lo] - target)
    gap_below = abs(arr[hi] - target)
    return lo if gap_above < gap_below else hi
def nearest_strike(x, strike_diff, range_) -> list:
    """Build a list of strikes starting from the multiple of ``strike_diff`` at or below ``x``.

    The list starts with that lower multiple. For each ``i`` in
    ``1 .. range_ - 1`` the running lower and upper values are both advanced
    by ``i * strike_diff`` and appended, lower first.

    NOTE(review): both running values move upward and the step grows with
    ``i`` (e.g. ``nearest_strike(17520, 50, 4)`` skips 17750) -- confirm
    whether evenly spaced strikes on both sides of ``x`` were intended.
    """
    floor_strike = x - (x % strike_diff)
    ceil_strike = floor_strike + strike_diff
    strikes = [floor_strike]
    for step in range(1, range_):
        shift = step * strike_diff
        floor_strike += shift
        ceil_strike += shift
        strikes.extend((floor_strike, ceil_strike))
    return strikes
def find_nearest_elements(db_name, collection_name, db_name2, collection_name2, ceString, peString):
    """Attach to each SpotATMData entry the price records closest to its lows and highs.

    For every entry, the CE series record whose ``Close`` is nearest to the
    entry's ``CE_Low`` / ``CE_High`` and the PE series record nearest to
    ``PE_Low`` / ``PE_High`` are stored under ``CE_Nearest_Low``,
    ``CE_Nearest_High``, ``PE_Nearest_Low`` and ``PE_Nearest_High``. The
    series come from ``collection2`` documents whose ``strike`` is
    ``<ceString>.NFO`` / ``<peString>.NFO``.

    Args:
        db_name: Database holding the CE/PE pair document.
        collection_name: Collection holding the CE/PE pair document.
        db_name2: Database holding the per-strike price series.
        collection_name2: Collection holding the per-strike price series.
        ceString: Call option name without the ``.NFO`` suffix.
        peString: Put option name without the ``.NFO`` suffix.

    Returns:
        None. Nothing is written when the pair document or either price
        series is missing; errors are printed, not raised.
    """
    collection = client[db_name][collection_name]
    collection2 = client[db_name2][collection_name2]
    try:
        document = collection.find_one({"CEstring": ceString, "PEstring": peString})
        if not document:
            return None
        data_ce = collection2.find_one({"strike": f"{ceString}.NFO"})
        data_pe = collection2.find_one({"strike": f"{peString}.NFO"})
        if not (data_ce and data_pe and "data" in data_ce and "data" in data_pe):
            return None

        # (output key, price series, entry key providing the target value)
        lookups = (
            ("CE_Nearest_Low", data_ce, "CE_Low"),
            ("CE_Nearest_High", data_ce, "CE_High"),
            ("PE_Nearest_Low", data_pe, "PE_Low"),
            ("PE_Nearest_High", data_pe, "PE_High"),
        )
        updated_spot_atm_data = []
        for entry in document.get("SpotATMData", []):
            nearest = {
                out_key: find_nearest(series["data"], entry.get(source_key, None))
                for out_key, series, source_key in lookups
            }
            entry.update(nearest)
            updated_spot_atm_data.append(entry)
        collection.update_one({"_id": document["_id"]}, {"$set": {"SpotATMData": updated_spot_atm_data}})
    except Exception as e:
        print(f"An error occurred: {e}")
def find_nearest(data_array, target_value):
    """Return the record of ``data_array`` whose "Close" is nearest to ``target_value``.

    When several records are equally near, the earliest one is returned.
    An empty (or otherwise falsy) ``data_array`` yields ``None``.
    """
    if not data_array:
        return None
    # min() keeps the first of equally good candidates.
    return min(data_array, key=lambda record: abs(record["Close"] - target_value))
if __name__ == "__main__":
    # main()
    # The full import in main() is switched off; only the premium tagging is
    # run, for one hard-coded database/collection and CE/PE pair.
    find_premium_close_to_5(
        db_name="april",
        collection_name="03042023",
        db_name2="April2023",
        collection_name2="03042023",
        ceString="NIFTY06APR2317500CE",
        peString="NIFTY06APR2317500PE",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment