from copy import copy
import pickle
import datetime
from typing import List

import matplotlib
import matplotlib.pyplot as plt

matplotlib.use("TkAgg")
matplotlib.rcParams.update({"font.size": 22})

import numpy as np
import pandas as pd

## sh() is a small plotting helper from the author's private repo
from private.utils import image_process
from private.projects.artandscience.core import sh
from private.projects.artandscience.pandl_calculator import calculate_cagr
from private.projects.artandscience.methods import SimpleTrend
from syscore.constants import arg_not_supplied
from syscore.interactive.progress_bar import progressBar
from sysdata.csv.csv_adjusted_prices import csvFuturesAdjustedPricesData
from systems.accounts.account_forecast import pandl_for_instrument_forecast
def generate_state_sequence_at_date(lookback_days: int, state_count: int):
    # row offsets (counting back from the latest observation) at which we
    # sample the price to construct the state
    step_size = int(lookback_days / state_count)
    steps = list(range(-lookback_days - 1, -1, step_size))
    return steps + [-1]
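
# Quick sanity check (my addition, not in the original gist): with a 16 day
# lookback split into 4 steps we expect 5 observation offsets, i.e. 4 returns
# and hence 2**4 = 16 possible states
assert generate_state_sequence_at_date(16, 4) == [-17, -13, -9, -5, -1]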
def get_states_over_time(price_resamples: pd.Series):
    list_of_states = [
        get_state_at_time(price_resamples, date_index)
        for date_index in price_resamples.index
    ]
    return pd.Series(list_of_states, price_resamples.index)


no_state = -1
def get_state_at_time(price_resamples: pd.Series, date_index: datetime.datetime):
    ## note: state_seq is a module level global, set in the main loop below
    price_up_to_time = price_resamples[:date_index]
    if len(price_up_to_time) < (-state_seq[0] + 1):
        return no_state
    values = price_up_to_time.iloc[state_seq]  # positional indexing on the offsets
    returns = values.diff()
    returns_excluding_first_na = returns[1:]
    bin_digits = [one_else_zero(value) for value in returns_excluding_first_na]
    bin_digits = "".join(bin_digits)
    return int(bin_digits, 2)


def one_else_zero(value):
    return "1" if value > 0 else "0"


## 1 is U, 0 is D
## So UDD would be '100' in binary, which is 4 in decimal
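# Worked example of the encoding (my addition): returns of +0.5, -0.2, -0.1
# are U, D, D, encoding to '100' in binary, which is state 4; with k returns
# the states therefore run from 0 (all down) to 2**k - 1 (all up)
assert int("".join([one_else_zero(r) for r in [0.5, -0.2, -0.1]]), 2) == 4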
def fit_model(
    price_resamples: pd.Series,
    states_over_time: pd.Series,
    period_end: datetime.datetime,
    lookforward_days: int,
):
    ## note: state_count is a module level global, set in the main loop below
    fit_price = price_resamples[:period_end]
    vol = fit_price.diff().rolling(30).std().ffill()
    norm_returns_following = (fit_price.shift(-lookforward_days) - fit_price) / vol
    all_state_means = {}
    for state in range(2**state_count):
        in_state = norm_returns_following[states_over_time == state]
        if len(in_state) == 0:
            all_state_means[state] = np.nan
            continue
        all_state_means[state] = in_state.mean()
    scale_factor = 1 / pd.Series(all_state_means).abs().mean()
    all_state_means = dict(
        [(key, value * scale_factor) for key, value in all_state_means.items()]
    )
    all_state_means[no_state] = np.nan
    return all_state_means
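
# Note on the scaling (my gloss): dividing by the mean absolute conditional
# mean normalises the state means so they average 1.0 in absolute value;
# get_forecast() below then multiplies by 10, matching the usual convention
# of an average absolute forecast of 10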
def get_forecast(
    price_resamples: pd.Series,
    all_state_means: dict[int, float],
    states_over_time: pd.Series = arg_not_supplied,
):
    if states_over_time is arg_not_supplied:
        ## cross instrument fitting
        states_over_time = get_states_over_time(price_resamples)
    state_numbers = [all_state_means[state] for state in states_over_time.values]
    state_numbers = pd.Series(state_numbers, states_over_time.index)
    forecast = state_numbers * 10.0
    return forecast


def translate_simple_into_forecast(state_numbers: pd.Series, price: pd.Series):
    ## binary version: map any positive state mean to +10 and any negative to -10
    x = copy(state_numbers)
    x[x < 0] = -10
    x[x > 0] = 10
    x = x.reindex(price.index, method="ffill")
    return x
### optimisation example

from sysquant.optimisation.shared import sigma_from_corr_and_std
from scipy.optimize import minimize

### Following is the optimisation code:
### First the main function.
def optimise_with_sigma(sigma, mean_list, objective_func):
    """
    Given mean and covariance matrix, find portfolio weights that maximise SR

    We assume we can have as much leverage as we like to hit a given risk target,
    hence we can constrain weights to sum to 1, i.e. no cash option

    :param sigma: np.array NxN, covariance matrix
    :param mean_list: list of floats N in length, means
    :param objective_func: function of (weights, sigma, mus) to minimise, e.g. neg_SR
    :return: list of weights
    """
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]

    # Starting weights, equal weighting
    start_weights = [1.0 / number_assets] * number_assets

    # Set up constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{"type": "eq", "fun": addem}]
    ans = minimize(
        objective_func,
        start_weights,
        (sigma, mus),
        method="SLSQP",
        bounds=bounds,
        constraints=cdict,
        tol=0.00001,
    )
    weights = ans["x"]

    return weights
## This is the function we optimise over. We assume we can use whatever leverage
## is required, so we maximise SR subject to the weights summing to one
def neg_SR(weights, sigma, mus):
    # negative Sharpe ratio, since minimize() minimises
    weights = np.array(weights)
    estreturn = float(weights @ mus)
    std_dev = variance(weights, sigma) ** 0.5

    return -estreturn / std_dev


def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given weights and sigma
    return float(weights @ sigma @ weights)


def addem(weights):
    # Used for constraints, weights must sum to 1
    return 1.0 - sum(weights)
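
# Toy usage check (my own numbers, not from the original): two assets with
# identical 10% vol and 0.5 correlation, but means of 10% vs 8%; the closed
# form tangency weights are [2/3, 1/3], which the optimiser should recover
_sigma_demo = np.array([[0.01, 0.005], [0.005, 0.01]])
_w_demo = optimise_with_sigma(_sigma_demo, [0.10, 0.08], neg_SR)
# _w_demo is approximately [0.667, 0.333]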
# Useful for working in standard deviation and correlation space
# rather than covariance space
def optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func):
    """
    Given mean, standard deviation and correlation matrix, find portfolio weights
    that maximise SR

    :param mean_list: list of N floats
    :param stdev_list: list of N floats
    :param corrmatrix: NxN np.array
    :param objective_func: function of (weights, sigma, mus) to minimise, e.g. neg_SR
    :return: list of floats
    """
    sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
    weights = optimise_with_sigma(sigma, mean_list, objective_func)

    return weights
WEEKS_IN_YEAR = 365.25 / 7

## Some functions to analyse data and return various statistics
## Assumes weekly returns
def annual_returns_from_weekly_data(data):
    return data.mean() * WEEKS_IN_YEAR


def annual_stdev_from_weekly_data(data):
    return data.std() * (WEEKS_IN_YEAR**0.5)


def sharpe_ratio(data):
    SR = annual_returns_from_weekly_data(data) / annual_stdev_from_weekly_data(data)
    return SR
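
# Worked numbers (my example): a strategy making 0.2% per week with 1.5%
# weekly vol has an annual return of 0.002 * 52.18 = 10.4% and annual vol of
# 0.015 * 52.18 ** 0.5 = 10.8%, giving a Sharpe ratio of roughly 0.96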
def optimise_with_data(data, objective_func):
    """
    Given some data, find the optimal SR portfolio

    :param data: pd.DataFrame containing N columns
    :return: list of N floats
    """
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = annual_stdev_from_weekly_data(data).values
    corrmatrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func)

    return weights
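
# Tiny synthetic usage sketch (my own toy data, not in the original gist): two
# uncorrelated weekly return series with equal vol, where "a" has the higher
# mean; the SR-maximising weights should tilt towards "a" (around [0.75, 0.25]
# in expectation, though the exact split is sample dependent)
_rng = np.random.default_rng(0)
_demo = pd.DataFrame(
    {"a": _rng.normal(0.003, 0.01, 500), "b": _rng.normal(0.001, 0.01, 500)}
)
_w_data_demo = optimise_with_data(_demo, neg_SR)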
def optimise_with_shrinkage(
    data, mean_factor: float = 0, corr_factor: float = 0, objective_func=neg_SR
):
    mean_list = annual_returns_from_weekly_data(data).values
    ## arbitrary identical vols: with equal stdevs, only means and correlations
    ## drive the max SR weights
    stdev_list = np.array([0.3] * len(mean_list))
    corrmatrix = data.corr().values
    mean_list = shrink_mean(mean_list, mean_factor)
    corrmatrix = shrink_corr(corrmatrix, corr_factor)
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func)
    weights = dict([(key, wt) for key, wt in zip(data.columns, list(weights))])

    return weights


def shrink_mean(mean_list, factor):
    mean_series = pd.Series(mean_list)
    mean_series_avg = mean_series.mean()

    return np.array([x * (1 - factor) + mean_series_avg * factor for x in mean_series])
def shrink_corr(corrmatrix, factor):
    df_corr = pd.DataFrame(corrmatrix)
    copy_df_corr = copy(df_corr)
    np.fill_diagonal(copy_df_corr.values, np.nan)
    avg = np.nanmean(copy_df_corr.values)
    avg_corr = pd.DataFrame(
        avg, index=range(len(df_corr.columns)), columns=range(len(df_corr.columns))
    )
    np.fill_diagonal(avg_corr.values, 1)
    shrunk_df = df_corr * (1 - factor) + avg_corr * factor

    return shrunk_df.values
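
# Quick sanity checks (my additions): factor=0 leaves inputs alone, factor=1
# replaces each mean with the cross-sectional average, and partial shrinkage
# interpolates linearly between the two
assert np.allclose(shrink_mean([0.1, 0.3], 0.5), [0.15, 0.25])
_c = np.array([[1.0, 0.4], [0.4, 1.0]])
assert np.allclose(shrink_corr(_c, 0), _c)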
def calculate_idm(data, weights):
    ## instrument diversification multiplier: ratio of the average standalone
    ## instrument vol to the vol of the weighted portfolio
    avg_instrument_std = data.std().mean()
    portfolio = pd.Series(weights) * data
    portfolio = portfolio.sum(axis=1)
    std_portfolio = portfolio.std()

    return avg_instrument_std / std_portfolio
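
# Worked intuition (my numbers): four instruments each with 1% weekly vol and
# 0.25 weights; at an average pairwise correlation of 0.5 the portfolio vol is
# sqrt(0.0001 * (0.25 + 0.5 * 0.75)), about 0.79%, so the IDM is roughly 1.26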
def portfolio_with_weights(data, weights: dict, idm: float):
    portfolio = pd.Series(weights) * data
    portfolio = portfolio.sum(axis=1)
    portfolio = portfolio * idm

    return portfolio


def portfolio_with_weights_df(data, weights: pd.DataFrame):
    indexed_weights = weights.reindex(data.index, method="ffill")
    portfolio = data * indexed_weights
    portfolio = portfolio.sum(axis=1)

    return portfolio
| """ This code is used by me to get the data from my trading system. You can ignore it | |
| import pandas as pd | |
| from systems.provided.rob_system.run_system import futures_system | |
| system = futures_system() | |
| underlying_price_dict = dict([(key, | |
| system.data[key]) | |
| for key in ['US10', 'SP500', 'CORN', 'CRUDE_W', 'GOLD', 'GBP', 'US5', 'SILVER'] | |
| ] | |
| ) | |
| underlying_price_df = pd.DataFrame(underlying_price_dict)/100 | |
| underlying_price= underlying_price_df.resample("1B").last() | |
| underlying_price_df.to_csv('/home/rob/underlying_price_data.csv') | |
| strategy_return_dict = dict( | |
| [("%s_%s" % (instrument, rule_name), | |
| system.accounts.pandl_for_instrument_forecast(instrument, rule_name ).percent) | |
| for instrument, rule_name in [ | |
| ('SP500', 'momentum4'), ('SP500', 'momentum16'),('SP500', 'skewabs365'), | |
| ('GOLD', 'momentum4'), ('GOLD', 'momentum16'), ('GOLD', 'skewabs365'), | |
| ('US10', 'momentum4'), ('US10', 'momentum16'), ('US10', 'skewabs365'), | |
| ('GBP', 'momentum4'), ('GBP', 'momentum16'), ('GBP', 'skewabs365'), | |
| ] | |
| ] | |
| ) | |
| strategy_return_df = pd.DataFrame(strategy_return_dict)/100 | |
| strategy_return_df = strategy_return_df.resample("7D").sum() ## small approx | |
| strategy_return_df[strategy_return_df==0]= np.nan | |
| strategy_return_df.to_csv('/home/rob/strategy_data.csv') | |
| """ | |
strategy_return_df = pd.read_csv(
    "strategy_data.csv", index_col=0, parse_dates=True
)  # replace with file location
underlying_price_df = pd.read_csv(
    "underlying_price_data.csv", index_col=0, parse_dates=True
)  # replace with file location

lookback_days = 16
lookforward_days = 5
instrument = "GOLD"

price = underlying_price_df[instrument].ffill()
price = price.dropna()

## midpoint of the sample: fit on the first half, judge on the second
break_point = price.index[0] + (price.index[-1] - price.index[0]) / 2

all_pandl = {}
perf_stats = dict(in_sample={}, out_sample={})
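
# The experiment (my summary of the loop below): for each state count we build
# the offset sequence, label every date with its state, fit conditional means
# on the first half of the data only, then trade the forecast over the whole
# sample - so the second half of each curve is genuinely out of sample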
for state_count in [1, 2, 4, 8, 16]:
    print(state_count)
    state_seq = generate_state_sequence_at_date(lookback_days, state_count)
    states_over_time = get_states_over_time(price)
    all_state_means_in_sample = fit_model(
        price, states_over_time, break_point, lookforward_days=lookforward_days
    )
    forecast = get_forecast(
        price, states_over_time=states_over_time, all_state_means=all_state_means_in_sample
    )
    pandl = pandl_for_instrument_forecast(forecast, price).as_ts
    pandl_together = pd.concat(
        [pandl[:break_point].cumsum(), pandl[break_point:].cumsum()], axis=0
    )
    ## annualised, assuming 256 business days per year
    perf_stats["in_sample"][state_count] = pandl[:break_point].mean() * 256
    perf_stats["out_sample"][state_count] = pandl[break_point:].mean() * 256
    all_pandl[state_count] = pandl_together

all_pandl = pd.DataFrame(all_pandl)
all_pandl.plot()

all_stats = pd.DataFrame(perf_stats)
all_stats.plot()
plt.title(instrument)
sh()
matplotlib.rcParams.update({"font.size": 10})
strategy_return_df.cumsum().plot()
sh()

matplotlib.rcParams.update({"font.size": 10})
fig = plt.figure()
count = 1
for mean_factor in [0, 0.25, 0.5, 0.75, 1]:
    for cor_factor in [0, 0.25, 0.5, 0.75, 1]:
        plt.subplot(5, 5, count)
        to_plot = optimise_with_shrinkage(strategy_return_df, mean_factor, cor_factor)
        to_plot = pd.Series(to_plot)
        to_plot.plot.bar()
        plt.gca().set_xticklabels([])
        count += 1
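
# Reading the 5x5 grid (my gloss): the top-left panel is the raw unshrunk
# optimisation with its typically extreme weights; moving across shrinks
# correlations, moving down shrinks means, and the bottom-right corner
# (full shrinkage of both) collapses to near equal weights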
break_point = (
    strategy_return_df.index[0]
    + (strategy_return_df.index[-1] - strategy_return_df.index[0]) / 2
)

## half and half plots: fit weights on the first half, plot cumulated returns
## over both halves
all_h2 = {}
mean_factor = 0.95
for cor_factor in [0.5, 0.6, 0.7, 0.8, 0.9, 1]:
    print("%f %f" % (mean_factor, cor_factor))
    weights = optimise_with_shrinkage(strategy_return_df[:break_point], mean_factor, cor_factor)
    idm = calculate_idm(strategy_return_df[:break_point], weights)
    results = portfolio_with_weights(strategy_return_df, weights, idm)
    results_joint = pd.concat(
        [results[:break_point].cumsum(), results[break_point:].cumsum()], axis=0
    )
    all_h2[cor_factor] = results_joint

all_h2 = pd.DataFrame(all_h2)

all_h2 = {}
perf_stats = dict(in_sample={}, out_sample={})
cor_factor = 0.5
for mean_factor in [0, 0.5, 0.85, 0.9, 0.95, 1]:
    weights = optimise_with_shrinkage(strategy_return_df[:break_point], mean_factor, cor_factor)
    idm = calculate_idm(strategy_return_df[:break_point], weights)
    results = portfolio_with_weights(strategy_return_df, weights, idm)
    results_joint = pd.concat(
        [results[:break_point].cumsum(), results[break_point:].cumsum()], axis=0
    )
    ## annualised weekly returns
    perf_stats["in_sample"][mean_factor] = results[:break_point].mean() * 52
    perf_stats["out_sample"][mean_factor] = results[break_point:].mean() * 52
    all_h2[mean_factor] = results_joint

all_h2 = pd.DataFrame(all_h2)
all_stats = pd.DataFrame(perf_stats)
#### Rolling OOS using some reasonable shrinkage
## refit on an expanding window at each annual period end, then apply the
## (forward-filled) weights to the subsequent returns
period_ends = pd.date_range(
    strategy_return_df.index[0], end=strategy_return_df.index[-1], freq="12M"
)
all_curves = {}
all_returns = {}
for mean_factor in [0.5, 0.6, 0.7, 0.8, 0.9, 1]:
    all_returns[mean_factor] = {}
    for cor_factor in [0, 0.25, 0.5, 0.75, 1]:
        weight_list = []
        for end_of_period in period_ends:
            subset = strategy_return_df[:end_of_period]
            subset = subset.dropna(axis=1, how="all")
            weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
            weights_ts = pd.DataFrame([weights], index=[end_of_period])
            idm = calculate_idm(subset, weights)
            weights_ts = weights_ts * idm
            weight_list.append(weights_ts)
        weight_df = pd.concat(weight_list, axis=0)
        results = portfolio_with_weights_df(strategy_return_df, weight_df)
        all_curves["%f_%f" % (mean_factor, cor_factor)] = results
        all_returns[mean_factor][cor_factor] = results.mean()

all_curves_df = pd.DataFrame(all_curves)
matplotlib.rcParams.update({"font.size": 10})
all_curves_df.cumsum().plot()

matplotlib.rcParams.update({"font.size": 20})
all_returns_df = pd.DataFrame(all_returns)
#### Rolling OOS using some reasonable shrinkage, varying the window length
period_ends = pd.date_range(
    strategy_return_df.index[0], end=strategy_return_df.index[-1], freq="12M"
)
all_curves = {}
all_returns = {}
mean_factor = 0.8
cor_factor = 0.2
## period_length is in years; 999 is effectively an expanding window
for period_length in [1, 2, 5, 10, 20, 999]:
    weight_list = []
    for idx in range(1, len(period_ends)):
        start_of_period_idx = max([0, idx - period_length])
        start_of_period = period_ends[start_of_period_idx]
        end_of_period = period_ends[idx]
        subset = strategy_return_df[start_of_period:end_of_period]
        subset = subset.dropna(axis=1, how="all")
        weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
        weights_ts = pd.DataFrame([weights], index=[end_of_period])
        idm = calculate_idm(subset, weights)
        weights_ts = weights_ts * idm
        weight_list.append(weights_ts)
    weight_df = pd.concat(weight_list, axis=0)
    results = portfolio_with_weights_df(strategy_return_df, weight_df)
    all_curves[period_length] = results
    all_returns[period_length] = results.mean()

all_curves_df = pd.DataFrame(all_curves)
matplotlib.rcParams.update({"font.size": 20})
all_curves_df.cumsum().plot()

matplotlib.rcParams.update({"font.size": 20})
all_returns_df = pd.Series(all_returns)
all_returns_df.index = all_returns_df.index.astype(float)
all_returns_df.plot()
sh()
## do one and repeat
all_curves = {}
mean_factor = 0.8
cor_factor = 0.2
period_length = 999
weight_list = []
for idx in range(1, len(period_ends)):
    start_of_period_idx = max([0, idx - period_length])
    start_of_period = period_ends[start_of_period_idx]
    end_of_period = period_ends[idx]
    subset = strategy_return_df[start_of_period:end_of_period]
    subset = subset.dropna(axis=1, how="all")
    weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
    weights_ts = pd.DataFrame([weights], index=[end_of_period])
    idm = calculate_idm(subset, weights)
    weights_ts = weights_ts * idm
    weight_list.append(weights_ts)

weight_df = pd.concat(weight_list, axis=0)
results = portfolio_with_weights_df(strategy_return_df, weight_df)
all_curves["oos"] = results

## In sample is trickier: markets move in and out of the data over time, so we
## zero out the weight on any missing market and recalculate the IDM each period
final_weights = weights
pd.Series(final_weights).plot.bar()
plt.subplots_adjust(bottom=0.3)

weight_list = []
for idx in range(1, len(period_ends)):
    start_of_period_idx = max([0, idx - period_length])
    start_of_period = period_ends[start_of_period_idx]
    end_of_period = period_ends[idx]
    subset = strategy_return_df[start_of_period:end_of_period]
    subset = subset.dropna(axis=1, how="all")
    subset_columns = list(subset.columns)
    new_weights = {}
    for instrument in final_weights.keys():
        if instrument in subset_columns:
            new_weights[instrument] = final_weights[instrument]
        else:
            new_weights[instrument] = 0
    weights_ts = pd.DataFrame([new_weights], index=[end_of_period])
    idm = calculate_idm(subset, new_weights)
    weights_ts = weights_ts * idm
    weight_list.append(weights_ts)

weight_df = pd.concat(weight_list, axis=0)
results = portfolio_with_weights_df(strategy_return_df, weight_df)
all_curves["is"] = results