# Import the Python libraries
import pandas as pd
import numpy as np
import math
from datetime import datetime
import matplotlib.pyplot as plt
Securities = "MSFT AAPL AMZN GOOG NVDA BRK-A JNJ V PG JPM UNH MA INTC VZ HD T PFE MRK PEP WMT BAC XOM DIS KO CVX CSCO CMCSA WFC BA ADBE"
Start = "2016-05-30"
End = "2020-06-30"
#Start = "2010-06-30"
#End = "2020-06-30"
# Select File Type for upload of Security Data
filetype =".csv"
# Specify Local Storage Location
path = r"C:\Users\delga\Desktop\NYU\CQF_Work\Portfolio_Management"
# Convert data parameters to string for file naming purposes
Sec_Dates = (Securities, Start, End)
def convertTuple(tup):
    return '_'.join(tup)  # avoid shadowing the built-in str
conv = convertTuple(Sec_Dates)
# Converted data parameters + filetype = filename
filename = conv+filetype
# Join path, filename & filetype for single reference "File"
import os
File = os.path.join(path,filename)
print(File)
import yfinance as yf
data = yf.download(Securities, start=Start, end=End)
# View data
data
# Save data to named file
data.to_csv(File)
# Inspect the first 3 lines of the saved CSV file
with open(File, "r") as f:
    print(f.readlines()[:3])
# Pass the filename to pd.read_csv() to create the daily price dataframe:
# - the first two rows are handled as a (multilevel) header,
# - the first column is handled as the index,
# - the index values are parsed as datetimes.
df_csv = pd.read_csv(File, header=[0,1], index_col=0, parse_dates=True)
df_csv.info()
df_csv
# Count the securities in the portfolio; reduce the dataframe to daily adj close for the 30 securities
Sec_count = len(Securities.split())
df_csv = df_csv.iloc[:,0:Sec_count]
df_csv
# Check Column names
for col in df_csv.columns:
print(col)
# Create single level header from multilevel header
df_csv.columns = df_csv.columns.map('|'.join).str.strip('|')
# Drop the 'Adj Close|' prefix, leaving only the ticker symbols
df_csv.columns = df_csv.columns.str.replace('Adj Close|', '', regex=False)
# Check column names
print(df_csv.columns)
# View Adj Close Dataframe
df_csv
# View metadata
df_csv.info()
# Identify null values in dataset
df_csv.isnull().any()
# Drop null values in dataset
df_csv = pd.DataFrame(df_csv.dropna().round(4))
df_csv.info()
# Plot Daily Price Evolution
df_csv.plot(figsize=(12, 60), subplots=True);
# Calculate and plot daily returns
returns_daily = df_csv.pct_change()
returns_daily.plot(figsize=(12, 60), subplots=True);
# Calculate and plot monthly returns (from the first business day of each month)
# Take prices at the start of each month ("BMS" = Business Month Start date offset)...
prices_BOM = df_csv.resample("BMS").first()
prices_BOM
# ...use to calculate monthly returns
ind_return = prices_BOM.pct_change()
ind_return
# Remove null values (first month)
ind_return = ind_return.dropna().round(6)
ind_return
# Format index as monthly periods
ind_return.index = ind_return.index.to_period('M')
ind_return
# plot monthly returns
ind_return.plot.bar(figsize=(12, 60), subplots=True);
#prices_BOM.to_excel("mktcap_2008_2020.xlsx", sheet_name='Prices_BOM')
#Import Monthly Market Capitalization data
ind_mktcap = pd.read_excel("mktcap_2008_2020.xlsx", sheet_name='Mkt_Cap', index_col=0, parse_dates=True)
#Slice by specified starting and ending dates
ind_mktcap = ind_mktcap.loc[Start:End]
ind_mktcap
# Convert index to monthly periods for consistency
ind_mktcap.index = ind_mktcap.index.to_period('M')
ind_mktcap
# Compute and inspect price evolution of benchmark expressed in terms of capitalization
total_mktcap = ind_mktcap.sum(axis="columns")
total_mktcap.plot(figsize=(12,6), title='Evolution of Market Capitalization of Index' );
# Compute benchmark cap-weights by dividing each security's mkt cap by the total mkt cap at each date
ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
ind_capweight
# Check that all rows add to one by summing all elements in each row
ind_capweight.sum(axis="columns")
# Compute monthly market return
total_market_return = (ind_capweight * ind_return).sum(axis="columns")
total_market_return
# Plot monthly market return
total_market_return.plot.bar(figsize=(12,6), title='Monthly Returns Cap-Weighted Index' );
# Plot cumulative market return
total_market_index = 1000*(1+total_market_return).cumprod()
total_market_index.plot(figsize=(12,6), title='Cumulative Return Cap-Weighted Index' );
# Number of Securities in EW Index
n_ew = ind_return.shape[1]
# Equal weight for each security at every date (each row sums to one)
ind_equalweight = pd.DataFrame(1/n_ew, index=ind_capweight.index, columns=ind_capweight.columns)
ind_equalweight
# Calculate monthly return of EW index
total_eqweighted_return = (ind_equalweight * ind_return).sum(axis="columns")
total_eqweighted_return.plot.bar(figsize=(12,6), title='Monthly Returns Equal-Weighted Index' );
# Calculate cumulative return of equal-weighted index
total_eqweighted_index = 1000*(1+total_eqweighted_return).cumprod()
total_eqweighted_index.plot(figsize=(12,6), title='Cumulative Return Equal-Weighted Index' );
# Compare cumulative return of cap-weighted and equal-weighted index
total_market_index.plot(figsize=(12,6),title="Market Cap Weighted Index", label="Mkt-weighted", legend=True)
total_eqweighted_index.plot(title="Equal Cap Weighted Vs. Market Cap Weighted Indices", label="Eq-weighted", legend=True);
def annualize_rets(r, periods_per_year):
"""
Gives the annualized return. Takes a times series of returns and their periodicity as arguments
"""
compounded_growth = (1+r).prod()
n_periods = r.shape[0]
return compounded_growth**(periods_per_year/n_periods)-1
def annualize_vol(r, periods_per_year):
"""
Gives the annualized volatility. Takes a times series of returns and their periodicity as arguments.
"""
return r.std()*(periods_per_year**0.5)
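# Quick hand-check of the two helpers above (illustrative only): a constant 1%
# monthly return should annualize to 1.01**12 - 1, and a constant series has zero vol.
check_r = pd.Series([0.01]*12)
print(annualize_rets(check_r, 12))  # ~0.1268, i.e. 1.01**12 - 1
print(annualize_vol(check_r, 12))   # 0.0, a constant series has no volatility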
rf = 0.00
ann_factor = 12
er = annualize_rets(ind_return, ann_factor) # Expected annualized return of securities
ev = annualize_vol(ind_return, ann_factor) # Expected annualized volatility of securities
corr = ind_return.corr()
cov = ind_return.cov()
covmat_ann = cov*(ann_factor) # Annualized covariance matrix
# Expected annualized return of securities
er
# Expected annualized volatility of securities
ev
# Correlation matrix
corr
# Covariance matrix
cov
# Annualised covariance matrix
covmat_ann
def sharpe_ratio(r, riskfree_rate, periods_per_year):
"""
Computes the annualized sharpe ratio of a set of returns
"""
# convert the annual riskfree rate to per period
rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
excess_ret = r - rf_per_period
ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
ann_vol = annualize_vol(r, periods_per_year)
return ann_ex_ret/ann_vol
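# Sanity check (illustrative): with a zero riskfree rate, the Sharpe ratio reduces to
# annualized return over annualized vol; both lines below should print the same number.
demo_r = ind_return.iloc[:, 0]
print(sharpe_ratio(demo_r, 0.0, 12))
print(annualize_rets(demo_r, 12)/annualize_vol(demo_r, 12))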
import scipy.stats
def is_normal(r, level=0.01):
"""
Applies the Jarque-Bera test to determine if a Series is normal or not
Test is applied at the 1% level by default
Returns True if the hypothesis of normality is accepted, False otherwise
"""
if isinstance(r, pd.DataFrame):
return r.aggregate(is_normal)
else:
statistic, p_value = scipy.stats.jarque_bera(r)
return p_value > level
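# Illustrative check on synthetic data: a large sample drawn from a normal
# distribution should (with high probability) pass the Jarque-Bera test.
print(is_normal(pd.Series(np.random.default_rng(2).normal(0, 1, 5000))))  # expect True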
def drawdown(return_series: pd.Series):
"""Takes a time series of asset returns.
returns a DataFrame with columns for
the wealth index,
the previous peaks, and
the percentage drawdown
"""
wealth_index = 1000*(1+return_series).cumprod()
previous_peaks = wealth_index.cummax()
drawdowns = (wealth_index - previous_peaks)/previous_peaks
return pd.DataFrame({"Wealth": wealth_index,
"Previous Peak": previous_peaks,
"Drawdown": drawdowns})
def semideviation(r):
"""
Returns the semideviation aka negative semideviation of r
r must be a Series or a DataFrame, else raises a TypeError
"""
if isinstance(r, pd.Series):
is_negative = r < 0
return r[is_negative].std(ddof=0)
elif isinstance(r, pd.DataFrame):
return r.aggregate(semideviation)
else:
raise TypeError("Expected r to be a Series or DataFrame")
def var_historic(r, level=5):
"""
Returns the historic Value at Risk at a specified level
i.e. returns the number such that "level" percent of the returns
fall below that number, and the (100-level) percent are above
"""
if isinstance(r, pd.DataFrame):
return r.aggregate(var_historic, level=level)
elif isinstance(r, pd.Series):
return -np.percentile(r, level)
else:
raise TypeError("Expected r to be a Series or DataFrame")
def cvar_historic(r, level=5):
"""
Computes the Conditional VaR of Series or DataFrame
"""
if isinstance(r, pd.Series):
is_beyond = r <= -var_historic(r, level=level)
return -r[is_beyond].mean()
elif isinstance(r, pd.DataFrame):
return r.aggregate(cvar_historic, level=level)
else:
raise TypeError("Expected r to be a Series or DataFrame")
from scipy.stats import norm
def var_gaussian(r, level=5, modified=False):
"""
Returns the Parametric Gauusian VaR of a Series or DataFrame
If "modified" is True, then the modified VaR is returned,
using the Cornish-Fisher modification
"""
# compute the Z score assuming it was Gaussian
z = norm.ppf(level/100)
if modified:
# modify the Z score based on observed skewness and kurtosis
s = skewness(r)
k = kurtosis(r)
z = (z +
(z**2 - 1)*s/6 +
(z**3 -3*z)*(k-3)/24 -
(2*z**3 - 5*z)*(s**2)/36
)
return -(r.mean() + z*r.std(ddof=0))
def skewness(r):
"""
Alternative to scipy.stats.skew()
Computes the skewness of the supplied Series or DataFrame
Returns a float or a Series
"""
demeaned_r = r - r.mean()
# use the population standard deviation, so set dof=0
sigma_r = r.std(ddof=0)
exp = (demeaned_r**3).mean()
return exp/sigma_r**3
def kurtosis(r):
"""
Alternative to scipy.stats.kurtosis()
Computes the kurtosis of the supplied Series or DataFrame
Returns a float or a Series
"""
demeaned_r = r - r.mean()
# use the population standard deviation, so set dof=0
sigma_r = r.std(ddof=0)
exp = (demeaned_r**4).mean()
return exp/sigma_r**4
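# Illustrative comparison on a synthetic, negatively skewed series (not portfolio data):
# the Cornish-Fisher adjustment should raise the VaR estimate relative to the plain
# Gaussian one when the left tail is fat.
rng = np.random.default_rng(1)
skewed_r = pd.Series(0.03 - rng.lognormal(mean=-3.5, sigma=0.8, size=2000))
print("Gaussian VaR      :", var_gaussian(skewed_r, level=5))
print("Cornish-Fisher VaR:", var_gaussian(skewed_r, level=5, modified=True))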
sharpe_ratio(ind_return, rf, ann_factor).sort_values().plot.bar(title="Sharpe Ratio");
var_gaussian(ind_return, level=5, modified=False).sort_values().plot.bar(title='Parametric (Gaussian) VaR @ 95%');
var_gaussian(ind_return, level=5, modified=True).sort_values().plot.bar(title='Modified (Cornish-Fisher) VaR @ 95%');
var_historic(ind_return, level=5).sort_values().plot.bar(title='Historic VaR @ 95%');
cvar_historic(ind_return, level=5).sort_values().plot.bar(title='Conditional VaR @ 95%');
semideviation(ind_return).sort_values().plot.bar(title='Negative Semi-Deviation');
skewness(ind_return).sort_values().plot.bar(title="Skewness")
kurtosis(ind_return).sort_values().plot.bar(title="Kurtosis");
# Empirical Distribution of monthly returns
ind_return.hist(figsize=(18,20))
plt.show()
# Check for normality of returns
is_normal(ind_return, level=0.01)
# Inspect normality of returns with a Q-Q plot per security
from scipy import stats
for column in ind_return:
    stats.probplot(ind_return[column], dist="norm", plot=plt)
    plt.show()
# Define functions for portfolio return and volatility
def portfolio_return(weights, returns):
"""
Computes the return on a portfolio from constituent returns and weights
"""
return weights.T @ returns
def portfolio_vol(weights, covmat):
"""
Computes the vol of a portfolio from a covariance matrix and constituent weights
"""
vol = (weights.T @ covmat @ weights)**0.5
return vol
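# Two-asset sanity check (hypothetical numbers): portfolio_vol should match the
# closed form sqrt(w1^2*s1^2 + w2^2*s2^2 + 2*w1*w2*s12).
w_2 = np.array([0.6, 0.4])
cov_2 = np.array([[0.04, 0.006],
                  [0.006, 0.09]])
print(portfolio_vol(w_2, cov_2))  # equals (0.6**2*0.04 + 0.4**2*0.09 + 2*0.6*0.4*0.006)**0.5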
# Program to return optimal weights for maximization of Sharpe ratio
from scipy.optimize import minimize
def msr(riskfree_rate, er, cov):
"""
Returns the weights of the portfolio that gives you the maximum sharpe ratio
given the riskfree rate, an expected returns vector and a covariance matrix
"""
n = er.shape[0] # Input for initial guess
init_guess = np.repeat(1/n, n) # Equal Weighting for init_guess
bounds = ((0.0, 1.0),) * n # Minimum and maximum individual allocation (No shorting constraint)
# Define the constraint: Sum of portfolio weights variable minus one must equal zero. ("Equality" Constraint)
weights_sum_to_1 = {'type': 'eq',
'fun': lambda weights: np.sum(weights) - 1
}
def neg_sharpe(weights, riskfree_rate, er, cov):
"""
Defining the objective function which we seek to minimize:
The investor seeks weights to maximise Sharpe ratio (Excess Ret/Vol), for given return vector, cov matrix and rfr.
Equivalent to minimizing the negative of this ratio.
"""
r = portfolio_return(weights, er)
vol = portfolio_vol(weights, cov)
return -(r - riskfree_rate)/vol
# Scipy optimize function takes obj fun; init guess, input args for obj fun, constraints on total weights, boundaries
# for individual weights, the optimization method
weights = minimize(neg_sharpe, init_guess,
args=(riskfree_rate, er, cov), method='SLSQP',
options={'disp': False},
constraints=(weights_sum_to_1,),
bounds=bounds)
return weights.x
msr(0.0,er,cov).round(4)
# Program to return optimal weights to minimize vol for a given target return
def minimize_vol(target_return, er, cov):
"""
Returns the optimal weights that achieve the target return
given a set of expected returns and a covariance matrix
"""
n = er.shape[0]
init_guess = np.repeat(1/n, n)
bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
# construct the constraints
weights_sum_to_1 = {'type': 'eq',
'fun': lambda weights: np.sum(weights) - 1
}
return_is_target = {'type': 'eq',
'args': (er,),
'fun': lambda weights, er: target_return - portfolio_return(weights,er)
}
weights = minimize(portfolio_vol, init_guess,
args=(cov,), method='SLSQP',
options={'disp': False},
constraints=(weights_sum_to_1,return_is_target),
bounds=bounds)
return weights.x
minimize_vol(0.05,er,cov).round(4)
# Weighting scheme returning the weights of the Global Minimum Variance portfolio
def gmv(cov):
"""
Returns the weights of the Global Minimum Volatility portfolio
given a covariance matrix
"""
n = cov.shape[0]
return msr(0, np.repeat(1, n), cov) #Exp. ret set to 1 for all securities
gmv(cov).round(4)
# Weighting scheme returning equal weighted portfolio.
def weight_ew(r):
"""
Returns the weights of the EW portfolio based on the asset returns "r" as a DataFrame
"""
n = len(r.columns)
ew = pd.Series(1/n, index=r.columns)
return ew
weight_ew(ind_return)
def optimal_weights(n_points, er, cov):
"""
Returns a list of weights that represent a grid of n_points on the efficient frontier given a range of
target returns (from the lowest expected return to the highest expected return)
"""
target_rs = np.linspace(er.min(), er.max(), n_points)
weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
return weights
def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
"""
Plots the multi-asset efficient frontier using the "optimal weights" function
"""
weights = optimal_weights(n_points, er, cov)
rets = [portfolio_return(w, er) for w in weights]
vols = [portfolio_vol(w, cov) for w in weights]
ef = pd.DataFrame({
"Returns": rets,
"Volatility": vols
})
ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
ax.set_title('Figure 1: Ex-Ante Efficient Frontier (June 2020)')
plt.xlabel('Volatility')
plt.ylabel('Returns')
if show_cml:
ax.set_xlim(left = 0)
# get MSR
w_msr = msr(riskfree_rate, er, cov)
r_msr = portfolio_return(w_msr, er)
vol_msr = portfolio_vol(w_msr, cov)
# add CML
cml_x = [vol_msr]
cml_y = [r_msr]
ax.plot(cml_x, cml_y, color='red', marker="*", linestyle='dashed', linewidth=2, markersize=18, label='msr')
plt.annotate("MSR", xy=(vol_msr, r_msr), ha='right', va='bottom', rotation=45)
if show_ew:
n = er.shape[0]
w_ew = np.repeat(1/n, n)
r_ew = portfolio_return(w_ew, er)
vol_ew = portfolio_vol(w_ew, cov)
# add EW
ax.plot([vol_ew], [r_ew], color='green', marker='o', markersize=10, label='ew')
plt.annotate("EW", xy=(vol_ew, r_ew), horizontalalignment='right', verticalalignment='bottom', rotation=45)
if show_gmv:
w_gmv = gmv(cov)
r_gmv = portfolio_return(w_gmv, er)
vol_gmv = portfolio_vol(w_gmv, cov)
# add GMV
ax.plot([vol_gmv], [r_gmv], color='goldenrod', marker="D", markersize=12, label='gmv')
plt.annotate("GMV", xy=(vol_gmv, r_gmv), horizontalalignment='right', verticalalignment='bottom', rotation=45)
return ax
# Display eff. frontier
plot_ef(100, er, covmat_ann, style='.-', legend=False, show_cml=True, riskfree_rate=rf, show_ew=True, show_gmv=True);
w_msr = msr(rf, er, covmat_ann)
r_msr = portfolio_return(w_msr, er)
vol_msr = portfolio_vol(w_msr, covmat_ann)
w_msr.round(4), r_msr, vol_msr
w_gmv = gmv(covmat_ann)
r_gmv = portfolio_return(w_gmv, er)
vol_gmv = portfolio_vol(w_gmv, covmat_ann)
w_gmv.round(4), r_gmv, vol_gmv
n = er.shape[0]
w_ew = np.repeat(1/n, n)
r_ew = portfolio_return(w_ew, er)
vol_ew = portfolio_vol(w_ew, covmat_ann)
w_ew, r_ew, vol_ew
def sample_cov(r, **kwargs):
"""
Returns the sample covariance of the supplied returns
"""
return r.cov()
def cc_cov(r, **kwargs):
"""
Estimates a covariance matrix by using the Elton/Gruber Constant Correlation model
"""
rhos = r.corr()
n = rhos.shape[0]
# this is a symmetric matrix with diagonals all 1 - so the mean correlation is ...
rho_bar = (rhos.values.sum()-n)/(n*(n-1))
ccor = np.full_like(rhos, rho_bar)
np.fill_diagonal(ccor, 1.)
sd = r.std()
return pd.DataFrame(ccor * np.outer(sd, sd), index=r.columns, columns=r.columns)
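# Sanity check (illustrative): the constant-correlation estimator only replaces the
# off-diagonal correlations, so its diagonal should match the sample covariance diagonal.
print(np.allclose(np.diag(cc_cov(ind_return)), np.diag(sample_cov(ind_return))))  # expect True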
def shrinkage_cov(r, delta=0.5, **kwargs):
"""
Covariance estimator that shrinks between the Sample Covariance and the Constant Correlation Estimators
"""
prior = cc_cov(r, **kwargs)
sample = sample_cov(r, **kwargs)
return delta*prior + (1-delta)*sample
shrink_cov = shrinkage_cov(ind_return, delta=0.5)
shrink_cov
shrink_cov_ann = shrink_cov*(ann_factor)
shrink_cov_ann
# Reconstruct eff. frontier with shrunken covar. matrix
plot_ef(100, er, shrink_cov_ann, style='.-', legend=False, show_cml=True, riskfree_rate=rf, show_ew=True, show_gmv=True);
w_msr_shrink = msr(rf, er, shrink_cov_ann)
r_msr_shrink = portfolio_return(w_msr_shrink, er)
vol_msr_shrink = portfolio_vol(w_msr_shrink, shrink_cov_ann)
w_msr_shrink.round(4), r_msr_shrink, vol_msr_shrink
w_gmv_shrink = gmv(shrink_cov_ann)
r_gmv_shrink = portfolio_return(w_gmv_shrink, er)
vol_gmv_shrink = portfolio_vol(w_gmv_shrink, shrink_cov_ann)
w_gmv_shrink.round(4), r_gmv_shrink, vol_gmv_shrink
def risk_contribution(w,cov):
"""
Compute the relative contributions to risk of the constituents of a portfolio, given a set of portfolio weights
and a covariance matrix
"""
total_portfolio_var = portfolio_vol(w,cov)**2
# Marginal contribution of each constituent to portfolio variance
marginal_contrib = cov@w
# Relative contribution of each constituent to portfolio variance (risk)
risk_contrib = np.multiply(marginal_contrib,w.T)/total_portfolio_var
return risk_contrib
# Relative Risk Contributions of equally weighted portfolio given sample covar. matrix (Should sum to 1)
RRC_ew = risk_contribution(weight_ew(ind_return), cov)
RRC_ew
# Check sum to 1
sum(RRC_ew)
# Visualize RRC EW Portfolio
risk_contribution(weight_ew(ind_return), cov).plot.bar(title="Relative (%) Risk Contributions of an EW portfolio");
# Visualize constituent weights of EW Portfolio (Should be equal)
weight_ew(ind_return).plot.bar(title="Asset Weights - EW portfolio");
# Portfolio vol of equally weighted portfolio given sample covar. matrix
Port_vol_ew = portfolio_vol(weight_ew(ind_return), cov)
Port_vol_ew
# Risk Contributions (Component Risk) of constituents of ew portfolio (should sum to portfolio vol)
RC_ew = RRC_ew * Port_vol_ew
RC_ew.plot.bar(title="Component Risk of an EW portfolio");
# Check
sum(RC_ew)
from scipy.optimize import minimize
def target_risk_contributions(target_risk, cov):
"""
Returns a portfolio with constituent security weights such
that their risk contributions to the portfolio are as close as possible to
the target_risk contributions for a given the covariance matrix.
"""
n = cov.shape[0]
init_guess = np.repeat(1/n, n)
bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples
# construct the constraints
weights_sum_to_1 = {'type': 'eq',
'fun': lambda weights: np.sum(weights) - 1
}
def msd_risk(weights, target_risk, cov):
"""
The objective function: Minimise the Sum of Squared Differences in the risk contributions to the portfolio
and the target_risk contributions via the asset weights decision variable
"""
w_contribs = risk_contribution(weights, cov)
return ((w_contribs-target_risk)**2).sum()
weights = minimize(msd_risk, init_guess,
args=(target_risk, cov), method='SLSQP',
options={'disp': False},
constraints=(weights_sum_to_1,),
bounds=bounds)
return weights.x
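# Illustrative toy example (hypothetical 2x2 covariance matrix): request a 70/30 risk
# budget and verify the achieved contributions with risk_contribution().
toy_cov = pd.DataFrame([[0.04, 0.01],
                        [0.01, 0.09]])
w_budget = target_risk_contributions(np.array([0.7, 0.3]), toy_cov)
print("weights           :", w_budget.round(4))
print("risk contributions:", risk_contribution(w_budget, toy_cov).round(4).values)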
def equal_risk_contributions(cov):
"""
Returns the weights of the portfolio that equalizes the risk contributions
of the constituents based on the given covariance matrix
"""
n = cov.shape[0]
return target_risk_contributions(target_risk=np.repeat(1/n,n), cov=cov)
def weight_erc(r, cov_estimator=sample_cov, **kwargs):
"""
Produces the weights of the ERC portfolio given a returns series and covariance matrix strucrure.
"""
est_cov = cov_estimator(r, **kwargs)
return equal_risk_contributions(est_cov)
# RRC of ERC portfolio (Should be equal and sum to 1)
RRC_erc = risk_contribution(equal_risk_contributions(cov), cov)
RRC_erc.plot.bar(title="Relative (%) Risk Contributions of an ERC portfolio");
# Portfolio composition of ERC strategy. (Numpy array)
weight_erc(ind_return, cov_estimator=sample_cov)
# Portfolio composition of ERC strategy. (DataFrame)
numpy_weight_erc = weight_erc(ind_return, cov_estimator=sample_cov)
df_weight_erc = pd.DataFrame(data=numpy_weight_erc, index=ind_return.columns, columns=["ERC Asset Allocation"])
df_weight_erc
df_weight_erc.plot.bar(title="Asset Weights - ERC portfolio", legend=False);
# Portfolio vol of ERC strategy
Port_vol_erc = portfolio_vol(weight_erc(ind_return), cov)
Port_vol_erc
# Risk Contribution ERC strategy (Should be equal and sum to portfolio vol)
RC_erc = RRC_erc * Port_vol_erc
RC_erc.plot.bar(title="Component Risk of an ERC portfolio");
# Check
RC_erc.sum()
# Lookback period
BL_per_beg_1 = Start
BL_per_end_1 = End
# Market inputs: rfr, exp. returns vector, sample covariance matrix
rf_1 = 0.00
ann_factor_1 = 12
er_1 = annualize_rets(ind_return[BL_per_beg_1:BL_per_end_1], ann_factor_1)
ev_1 = annualize_vol(ind_return[BL_per_beg_1:BL_per_end_1], ann_factor_1)
corr_1 = ind_return[BL_per_beg_1:BL_per_end_1].corr()
cov_1 = ind_return[BL_per_beg_1:BL_per_end_1].cov()
# Data for Views Vector, q
View_1 = 0.20
View_2 = 0.10
View_3 = 0.05
# Data for Pick Matrix, p
Long_1 = 'T'
Short_1 = 'JPM'
Long_2 = 'V'
Short_2 = 'GOOG'
Long_3 = 'UNH'
Short_3 = 'MA'
# Specify investable universe.
assets = list(ind_return.columns)
assets
# Correlation matrix over the lookback period
rho = corr_1
rho
# Calculate expected volatilities of securities
vols = pd.DataFrame(ev_1, columns=["Vols"])
vols
# Market weights (optimal assuming market equilibrium)
w_eq = ind_capweight.loc[BL_per_end_1]
w_eq
# Define prior covariance matrix (sample annualised covar matrix here)
sigma_prior = vols.dot(vols.T) * rho
sigma_prior
# Compute Equilibrium-implied returns vector and convert to series
def implied_returns(delta, sigma, w):
"""
Obtain the implied expected returns by reverse engineering the weights
Inputs:
delta: Risk Aversion Coefficient (scalar)
sigma: Variance-Covariance Matrix (N x N) as DataFrame
w: Market weights (N x 1) as Series
Returns an N x 1 vector of Returns as Series
"""
ir = delta * sigma.dot(w).squeeze() # to get a series from a 1-column dataframe
ir.name = 'Implied Returns'
return ir
# Compute Pi and compare:
pi = implied_returns(delta=2.5, sigma=sigma_prior, w=w_eq)
(pi*100).round(2)
# Populate views vector, Q: (X will outperform Y by Z%)
q = pd.Series([View_1]) # First view
# start with a single view and an empty Pick Matrix, to be overwritten with the specific pick(s) + view(s)
p = pd.DataFrame([0.]*len(assets), index=assets).T
q
p
# Pick 1 (use .loc to avoid chained-assignment pitfalls)
p.loc[0, Long_1] = +1.
p.loc[0, Short_1] = -1.
(p*100).round(1)
# Add second view (pd.concat replaces the removed Series/DataFrame .append)
view2 = pd.Series([View_2], index=[1])
q = pd.concat([q, view2])
pick2 = pd.DataFrame([0.]*len(assets), index=assets, columns=[1]).T
p = pd.concat([p, pick2])
p.loc[1, Long_2] = +1.
p.loc[1, Short_2] = -1.
np.round(p.T, 3)*100
# Add third view
view3 = pd.Series([View_3], index=[2])
q = pd.concat([q, view3])
pick3 = pd.DataFrame([0.]*len(assets), index=assets, columns=[2]).T
p = pd.concat([p, pick3])
p.loc[2, Long_3] = +1.
p.loc[2, Short_3] = -1.
np.round(p.T, 3)*100
# Calculate Omega as proportional to the variance of the prior
def proportional_prior(sigma, tau, p):
"""
Returns the He-Litterman simplified Omega
Inputs:
sigma: N x N Covariance Matrix as DataFrame
tau: a scalar
p: a K x N DataFrame linking Q and Assets
returns a P x P DataFrame, a Matrix representing Prior Uncertainties
"""
helit_omega = p.dot(tau * sigma).dot(p.T)
# Make a diag matrix from the diag elements of Omega
return pd.DataFrame(np.diag(np.diag(helit_omega.values)),index=p.index, columns=p.index)
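# Illustrative check: with the three views built above, the He-Litterman Omega is a
# 3 x 3 diagonal matrix whose entries scale with the prior variance of each view portfolio.
omega_demo = proportional_prior(sigma_prior, tau=0.05, p=p)
np.round(omega_demo, 6)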
# Program to compute the posterior expected returns based on the original Black-Litterman reference model
from numpy.linalg import inv
def bl(w_prior, sigma_prior, p, q,
omega=None,
delta=2.5, tau=.02):
"""
# Computes the posterior expected returns based on the original black litterman reference model
# W.prior must be an N x 1 vector of weights, a Series
# Sigma.prior is an N x N covariance matrix, a DataFrame
# P must be a K x N matrix linking Q and the Assets, a DataFrame
# Q must be an K x 1 vector of views, a Series
# Omega must be a K x K matrix a DataFrame, or None
# if Omega is None, we assume it is proportional to variance of the prior
# delta and tau are scalars
"""
if omega is None:
omega = proportional_prior(sigma_prior, tau, p)
# How many assets?
N = w_prior.shape[0]
# How many views?
K = q.shape[0]
# First, reverse-engineer the weights to get pi
pi = implied_returns(delta, sigma_prior, w_prior)
# Adjust (scale) Sigma by the uncertainty scaling factor
sigma_prior_scaled = tau * sigma_prior
# posterior estimate of the mean, use the "Master Formula"
# we use the versions that do not require
# Omega to be inverted (see previous section)
# this is easier to read if we use '@' for matrixmult instead of .dot()
# mu_bl = pi + sigma_prior_scaled @ p.T @ inv(p @ sigma_prior_scaled @ p.T + omega) @ (q - p @ pi)
mu_bl = pi + sigma_prior_scaled.dot(p.T).dot(inv(p.dot(sigma_prior_scaled).dot(p.T) + omega).dot(q - p.dot(pi).values))
# posterior estimate of uncertainty of mu.bl
#sigma_bl = sigma_prior + sigma_prior_scaled - sigma_prior_scaled @ p.T @ inv(p @ sigma_prior_scaled @ p.T + omega) @ p @ sigma_prior_scaled
sigma_bl = sigma_prior + sigma_prior_scaled - sigma_prior_scaled.dot(p.T).dot(inv(p.dot(sigma_prior_scaled).dot(p.T) + omega)).dot(p).dot(sigma_prior_scaled)
return (mu_bl, sigma_bl)
# Specify scalars
delta = 2.5
tau = 0.05
# Derive the Black Litterman Expected Returns
bl_mu, bl_sigma = bl(w_eq, sigma_prior, p, q, omega=None, delta=delta, tau= tau)
(bl_mu*100).round(2)
(bl_sigma*100).round(2)
# BL-implied Alpha : BL Exp Returns - Equilibrium Impl. Returns
((bl_mu*100) - (pi*100)).round(2)
# for convenience and readability, define the inverse of a dataframe
def inverse(d):
"""
Invert the dataframe by inverting the underlying matrix
"""
return pd.DataFrame(inv(d.values), index=d.columns, columns=d.index)
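# Quick check (illustrative): inverse() composed with the original matrix should
# recover the identity up to floating-point error.
print(np.allclose(inverse(sigma_prior).dot(sigma_prior), np.eye(len(sigma_prior))))  # expect True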
def w_msr(sigma, mu, scale=True):
"""
Optimal (Tangent/Max Sharpe Ratio) Portfolio weights
by using the Markowitz Optimization Procedure
Mu is the vector of Excess expected Returns
Sigma must be an N x N matrix as a DataFrame and Mu a column vector as a Series
    This implements page 188, Equation 5.2.28 of
    "The Econometrics of Financial Markets" by Campbell, Lo, and MacKinlay.
    """
w = inverse(sigma).dot(mu)
    if scale:
        w = w/sum(w)  # note: this rescaling assumes all weights are positive
return w
# Optimal BL portfolio weights
bl_port = w_msr(bl_sigma,bl_mu)
bl_port.plot(kind='bar')
eq_port = w_msr(sigma_prior,pi, scale=True)
eq_port.plot(kind='bar')
sum(bl_port), sum(eq_port)
def w_star(delta, sigma, mu):
    """Unconstrained mean-variance optimal weights for risk-aversion coefficient delta"""
    return (inverse(sigma).dot(mu))/delta
wstar = w_star(delta=delta, sigma=bl_sigma, mu=bl_mu)
# display w*
(wstar*100).round(4)
sum(wstar)
# Alternative BL optimal portfolio: rescaled MSR weights
alt_wstar = (w_msr(sigma=bl_sigma, mu=bl_mu,scale=True)*100).round(4)
alt_wstar
sum(alt_wstar)
# Transpose & Export for Backtesting purposes
df_alt_wstar = pd.DataFrame(alt_wstar, columns=[ind_return.index[-1]]).T
#df_alt_wstar.to_excel("BL_WEIGHTS4.5.xlsx", sheet_name=End)
df_alt_wstar
# Test: Market inputs should give market weights as output
w_eq_check = w_msr(delta*sigma_prior, pi, scale=False)
w_eq_check
np.round((w_eq - w_eq_check), 6)*100
# BL-implied Active Return: BL Exp Returns - Equilibrium Impl. Returns
Exp_Active_ret = ((bl_mu - pi)*100).round(2)
Exp_Active_ret.plot(kind='bar', title="BL-implied Active Return");
# Display the difference between Posterior (BL) and Prior (Equilibrium) weights
Active_weight = np.round(wstar - w_eq/(1+tau), 3)*100
Active_weight.plot(kind='bar', title="BL-implied Active Weight");
d_r = {'BL': (bl_mu*100), 'Equilibrium': (pi*100)}
plotdata = pd.DataFrame(data=d_r, index=assets)
plotdata.plot(kind="bar", figsize=(16,8))
plt.title("Expected Returns");
d_w = {'BL': (wstar*100), 'Equilibrium': (w_eq*100)}
plotdata = pd.DataFrame(data=d_w, index=assets)
plotdata.plot(kind="bar", figsize=(16,8))
plt.title("Asset Weighting");
# Create Security Weighting Scheme for Black-Litterman Portfolios
ind_blcap = pd.read_excel("mktcap_2008_2020.xlsx", sheet_name='BL_WTS', index_col=0, parse_dates=True)
ind_blcap = ind_blcap.loc[Start:End]
ind_blcap.index = ind_blcap.index.to_period('M')
total_blcap = ind_blcap.sum(axis="columns")
pd.set_option('display.max_rows', None)
ind_blweight = ind_blcap.divide(total_blcap, axis="rows")
ind_blweight.tail()
ind_return.tail()
total_bl_return = (ind_blweight * ind_return).sum(axis="columns")
total_bl_return.tail()
total_bl_index = drawdown(total_bl_return).Wealth
total_bl_index.plot(title="BL Weighted Index");
def backtest_ws(r, estimation_window=60, weighting=weight_ew, verbose=False, **kwargs):
"""
Backtests a given weighting scheme, given some parameters.
r : asset returns series
estimation_window: the lookback period to use to estimate parameters
weighting: the weighting scheme to use, must be a function that takes "r", and a variable number of keyword-value arguments
"""
# No of periods in returns series
n_periods = r.shape[0]
    # rolling (start, end) index pairs, one per estimation window
    windows = [(start, start+estimation_window) for start in range(n_periods-estimation_window)]
weights = [weighting(r.iloc[win[0]:win[1]], **kwargs) for win in windows]
# convert List of weights to DataFrame
weights = pd.DataFrame(weights, index=r.iloc[estimation_window:].index, columns=r.columns)
    returns = (weights * r).sum(axis="columns", min_count=1)  # min_count=1 yields NaN if all inputs are NaN
return returns
def weight_ew(r, cap_weights=None, max_cw_mult=None, microcap_threshold=None, **kwargs):
"""
Returns the weights of the EW portfolio based on the asset returns "r" as a DataFrame
If supplied a set of capweights and a capweight tether, it is applied and reweighted
"""
n = len(r.columns)
ew = pd.Series(1/n, index=r.columns)
if cap_weights is not None:
cw = cap_weights.loc[r.index[0]] # starting cap weight
## exclude microcaps
if microcap_threshold is not None and microcap_threshold > 0:
microcap = cw < microcap_threshold
ew[microcap] = 0
ew = ew/ew.sum()
#limit weight to a multiple of capweight
if max_cw_mult is not None and max_cw_mult > 0:
ew = np.minimum(ew, cw*max_cw_mult)
ew = ew/ew.sum() #reweight
return ew
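# Illustrative call (the threshold and multiplier values are hypothetical): equal-weight
# the first estimation window, zeroing out names below 0.5% of market cap and capping
# each weight at 5x its cap weight.
weight_ew(ind_return.iloc[:12], cap_weights=ind_capweight,
          max_cw_mult=5, microcap_threshold=0.005)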
def weight_cw(r, cap_weights, **kwargs):
"""
Returns the weights of the CW portfolio based on the time series of capweights
"""
    w = cap_weights.loc[r.index[-1]]  # cap weights at the last date of the estimation window
return w/w.sum()
def weight_gmv(r, cov_estimator=sample_cov, **kwargs):
"""
Produces the weights of the GMV portfolio given a covariance matrix of the returns
"""
est_cov = cov_estimator(r, **kwargs)
return gmv(est_cov)
def weight_msr(r, cov_estimator=sample_cov, **kwargs):
    """
    Produces the weights of the MSR portfolio given a returns series and a covariance matrix structure
    """
    est_cov = cov_estimator(r, **kwargs)
    exp_ret = annualize_rets(r, 12)  # annualize_rets takes no extra kwargs
    return msr(0, exp_ret, est_cov)
# Specify Estimation Window
estimation_window=12
# Equal-Weighted Returns
ewr = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_ew)
# Cap-Weighted Returns
cwr = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_cw, cap_weights=ind_capweight)
# BL Returns
blr = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_cw, cap_weights=ind_blweight)
# MSR Returns (sample cov)
MSRr_sample = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_msr, cov_estimator=sample_cov)
# MSR Returns (shrink cov)
MSRr_shrink = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_msr, cov_estimator=shrinkage_cov)
# GMV Returns (sample cov)
GMVr_sample = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_gmv, cov_estimator=sample_cov)
# GMV Returns (shrink cov)
GMVr_shrink = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_gmv, cov_estimator=shrinkage_cov)
# ERC Returns (sample cov)
ERCr_sample = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_erc, cov_estimator=sample_cov)
# ERC Returns (shrink cov)
ERCr_shrink = backtest_ws(ind_return, estimation_window=estimation_window, weighting=weight_erc, cov_estimator=shrinkage_cov)
# Collect Out-of-Sample Backtest Returns in DataFrame
btr_outsample = pd.DataFrame({
"Eq. Weighted": ewr['2017-07':],
"Cap Weighted": cwr['2017-07':],
"Max SR-Sample Cov": MSRr_sample['2017-07':],
"Max SR-Shrunk Cov": MSRr_shrink['2017-07':],
"Min Var Sample Cov": GMVr_sample['2017-07':],
"Min Var Shrunk Cov": GMVr_shrink['2017-07':],
"Risk Parity": ERCr_sample['2017-07':],
"Black Litterman": total_bl_return['2017-07':]
})
# View DataFrame
btr_outsample
# Compute Cumulative Return
cum_ret_outsample = (1+btr_outsample).cumprod()
cum_ret_outsample
# Plot Compounded Return
(1+btr_outsample).cumprod().plot(figsize=(16,12), title="Strategies Cumulative Return");
def summary_stats(r, riskfree_rate=rf):
    """
    Returns a DataFrame of aggregated summary stats for the returns in the columns of r.
    Each user-defined statistic above is applied column by column.
    """
ann_r = r.aggregate(annualize_rets, periods_per_year=12)
ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
skew = r.aggregate(skewness)
kurt = r.aggregate(kurtosis)
ann_semi_dev = r.aggregate(semideviation) * math.sqrt(ann_factor)
cf_gvar5 = r.aggregate(var_gaussian, modified=False)
cf_var5 = r.aggregate(var_gaussian, modified=True)
hist_var5 = r.aggregate(var_historic)
hist_cvar5 = r.aggregate(cvar_historic)
    ann_sortino = ann_r/ann_semi_dev
    rovar_cvar = ann_r/hist_cvar5
    rovar_cfvar = ann_r/cf_var5
    radd = ann_r/-dd
    return pd.DataFrame({
        "Annualized Return": ann_r,
        "Annualized Volatility": ann_vol,
        "Ann. Semi-Dev.": ann_semi_dev,
        "Skewness": skew,
        "Kurtosis": kurt,
        "Gaussian VaR (5%)": cf_gvar5,
        "Modified VaR (5%)": cf_var5,
        "Historic VaR (5%)": hist_var5,
        "Historic CVaR (5%)": hist_cvar5,
        "Max Drawdown": dd,
        "Sharpe Ratio": ann_sr,
        "Sortino Ratio": ann_sortino,
        "Return/Historic CVaR": rovar_cvar,
        "Return/Modified VaR": rovar_cfvar,
        "Calmar Ratio": radd,
    })
# Display Results
Perf_summary_samp = summary_stats(btr_outsample.dropna())
Perf_summary_samp.style.format(precision=4).set_caption("Performance Summary").set_table_styles([{'selector': 'caption',
                                   'props': [('color', 'red'),
                                             ('text-align', 'center'),
                                             ('font-weight', 'bold'),
                                             ('font-size', '22px')]}])