-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrate_index.py
332 lines (276 loc) · 13.6 KB
/
rate_index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#-*- coding:utf-8 -*-
from abc import abstractmethod
from repoze.lru import lru_cache
from misc import *
from utilities import *
from rate_options import BlackLognormalModel
from curve import DiscountCurve
import numpy as np
from scipy.stats import norm
from scipy.optimize import brentq as solver
import copy
class IndexFactory(object):
def __init__(self):
self.fixing_pool = {}
def set_fixing(self, date, rate):
if isinstance(date, Date):
self.fixing_pool[date] = rate
else:
self.fixing_pool[Date(date)] = rate
def get_fixing(self, date):
try:
return self.fixing_pool[date]
except:
return None
def remove_fixing(self, date):
self.fixing_pool.pop(date)
@abstractmethod
def create(self, period):
pass
class Index(object):
def __init__(self, fixing):
self.fixing = fixing
@abstractmethod
def forward(self, proj):
pass
@print_self(4, 8)
def __repr__(self):
pass
class FixedIndexFactory(IndexFactory): # Simply follows accrual schedule
def __init__(self, fixrate=0.0):
super(FixedIndexFactory, self).__init__()
self.fixrate = fixrate
def create(self, period):
fixing = self.get_fixing(period.fixdate) # for predefined stepping fixed rates
if fixing is None:
fixing = self.fixrate
return self.Index(fixing)
class Index(IndexFactory.Index):
def forward(self, proj): # proj is a dummy for fixed rate index
return self.fixing # fixed rate
class FloatIndexFactory(IndexFactory):
def create(self, period): # simply follows payment schedule
fixing = self.get_fixing(period.fixdate)
return self.Index(fixing, period.fixdate, period.accr_end)
class Index(IndexFactory.Index):
def __init__(self, fixing, fixdate, effdate, matdate, coverage):
super(FloatIndexFactory.Index, self).__init__(fixing)
self.fixdate = fixdate
self.effdate = effdate
self.matdate = matdate
self.coverage = coverage
def forward(self, proj):
if self.fixing is not None:
return self.fixing
elif self.effdate.t >= proj.t0:
return (proj(self.effdate.t) / proj(self.matdate.t) - 1) / self.coverage
else:
raise BaseException('missing fixing on %s ...' % self.fixdate)
class FwdCurveIndexFactory(IndexFactory):
def create(self, period): # simply follows payment schedule
fixing = self.get_fixing(period.fixdate)
return self.Index(fixing, period.fixdate, period.accr_start, period.accr_end, period.accr_cov)
class Index(IndexFactory.Index):
def __init__(self, fixing, fixdate, matdate):
super(FwdCurveIndexFactory.Index, self).__init__(fixing)
self.fixdate = fixdate
self.matdate = matdate
def forward(self, fwdcurve):
if self.fixing is not None:
return self.fixing
elif self.effdate.t >= fwdcurve.t0:
return fwdcurve(self.matdate.t)
else:
raise BaseException('missing fixing on %s ...' % self.fixdate)
class LiborIndexFactory(FloatIndexFactory):
def __init__(self, tenor, calendar, daycount, spotlag='2D', dayroll=DayRoll.ModifiedFollowing, endmonth=True):
super(LiborIndexFactory, self).__init__()
self.tenor = Period(tenor)
self.daycount = daycount
spotlag = Period(spotlag)
def roll_date(startdate, period):
if isinstance(period, ql.Period):
date = calendar.advance(startdate, period, dayroll, endmonth)
elif period == 'spot':
date = calendar.advance(startdate, spotlag, dayroll)
elif period == '-spot':
date = calendar.advance(startdate, -spotlag, dayroll)
return Date.convert(date)
self.roll_date = roll_date
def create(self, period=None, fixdate=None):
if period is not None:
fixdate = period.fixdate
fixing = self.get_fixing(fixdate)
effdate = self.roll_date(fixdate, 'spot')
matdate = self.roll_date(effdate, self.tenor) # normally matdate = paydate for ibor
coverage = self.daycount.yearFraction(effdate, matdate)
return self.Index(fixing, fixdate, effdate, matdate, coverage)
class RangeIndexFactory(FloatIndexFactory): ##### need extra work for aged deal...
def __init__(self,
hw=None,
baserate=None,
index_fac=None, # range leg rate index factory, e.g. Libor3M
rng_def=None, # defines the rate range for range swap, e.g. ('outside',0.01,0.02)
num_days='calendar', # count of days in a period, e.g. 'calendar' for calendar days, 'business' for business days
two_ends=(None,None),# defines the dates in a period to calculate the range theta, e.g. slice(1,-1) excludes start and end dates
lockout=('skipped',None), # lock-out period, e.g. ('crystallized', -5) or ('skipped', -3)
digi_gap=1e-5, # strike gap of digital spread, e.g. 1bp
**kwargs):
super(RangeIndexFactory, self).__init__(**kwargs)
self.baserate = baserate
self.hw = hw
assert isinstance(index_fac, LiborIndexFactory)
self.index_fac = index_fac
if len(rng_def) == 2:
self.rng_type, self.rng_rate = rng_def
else:
self.rng_type, self.rng_low, self.rng_high = rng_def
self.num_days = num_days
self.two_ends = slice(*two_ends)
self.lk_type, self.lk_days = lockout
self.digi_gap = digi_gap
def __range_libors(self, effdate, matdate):
dates = [effdate + i for i in range(matdate - effdate + 1)] # all calendar days
if self.num_days == 'business': # for business days
dates = [day for day in dates if self.index_fac.calendar.isBusinessDay(day)]
dates = dates[self.two_ends] # include or exclude the end dates of a period
libors = tuple(self.index_fac.create(fixdate=date) for date in dates)
if PRINT and self.lk_days is not None:
print(" lockout %s --> [%s, %s]" % (dates[self.lk_days], dates[self.lk_days + 1], dates[-1]))
return libors
@lru_cache(maxsize=100)#@cache_function(1)
def __parse_libors(self, libors): # paytime is dummy argument for hashing
ts = HashableArray([l.effdate.t for l in libors])
te = HashableArray([l.matdate.t for l in libors])
cov = np.array([l.coverage for l in libors])
return ts, te, cov
@lru_cache(maxsize=100)
def __xi(self, t0, ts, te): # paytime is dummy argument for hashing; = xi2(t0,ts,ts,te), and xi2(t0,ts,ts,ts) = 0; avoid zero
return np.array([self.hw.vol.xi2(t0,a,a,b) ** 0.5 for a, b in zip(ts, te)]) + 1e-20 # avoid zero
def theta_fn(self, proj, libors, paydate): # theta of one coupon period
ts, te, cov = self.__parse_libors(libors)
eta = (te - paydate.t) / (te - ts)
p = 1.0 + np.array([l.forward(proj) for l in libors]) * cov #p = proj(ts) / proj(te) # = (1 + libor * cov)
xi = self.__xi(proj.t0, ts, te) # proj.t0 is spotdate of curve
def capfloor(k, s): # forward/undiscounted caplet(s=1)/floorlet(s=-1) price
zstar = np.log((1 + k) / p) / xi - xi / 2
return s * (p * norm.cdf(-zstar * s) - (1 + k) * norm.cdf(-(zstar + xi) * s))
def digital(k, s): # s=1 ==> 1 for above k; s=-1 ==> 1 for below k
km = cov * (k - self.digi_gap * s)
kp = cov * (k + self.digi_gap * s)
return (1 + eta * kp) * capfloor(km,s) - (1 + eta * km) * capfloor(kp,s)
denom = 2 * self.digi_gap * cov #* (1 + eta * (p - 1)) # p - 1 = libor * cov
if self.rng_type == 'between': # inside; == (digital(rng_high,-1) - digital(rng_low,-1)) / denom
ratio = (digital(self.rng_low,1) - digital(self.rng_high,1)) / denom
elif self.rng_type == 'outside': # outside
ratio = (digital(self.rng_low,-1) + digital(self.rng_high,1)) / denom
elif self.rng_type == 'above': # above rate
ratio = digital(self.rng_rate,1) / denom
elif self.rng_type == 'below': # below rate
ratio = digital(self.rng_rate,-1) / denom
if self.lk_type == 'skipped':
ratio = ratio[:self.lk_days]
elif self.lk_type == 'crystallized':
ratio[self.lk_days:] = ratio[self.lk_days]
return np.mean(ratio)
def create(self, period): # accrual startdate and enddate
libors = self.__range_libors(period.accr_start, period.accr_end)
return self.Index(self.baserate, self.theta_fn, libors, period.paydate)
class Index(IndexFactory.Index):
def __init__(self, baserate, theta_fn, libors, paydate):
self.baserate = baserate
self.theta_fn = theta_fn
self.libors = libors
self.paydate = paydate
def forward(self, proj):
return self.baserate * self.theta_fn(proj, self.libors, self.paydate)
class CMSIndexFactory(FixedIndexFactory):
__shift_clamp = 1.0
class Option:
Cap = 1
Floor = -1
def __init__(self, today, proj, disc, swpnsurf,
index_fac, # cms index swap factory
kappa=0.0,
nswpns=50):
super(CMSIndexFactory, self).__init__()
self.today = today
self.proj = proj
self.disc = disc
self.index_fac = index_fac
self.swpnsurf = swpnsurf
self.nswpns = nswpns
def __b(t, T):
if kappa > 5e-5: # this is about the optimal cutoff value for kappa
return (1 - np.exp(-kappa * (T - t))) / kappa
else:
return T - t
self.__b = __b
def __strikes_and_weights(self, opt, swaption, strike, period):
if isinstance(swaption, BlackLognormalModel): # lognormal model
strike_at_maxvega = swaption.forward * np.exp(swaption.stdev ** 2 / 2) # at which max vega occurs
lower_bound = 1e-10 # rate must be positive
else: # normal model
strike_at_maxvega = swaption.forward # at which max vega occurs
lower_bound = -1e2 # rate can be negative
cutoff_vega = swaption.vega(strike_at_maxvega) / 100.0 # cutoff at vega that is 100 times smaller
find_strike = lambda k: swaption.vega(k) - cutoff_vega
if opt == self.Option.Cap:
cutoff_rate = solver(find_strike, strike_at_maxvega, 1e2) # to high end
else: # == self.Option.Floor
cutoff_rate = solver(find_strike, lower_bound, strike_at_maxvega) # to low end
def curves(t, h): # bumped by h, new curve with (t, T, h)
def bump(curve):
def newcurve(T): # T can be float or Numpy.Array
return curve(T) * np.exp(-h * self.__b(t, T)) # curve(t) = 1.0
return DiscountCurve.from_fn(t, newcurve)
return bump(self.proj), bump(self.disc)
def rate_to_h(swaprate):
def find_h(h):
newproj, newdisc = curves(period.fixdate.t, h)
return swaption.underlier.pleg_parrate(newproj, newdisc) - swaprate
try: # clamped to avoid double precision overflow
return solver(find_h, -self.__shift_clamp, self.__shift_clamp)
except ValueError:
return self.__shift_clamp # only positive h causes trouble
h_stt = rate_to_h(strike)
h_end = rate_to_h(cutoff_rate) # clamped if too large
n = self.nswpns
H = np.linspace(h_stt, h_end, n + 1)
K = np.zeros_like(H)
G = np.zeros_like(H)
for i, h in enumerate(H):
newproj, newdisc = curves(period.fixdate.t, h)
K[i] = swaption.underlier.pleg_parrate(newproj, newdisc)
G[i] = period.accr_cov * newdisc(period.paydate.t) / swaption.underlier.pleg.get_annuity(newdisc)
GK = G * (K - K[0]) # [g * (k - K[0]) for g, k in zip(G, K)]
W = np.zeros(n)
for i in range(1, n):
W[i - 1] = (GK[i] - W[:i].dot(K[i] - K[:i])) / (K[i] - K[i - 1])
return K[:-1], W # swaption at K[n] is not used
def cms_rate(self, period, strike=None):
cms_index_swap = self.index_fac.create(tradedate=period.fixdate)
swaption = self.swpnsurf.get_swaption(cms_index_swap)
if period.fixdate == self.today:
rate = swaption.forward
else:
if strike is None:
strike = swaption.forward
def weighted_sum(opt):
K, W = self.__strikes_and_weights(opt, swaption, strike, period)
return sum(w * swaption.value(opt, k) for k, w in zip(K, W))
cap = weighted_sum(self.Option.Cap)
floor = weighted_sum(self.Option.Floor)
rate = strike + (cap - floor) / (period.accr_cov * self.disc(period.paydate.t))
if PRINT: print('cms, par, covexity: %s\t%s\t%s' % (rate, swaption.forward, rate - swaption.forward))
return rate
def create(self, period):
fixing = self.get_fixing(period.fixdate)
if fixing is not None:
return self.Index(fixing)
elif period.fixdate >= self.today:
return self.Index(self.cms_rate(period))
else:
raise BaseException('missing fixing on %s ...' % period.fixdate)
if __name__ == '__main__':
pass