# Gist by @robcarver17 (last active January 22, 2026)
from copy import copy
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("TkAgg")
matplotlib.rcParams.update({"font.size": 22})
from private.utils import image_process
import pickle
import datetime
from typing import List
import numpy as np
import pandas as pd
from private.projects.artandscience.core import sh
from private.projects.artandscience.pandl_calculator import calculate_cagr
from private.projects.artandscience.methods import SimpleTrend
from syscore.constants import arg_not_supplied
from syscore.interactive.progress_bar import progressBar
from sysdata.csv.csv_adjusted_prices import csvFuturesAdjustedPricesData
from systems.accounts.account_forecast import pandl_for_instrument_forecast
def generate_state_sequence_at_date(lookback_days: int, state_count: int):
    step_size = int(lookback_days / state_count)
    steps = list(range(-lookback_days - 1, -1, step_size))
    return steps + [-1]
def get_states_over_time(price_resamples: pd.Series):
    list_of_states = [
        get_state_at_time(price_resamples, date_index)
        for date_index in price_resamples.index
    ]
    return pd.Series(list_of_states, price_resamples.index)
no_state = -1
def get_state_at_time(price_resamples: pd.Series, date_index: datetime.datetime):
    ## uses the global state_seq, which is set before this function is called
    price_up_to_time = price_resamples[:date_index]
    if len(price_up_to_time) < (-state_seq[0] + 1):
        return no_state
    values = price_up_to_time[state_seq]
    returns = values.diff()
    returns_excluding_first_na = returns[1:]
    bin_digits = [one_else_zero(value) for value in returns_excluding_first_na]
    bin_digits = "".join(bin_digits)
    return int(bin_digits, 2)
def one_else_zero(value):
    return "1" if value > 0 else "0"
## 1 is U, 0 is D
## So UDD would be 100, which in decimal is 4
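## Worked example of the encoding (an illustrative check, not part of the original gist):
## three lookback returns of +1, -2, -1 map to digits '1', '0', '0', i.e. UDD,
## and int("100", 2) == 4
assert int("".join([one_else_zero(r) for r in [1.0, -2.0, -1.0]]), 2) == 4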
def fit_model(price_resamples: pd.Series, states_over_time: pd.Series,
              period_end: datetime.datetime, lookforward_days: int):
    ## uses the global state_count, which is set before this function is called
    fit_price = price_resamples[:period_end]
    vol = fit_price.diff().rolling(30).std().ffill()
    norm_returns_following = (fit_price.shift(-lookforward_days) - fit_price) / vol
    all_state_means = {}
    for state in range(2**state_count):
        in_state = norm_returns_following[states_over_time == state]
        if len(in_state) == 0:
            all_state_means[state] = np.nan
            continue
        in_state_mean = in_state.mean()
        all_state_means[state] = in_state_mean
    scale_factor = 1 / pd.Series(all_state_means).abs().mean()
    all_state_means = dict(
        [(key, value * scale_factor) for key, value in all_state_means.items()]
    )
    all_state_means[no_state] = np.nan
    return all_state_means
def get_forecast(price_resamples: pd.Series, all_state_means: dict[int, float], states_over_time: pd.Series = arg_not_supplied):
    if states_over_time is arg_not_supplied:
        ## cross instrument fitting
        states_over_time = get_states_over_time(price_resamples)
    state_numbers = [all_state_means[state] for state in states_over_time.values]
    state_numbers = pd.Series(state_numbers, states_over_time.index)
    ## state means are scaled to an average absolute value of 1, so this gives
    ## a forecast with an average absolute value of roughly 10
    forecast = state_numbers * 10.0
    return forecast
def translate_simple_into_forecast(state_numbers: pd.Series, price: pd.Series):
    x = copy(state_numbers)
    x[x < 0] = -10
    x[x > 0] = 10
    x = x.reindex(price.index, method="ffill")
    return x
### optimisation example
from sysquant.optimisation.shared import sigma_from_corr_and_std
from scipy.optimize import minimize
### Following is the optimisation code:
### First the main function.
def optimise_with_sigma(sigma, mean_list, objective_func):
    """
    Given mean and covariance matrix, find portfolio weights that maximise SR

    We assume we can have as much leverage as we like to hit a given risk target, hence we can
    constrain weights to sum to 1, i.e. no cash option

    :param sigma: np.array NxN, covariance matrix
    :param mean_list: list of floats N in length, means
    :param objective_func: function of (weights, sigma, mus) to minimise
    :return: list of weights
    """
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]

    # Starting weights, equal weighting
    start_weights = [1.0 / number_assets] * number_assets

    # Set up constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{"type": "eq", "fun": addem}]
    ans = minimize(
        objective_func,
        start_weights,
        (sigma, mus),
        method="SLSQP",
        bounds=bounds,
        constraints=cdict,
        tol=0.00001,
    )
    weights = ans["x"]

    return weights
## This is the function we optimise over. We assume we can use whatever leverage is
## required, so we maximise SR with the weights constrained to sum to one
def neg_SR(weights, sigma, mus):
    weights = np.matrix(weights)
    estreturn = (weights * mus)[0, 0]
    pvar = variance(weights, sigma)
    std_dev = pvar**0.5

    return -estreturn / std_dev


def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given weights and sigma
    return (weights * sigma * weights.transpose())[0, 0]


def addem(weights):
    # Used for constraints, weights must sum to 1
    return 1.0 - sum(weights)
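## Minimal usage sketch of the optimiser (illustrative, not in the original gist): a toy
## two asset problem with identical vols and positive correlation, where the second asset
## has twice the mean. The max SR weights should come out at roughly 0.22 / 0.78,
## summing to one.
_toy_sigma = np.array([[0.04, 0.01], [0.01, 0.04]])
_toy_weights = optimise_with_sigma(_toy_sigma, [0.05, 0.10], neg_SR)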
# Useful for working in standard deviation and correlation space
# rather than covariance space
def optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func):
    """
    Given mean, standard deviation and correlation matrix, find portfolio weights that maximise SR

    :param mean_list: list of N floats
    :param stdev_list: list of N floats
    :param corrmatrix: NxN np.array
    :return: list of floats
    """
    sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
    weights = optimise_with_sigma(sigma, mean_list, objective_func)

    return weights
WEEKS_IN_YEAR = 365.25 / 7

## Some functions to analyse data and return various statistics
## Assumes weekly returns
def annual_returns_from_weekly_data(data):
    return data.mean() * WEEKS_IN_YEAR


def annual_stdev_from_weekly_data(data):
    return data.std() * (WEEKS_IN_YEAR**0.5)


def sharpe_ratio(data):
    SR = annual_returns_from_weekly_data(data) / annual_stdev_from_weekly_data(data)
    return SR
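## Worked example of the annualisation (illustrative, not in the original gist): a weekly
## mean of 0.2% with a weekly standard deviation of 2% annualises to roughly a 10.4% return
## and a 14.4% standard deviation, giving a Sharpe ratio of about 0.72:
## 0.002 * 52.18 / (0.02 * 52.18 ** 0.5) ~ 0.72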
def optimise_with_data(data, objective_func):
    """
    Given some data, find the optimal SR portfolio

    :param data: pd.DataFrame containing N columns
    :return: list of N floats
    """
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = annual_stdev_from_weekly_data(data).values
    corrmatrix = data.corr().values
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func)

    return weights
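## Minimal usage sketch (illustrative, not in the original gist): _toy_weekly is made up
## weekly return data; the optimiser hands back one weight per column, summing to one.
_toy_weekly = pd.DataFrame(dict(x=[0.01, -0.005, 0.02, 0.0, 0.01],
                                y=[0.005, 0.01, -0.01, 0.015, 0.0]))
_toy_data_weights = optimise_with_data(_toy_weekly, neg_SR)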
def optimise_with_shrinkage(data, mean_factor: float = 0, corr_factor: float = 0, objective_func=neg_SR):
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = np.array([0.3] * len(mean_list))  ## arbitrary identical vols, since we are maximising SR
    corrmatrix = data.corr().values
    mean_list = shrink_mean(mean_list, mean_factor)
    corrmatrix = shrink_corr(corrmatrix, corr_factor)
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix, objective_func)
    weights = dict([(key, wt) for key, wt in zip(data.columns, list(weights))])

    return weights
def shrink_mean(mean_list, factor):
    mean_series = pd.Series(mean_list)
    mean_series_avg = mean_series.mean()

    return np.array([x * (1 - factor) + mean_series_avg * factor for x in mean_series])


def shrink_corr(corrmatrix, factor):
    df_corr = pd.DataFrame(corrmatrix)
    copy_df_corr = copy(df_corr)
    np.fill_diagonal(copy_df_corr.values, np.nan)
    avg = np.nanmean(copy_df_corr.values)
    avg_corr = pd.DataFrame(avg, index=range(len(df_corr.columns)), columns=range(len(df_corr.columns)))
    np.fill_diagonal(avg_corr.values, 1)
    shrunk_df = df_corr * (1 - factor) + avg_corr * factor

    return shrunk_df.values
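## Quick illustration of shrink_corr (not in the original gist): the average off diagonal
## correlation below is 0.6, so with factor=0.5 a correlation of 0.2 moves to 0.4 and a
## correlation of 0.8 moves to 0.7, while the diagonal stays at 1
_toy_corr = np.array([[1.0, 0.2, 0.8],
                      [0.2, 1.0, 0.8],
                      [0.8, 0.8, 1.0]])
_shrunk_corr = shrink_corr(_toy_corr, 0.5)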
def calculate_idm(data, weights):
    ## diversification multiplier: average instrument vol divided by portfolio vol
    std_weights = data.std().mean()
    portfolio = pd.Series(weights) * data
    portfolio = portfolio.sum(axis=1)
    std_portfolio = portfolio.std()

    return std_weights / std_portfolio


def portfolio_with_weights(data, weights: dict, idm: float):
    portfolio = pd.Series(weights) * data
    portfolio = portfolio.sum(axis=1)
    portfolio = portfolio * idm

    return portfolio


def portfolio_with_weights_df(data, weights: pd.DataFrame):
    indexed_weights = weights.reindex(data.index, method="ffill")
    portfolio = data * indexed_weights
    portfolio = portfolio.sum(axis=1)

    return portfolio
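## Small check of the IDM calculation (illustrative, not in the original gist): two
## uncorrelated series with equal vol and 50/50 weights give an IDM of sqrt(2) ~ 1.41
_toy_rets = pd.DataFrame(dict(a=[1.0, -1.0, 1.0, -1.0], b=[1.0, 1.0, -1.0, -1.0]))
assert abs(calculate_idm(_toy_rets, dict(a=0.5, b=0.5)) - 2 ** 0.5) < 1e-9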
""" This code is used by me to get the data from my trading system. You can ignore it
import pandas as pd
from systems.provided.rob_system.run_system import futures_system
system = futures_system()
underlying_price_dict = dict([(key,
system.data[key])
for key in ['US10', 'SP500', 'CORN', 'CRUDE_W', 'GOLD', 'GBP', 'US5', 'SILVER']
]
)
underlying_price_df = pd.DataFrame(underlying_price_dict)/100
underlying_price= underlying_price_df.resample("1B").last()
underlying_price_df.to_csv('/home/rob/underlying_price_data.csv')
strategy_return_dict = dict(
[("%s_%s" % (instrument, rule_name),
system.accounts.pandl_for_instrument_forecast(instrument, rule_name ).percent)
for instrument, rule_name in [
('SP500', 'momentum4'), ('SP500', 'momentum16'),('SP500', 'skewabs365'),
('GOLD', 'momentum4'), ('GOLD', 'momentum16'), ('GOLD', 'skewabs365'),
('US10', 'momentum4'), ('US10', 'momentum16'), ('US10', 'skewabs365'),
('GBP', 'momentum4'), ('GBP', 'momentum16'), ('GBP', 'skewabs365'),
]
]
)
strategy_return_df = pd.DataFrame(strategy_return_dict)/100
strategy_return_df = strategy_return_df.resample("7D").sum() ## small approx
strategy_return_df[strategy_return_df==0]= np.nan
strategy_return_df.to_csv('/home/rob/strategy_data.csv')
"""
## the CSV files are assumed to have dates in the first column
strategy_return_df = pd.read_csv('strategy_data.csv', index_col=0, parse_dates=True)  # replace with file location
underlying_price_df = pd.read_csv('underlying_price_data.csv', index_col=0, parse_dates=True)  # replace with file location
lookback_days = 16
lookforward_days = 5
instrument = "GOLD"
price = underlying_price_df[instrument].ffill()
price = price.dropna()
break_point = price.index[0] + (price.index[-1] - price.index[0]) / 2
all_pandl = {}
perf_stats = dict(in_sample={}, out_sample={})
for state_count in [1, 2, 4, 8, 16]:
    print(state_count)
    state_seq = generate_state_sequence_at_date(lookback_days, state_count)
    states_over_time = get_states_over_time(price)
    all_state_means_in_sample = fit_model(price, states_over_time, break_point,
                                          lookforward_days=lookforward_days)
    forecast = get_forecast(price, states_over_time=states_over_time,
                            all_state_means=all_state_means_in_sample)
    pandl = pandl_for_instrument_forecast(forecast, price).as_ts
    pandl_together = pd.concat([pandl[:break_point].cumsum(), pandl[break_point:].cumsum()], axis=0)
    perf_stats['in_sample'][state_count] = pandl[:break_point].mean() * 256
    perf_stats['out_sample'][state_count] = pandl[break_point:].mean() * 256
    all_pandl[state_count] = pandl_together
all_pandl = pd.DataFrame(all_pandl)
all_pandl.plot()
all_stats = pd.DataFrame(perf_stats)
all_stats.plot()
plt.title(instrument)
sh()
matplotlib.rcParams.update({"font.size": 10})
strategy_return_df.cumsum().plot()
sh()
matplotlib.rcParams.update({"font.size": 10})
fig = plt.figure()
count = 1
for mean_factor in [0, .25, .5, .75, 1]:
    for cor_factor in [0, .25, .5, .75, 1]:
        plt.subplot(5, 5, count)
        to_plot = optimise_with_shrinkage(strategy_return_df, mean_factor, cor_factor)
        to_plot = pd.Series(to_plot)
        to_plot.plot.bar()
        plt.gca().set_xticklabels([])
        count += 1
break_point = strategy_return_df.index[0]+(strategy_return_df.index[-1] - strategy_return_df.index[0])/2
## half and half plots
all_h2 = {}
mean_factor = 0.95
for cor_factor in [.5, .6, .7, .8, .9, 1]:
    print("%f %f" % (mean_factor, cor_factor))
    weights = optimise_with_shrinkage(strategy_return_df[:break_point], mean_factor, cor_factor)
    idm = calculate_idm(strategy_return_df[:break_point], weights)
    results = portfolio_with_weights(strategy_return_df, weights, idm)
    results_joint = pd.concat([results[:break_point].cumsum(), results[break_point:].cumsum()], axis=0)
    all_h2[cor_factor] = results_joint

all_h2 = pd.DataFrame(all_h2)
all_h2 = {}
perf_stats = dict(in_sample={}, out_sample={})
cor_factor = .5
for mean_factor in [0, .5, .85, .9, .95, 1]:
    weights = optimise_with_shrinkage(strategy_return_df[:break_point], mean_factor, cor_factor)
    idm = calculate_idm(strategy_return_df[:break_point], weights)
    results = portfolio_with_weights(strategy_return_df, weights, idm)
    results_joint = pd.concat([results[:break_point].cumsum(), results[break_point:].cumsum()], axis=0)
    perf_stats['in_sample'][mean_factor] = results[:break_point].mean() * 52
    perf_stats['out_sample'][mean_factor] = results[break_point:].mean() * 52
    all_h2[mean_factor] = results_joint

all_h2 = pd.DataFrame(all_h2)
all_stats = pd.DataFrame(perf_stats)
#### Rolling OOS using some reasonable shrinkage
period_ends = pd.date_range(strategy_return_df.index[0], end=strategy_return_df.index[-1], freq="12M")
all_curves = {}
all_returns = {}
for mean_factor in [0.5, .6, .7, .8, .9, 1]:
    all_returns[mean_factor] = {}
    for cor_factor in [0, .25, .5, .75, 1]:
        weight_list = []
        for end_of_period in period_ends:
            subset = strategy_return_df[:end_of_period]
            subset = subset.dropna(axis=1, how="all")
            weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
            weights_ts = pd.DataFrame([weights], index=[end_of_period])
            idm = calculate_idm(subset, weights)
            weights_ts = weights_ts * idm
            weight_list.append(weights_ts)
        weight_df = pd.concat(weight_list, axis=0)
        results = portfolio_with_weights_df(strategy_return_df, weight_df)
        all_curves["%f_%f" % (mean_factor, cor_factor)] = results
        all_returns[mean_factor][cor_factor] = results.mean()
all_curves_df = pd.DataFrame(all_curves)
matplotlib.rcParams.update({"font.size": 10})
all_curves_df.cumsum().plot()
matplotlib.rcParams.update({"font.size": 20})
all_returns_df = pd.DataFrame(all_returns)
#### Rolling OOS with varying window lengths and some reasonable shrinkage
period_ends = pd.date_range(strategy_return_df.index[0], end=strategy_return_df.index[-1], freq="12M")
all_curves = {}
all_returns = {}
mean_factor = .8
cor_factor = 0.2
for period_length in [1, 2, 5, 10, 20, 999]:
    weight_list = []
    for idx in range(len(period_ends))[1:]:
        start_of_period_idx = max([0, idx - period_length])
        start_of_period = period_ends[start_of_period_idx]
        end_of_period = period_ends[idx]
        subset = strategy_return_df[start_of_period:end_of_period]
        subset = subset.dropna(axis=1, how="all")
        weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
        weights_ts = pd.DataFrame([weights], index=[end_of_period])
        idm = calculate_idm(subset, weights)
        weights_ts = weights_ts * idm
        weight_list.append(weights_ts)
    weight_df = pd.concat(weight_list, axis=0)
    results = portfolio_with_weights_df(strategy_return_df, weight_df)
    all_curves[period_length] = results
    all_returns[period_length] = results.mean()
all_curves_df = pd.DataFrame(all_curves)
matplotlib.rcParams.update({"font.size": 20})
all_curves_df.cumsum().plot()
matplotlib.rcParams.update({"font.size": 20})
all_returns_df = pd.Series(all_returns)
all_returns_df.index = all_returns_df.index.astype(float)
all_returns_df.plot()
sh()
## run a single configuration (longest window), out of sample, then repeat in sample
all_curves = {}
mean_factor = .8
cor_factor = 0.2
period_length = 999
weight_list = []
for idx in range(len(period_ends))[1:]:
    start_of_period_idx = max([0, idx - period_length])
    start_of_period = period_ends[start_of_period_idx]
    end_of_period = period_ends[idx]
    subset = strategy_return_df[start_of_period:end_of_period]
    subset = subset.dropna(axis=1, how="all")
    weights = optimise_with_shrinkage(subset, mean_factor, cor_factor)
    weights_ts = pd.DataFrame([weights], index=[end_of_period])
    idm = calculate_idm(subset, weights)
    weights_ts = weights_ts * idm
    weight_list.append(weights_ts)
weight_df = pd.concat(weight_list, axis=0)
results = portfolio_with_weights_df(strategy_return_df, weight_df)
all_curves["oos"] = results
## in sample is more tricky: as markets move in and out of the data we need to recalculate the IDM
final_weights = weights
pd.Series(final_weights).plot.bar()
matplotlib.pyplot.subplots_adjust(bottom=.3)
weight_list = []
for idx in range(len(period_ends))[1:]:
    start_of_period_idx = max([0, idx - period_length])
    start_of_period = period_ends[start_of_period_idx]
    end_of_period = period_ends[idx]
    subset = strategy_return_df[start_of_period:end_of_period]
    subset = subset.dropna(axis=1, how="all")
    subset_columns = list(subset.columns)
    new_weights = {}
    for instrument in weights.keys():
        if instrument in subset_columns:
            new_weights[instrument] = weights[instrument]
        else:
            new_weights[instrument] = 0
    weights_ts = pd.DataFrame([new_weights], index=[end_of_period])
    idm = calculate_idm(subset, new_weights)
    weights_ts = weights_ts * idm
    weight_list.append(weights_ts)
weight_df = pd.concat(weight_list, axis=0)
results = portfolio_with_weights_df(strategy_return_df, weight_df)
all_curves["is"] = results