算法原理
1、利用股票日K数据的MA与收盘价的对比,确定股票的上升与下降趋势时间段;
2、在时间段内求极值点
代码实现
import talib
import pandas as pd
def get_peak_and_buttom(bar_df:pd.Series, ma_length:int)->(list, list):
"""获取序列的波峰与波谷
Args:
bar_df (Series): 股票收盘价序列
ma_length(int):MA长度
Returns:
_type_: _description_
"""
if len(bar_df) <=ma_length:
return None, None
ma = talib.MA(bar_df, ma_length)
compare_flag = ma > bar_df
df_list = []
start_idx = 0
prev_flag = compare_flag.iloc[0]
def add_to_list(df, flag):
if len(df_list) > 0 and flag == df_list[-1]['flag']:
tmp_df = df_list.pop()
new_df = pd.concat([tmp_df['df'], df], axis=0)
df_list.append({'flag':flag, 'df':new_df})
else:
df_list.append({"flag":flag, "df":df})
# 利用MA数据与收盘价的对比计算上升与下降区间
for i,flag in enumerate(compare_flag):
if flag != prev_flag:
end_idx = i
# 屏蔽个别异常数据
if end_idx - start_idx <= 1:
prev_flag = not prev_flag
continue
t_df = bar_df.iloc[start_idx:end_idx]
add_to_list(t_df, prev_flag)
start_idx = i
prev_flag = flag
end_idx = len(compare_flag)
t_df = bar_df.iloc[start_idx:]
add_to_list(t_df, prev_flag)
# 计算各个区间内的极值点
high_idx = [t_df["df"].idxmax() for t_df in df_list if not t_df["flag"]]
low_idx = [t_df["df"].idxmin() for t_df in df_list if t_df["flag"]]
return high_idx, low_idx
测试结果
由图可以看出,MA的长度越长,就会屏蔽中间的次级波动