功用:
主程序文件,负责读取数据、运行BSM和Heston模型、计算误差率、并进行可视化展示。
依赖关系:
依赖BSM_Model和Heston_Model类来计算期权价格。 依赖pandas和matplotlib库进行数据处理和可视化。
重要部分:
数据读取和预处理。 运行BSM模型和Heston模型,计算误差率。 可视化误差率随时间的变化和两个模型的平均误差率比较。
功用:
利用模拟退火算法寻找Heston模型的最优参数,使市场价格和模型价格的均方误差最小化。
依赖关系:
依赖Heston_Model类来计算期权价格。 依赖pandas库进行数据处理。
重要部分:
random_range函数:在一定范围内随机调整参数。 NG类:实现模拟退火算法的核心逻辑。 NGHeston类:继承自NG类,专门用于Heston模型参数优化。 SV_SA类:封装了模拟退火算法的具体实现。 getBestPara函数:读取数据并调用模拟退火算法寻找最优参数。
功用:
定义Heston模型类,用于计算期权价格。
依赖关系:
依赖numpy和scipy.integrate库进行数学计算。
重要部分:
Heston_Model类:包含初始化方法和计算期权价格的核心方法。 characteristic_function方法:计算特征函数。 integral_function方法:计算积分部分。 P_Value方法:计算p值。 Call_Value方法:计算看涨期权价格。
功用:
定义BSM模型类,用于计算期权价格。
依赖关系:
依赖scipy.stats和numpy库进行数学计算。
重要部分:
BSM_Model类:包含初始化方法和计算期权价格的核心方法。 d1和d2方法:计算中间变量d1和d2。 price方法:计算看涨或看跌期权价格。 互相的依赖关系 main.py依赖BSM_Model.py和Heston_Model.py来计算期权价格。 bestpara.py依赖Heston_Model.py来计算期权价格,并通过模拟退火算法优化Heston模型的参数。
BSM模型仅需要期权本身参数,可直接调用BSM_Model.py
Heston模型除了需要期权本身参数外,还需要拟合参数,该拟合参数由bestpara.py得到。先运行bestpara.py得到较优数据(即当前历史最优解)后,再运行main.py进行测试和绘图。
关于数据:16年至20年上证50ETF期权数据。在这里使用了2020-6-12至2020-9-12的数据用于训练拟合参数;2020-9-12至2020-12-12的数据为测试数据。
关于训练拟合参数:论文在1w多次循环后才得到结果,我这边跑了160个循环,循环越多理论上效果更好。
关于可视化绘图:
- BSM模型与Heston模型的平均误差率随时间变化图(折线图)
- BSM模型与Heston模型的总平均误差率比较(柱状图)
- 定义辅助函数:
random_range(x, a, b)
:对输入的x
增加一个随机变动量,并确保其在[a, b]
范围内。
- 定义模拟退火算法类
NG
:__init__(self, func, x0)
:初始化算法的参数,包括初始解x0
、目标函数func
、温度参数等。T_change(self)
:温度下降函数。save_xy(self)
和save_best_xy(self)
:保存当前解和最优解。__p_delta(self, alpha)
和__find_solver(self, func, f0)
:计算接受新解的概率和用二分法寻找方程解。find_alpha(self)
:寻找调节概率因子alpha
。get_x_new(self)
:生成新的解。judge(self)
:判断是否接受新的解。get_history_best_xy(self)
:获取历史最优解。plot_best(self)
:绘制最优值变化过程。count_times_delta_smaller(self)
:统计新旧函数值之差的绝对值连续小于某值的次数。condition_end(self)
:判断是否满足终止条件。run_T(self)
:在某一特定温度下进行循环。accept_best_xf(self)
:在每个温度下的循环结束时,有一定概率将当前接受的新解替换为历史最优解。run(self)
:运行模拟退火算法。
- 定义继承自
NG
的NGHeston
类:__init__(self, func, x0)
:初始化参数,设置特定的温度参数。get_x_new(self)
:生成新的解,并对 Heston 模型的参数进行范围限制。
- 定义
SV_SA
类:__init__(self, data, v0, kappa, theta, sigma, rho)
:初始化数据和初始参数。error_mean_percent(self, init_params)
:计算 Heston 模型期权定价的百分比误差均值。error_mean(self, init_params)
:计算 Heston 模型期权定价的均方误差。test_error_mean(self, multiple_parmas)
:计算多组初始参数的均方误差。test_option_price(self, multiple_parmas)
:计算多组期权数据和初始参数的期权价格。sa(self)
:对均方误差函数用模拟退火算法计算最优值。
- 定义主函数
getBestPara()
:- 读取期权数据,选择指定日期和类型的数据。
- 建立
SV_SA
类并开始训练模型,寻找最优解。 - 返回最优解。
在该文件中涉及到一些复杂函数。
def run(self):
while self.T > self.T_min:
self.run_T() # 循环在该温度下的求解
self.xf_best_T[self.T] = [
self.get_history_best_xy()
] # 记录在每一个温度下的最优解
self.T_change() # 温度继续下降
self.accept_best_xf() # 当每个温度下的循环结束时,有一定概率将当前接受的新解替换为历史最优解
if self.condition_end() == True: # 如果满足终止条件,终止该温度循环
break
run
函数是模拟退火算法的核心循环。它通过不断降低温度来寻找最优解。具体步骤如下:
- 温度循环:当当前温度
T
大于最小温度T_min
时,进入循环。 - 在当前温度下求解:调用
run_T
函数,在当前温度下进行求解。 - 记录最优解:将当前温度下的最优解记录在
xf_best_T
字典中。 - 温度下降:调用
T_change
函数,降低温度。 - 接受最优解:调用
accept_best_xf
函数,有一定概率将当前接受的新解替换为历史最优解。 - 终止条件:如果满足终止条件(调用
condition_end
函数),则跳出循环。
def run_T(self):
for time_ in range(self.times_max):
self.time_ = time_
self.x_new = self.get_x_new() # 获得新解
self.f_new = self.func(self.x_new) # 获得新的函数值
self.save_xy() # 将新解和函数值记录下来
self.delta = self.f_new - self.f # 计算函数值的变化值
self.judge() # 判断是否接受新解
self.times_cycle += 1 # 统计循环次数
self.delta_best = np.abs(
self.f - self.f_last
) # 上次函数值与这次函数值的差值绝对值
self.count_times_delta_smaller() # 统计新旧函数值之差的绝对值连续小于此值的次数
if self.condition_end() == True: # 如果满足终止条件,终止该温度循环
print(
"满足终止条件:接受新解后的函数值变化连续小于{}达到次数".format(
self.delta_min
)
)
break
print(
"当前历史最优解{}:{}".format(self.f_best, self.x_best)
) # 展示当前最优值
print("当前接受的新解{}:{}".format(self.f, self.x)) # 展示当前接受的新解
print("当前新解{}:{}".format(self.f_new, self.x_new)) # 展示当前新产生的解
print("当前温度为{}".format(self.T)) # 展示当前温度
run_T
函数在特定温度下进行循环求解。具体步骤如下:
- 循环次数:在每个温度下,循环
times_max
次。 - 获得新解:调用
get_x_new
函数,生成新的解x_new
。 - 计算新函数值:使用目标函数
func
计算新的函数值f_new
。 - 记录新解和函数值:调用
save_xy
函数,将新解和函数值记录下来。 - 计算函数值变化:计算新旧函数值的变化
delta
。 - 判断是否接受新解:调用
judge
函数,判断是否接受新解。 - 统计循环次数:增加循环次数
times_cycle
。 - 计算函数值差值:计算上次函数值与这次函数值的差值绝对值
delta_best
。 - 统计差值次数:调用
count_times_delta_smaller
函数,统计新旧函数值之差的绝对值连续小于某值的次数。 - 终止条件:如果满足终止条件(调用
condition_end
函数),则跳出循环。 - 打印信息:打印当前最优解、接受的新解、新产生的解和当前温度。
def judge(self):
if self.delta < 0: # 如果函数值变动幅度小于0,则接受新解
self.x = self.x_new
self.f_last = self.f # 在最优解函数值更新之前将其记录下来
self.f = self.f_new
self.save_best_xy() # 记录每次循环接受的新解
self.get_history_best_xy() # 更新历史最优解
self.times_stay = 0 # 由于未接受新解,将连续未接受新解的次数归零
print(
"由于函数值变小新接受解{}:{}".format(self.f, self.x)
) # 展示当前接受的新解
else:
p = np.exp(-self.delta / (self.T * self.alpha)) # 接受新解的概率
p_ = np.random.random() # 判断标准概率
if p > p_: # 如果概率足够大,接受新解
self.x = self.x_new
self.f_last = self.f # 在接受的新解更新之前将其记录下来
self.f = self.f_new
self.save_best_xy() # 记录每次循环接受的新解
self.get_history_best_xy() # 更新历史最优解
print(
"由于概率{}大于{},新接受解{}:{}".format(p, p_, self.f, self.x)
) # 展示当前接受的新解
self.times_p += 1 # 统计因为概率而接受新解的次数
self.times_stay = 0 # 由于未接受新解,将连续未接受新解的次数归零
else:
if self.time_ == 0:
self.f_last = self.f # 在接受的新解更新之前将其记录下来
self.times_stay += 1 # 连续接受新解次数加1
print("连续未接受新解{}次".format(self.times_stay))
judge
函数用于判断是否接受新的解 x_new
。具体步骤如下:
- 函数值变小:如果新函数值
f_new
小于当前函数值f
,则直接接受新解。- 更新当前解
x
和函数值f
。 - 记录最优解。
- 更新历史最优解。
- 将连续未接受新解的次数
times_stay
归零。 - 打印接受的新解信息。
- 更新当前解
- 函数值变大:如果新函数值
f_new
大于当前函数值f
,则根据概率判断是否接受新解。- 计算接受新解的概率
p
。 - 生成一个随机概率
p_
。 - 如果
p
大于p_
,则接受新解。- 更新当前解
x
和函数值f
。 - 记录最优解。
- 更新历史最优解。
- 打印接受的新解信息。
- 统计因为概率而接受新解的次数
times_p
。 - 将连续未接受新解的次数
times_stay
归零。
- 更新当前解
- 如果
p
小于p_
,则不接受新解。- 如果是第一次循环,将当前函数值
f
记录为f_last
。 - 增加连续未接受新解的次数
times_stay
。 - 打印未接受新解的次数。
- 如果是第一次循环,将当前函数值
- 计算接受新解的概率
def get_history_best_xy(self):
x_array = list(
np.array(list(self.xf_best_all.values()),dtype=object)[:, 0]
) # 从历史所有的最优x和f中获得所有的x
f_array = list(
np.array(list(self.xf_best_all.values()),dtype=object)[:, 1]
) # 从历史所有的最优x和f中获得所有的f
self.f_best = min(f_array) # 从每阶段最优的f中获得最优的f
self.x_best = x_array[f_array.index(self.f_best)] # 利用最优f反推最优x
return self.x_best, self.f_best
get_history_best_xy
函数用于获取历史最优解。具体步骤如下:
- 获取所有历史最优解:从
xf_best_all
字典中提取所有的x
和f
。 - 找到最优函数值:从所有的
f
中找到最小的f_best
。 - 找到对应的最优解:根据最优函数值
f_best
找到对应的x_best
。 - 返回最优解:返回最优解
x_best
和最优函数值f_best
。
** 功用:**
可用于用户自己改造,以便于用户从数据库(Tushare)等获得自己所需的数据以进行研究或应用。
运行 main.py 可能需要一些时间,因为受计算机硬件条件的影响,模拟退火算法非常耗时。此外,模拟退火算法通常需要较多的迭代次数,但由于计算机硬件条件的限制,我迭代的次数较少,这可能会对结果产生一些影响。