1、摘要
本文主要讲解:PSO粒子群优化-CNN-优化神经网络神经元个数dropout和batch_size,目标为对沪深300价格进行预测
主要思路:
- PSO Parameters :粒子数量、搜索维度、所有粒子的位置和速度、个体经历的最佳位置和全局最佳位置、每个个体的历史最佳适应值
- CNN Parameters 神经网络第一层神经元个数、神经网络第二层神经元个数、dropout比率、batch_size
- 开始搜索:初始粒子适应度计算、计算初始全局最优、计算适应值、初始全局最优参数、适应度函数、更新个体最优、更新全局最优、全局最优参数
- 训练模型,使用PSO找到的最好的全局最优参数
- plt.show()
2、数据介绍
['SP', 'High', 'Low', 'KP', 'QSP', 'ZDE', 'ZDF', 'CJL']
需要数据的话去我其他文章找到我的球球
3、相关技术
PSO好的地方就是论文多,好写引用文献
不过说实话,算法优化我并不推荐用PSO,虽然说PSO的论文多,但是都被用烂了,AutoML-NNI,hyperopt,optuna,ray都是很好很先进的优化框架,里面集成了很多效果非常好的优化算法,推荐大家学习。
4、完整代码和步骤
代码输出如下:
主运行程序入口
import os
import os
import random
import time
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.callbacks import EarlyStopping
from keras.layers import Dense, Dropout, LSTM
from keras.layers.core import Activation
from keras.models import Sequential
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras import Sequential
from tensorflow.python.keras.layers import Conv1D
from tensorflow.python.keras.models import Sequential
import csv
def writeOneCsv(relate_record, src):
with open(src, 'a', newline='\n') as csvFile:
writer = csv.writer(csvFile)
writer.writerow(relate_record)
np.random.seed(666)
matplotlib.rcParams['agg.path.chunksize'] = 0
matplotlib.rcParams.update(matplotlib.rc_params())
src = 'D:\项目\PSO-LSTM模型预测时间序列\数据\\'
src1 = 'D:\项目\PSO-LSTM模型预测时间序列\图片\\'
os.chdir(r'D:\项目\PSO-LSTM模型预测时间序列')
filename = 'lstm4_pso_'
batch_size = 128
epochs = 2
steps = 10
scalerx = StandardScaler()
scalery = StandardScaler()
def process_data():
# usecols 代表使用数据的列索引,左闭右开
dataset = pd.read_csv("data5.csv", engine='python', parse_dates=['date'], usecols=range(1, 9), index_col=['date'])
columns = ['Y', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6']
# 对X进行标准化
for col in columns[1:]:
dataset[col] = scalerx.fit_transform(dataset[col].values.reshape(-1, 1))
# 对Y进行标准化
for col in columns[:1]:
dataset[col] = scalery.fit_transform(dataset[col].values.reshape(-1, 1))
X = dataset.drop(columns=['Y'], axis=1)
y = dataset['Y']
# test_size代表划分20%到测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False, random_state=666)
return X_train, y_train, X_test, y_test
def create_dataset(X, y, seq_len):
features = []
targets = [] # 标签
for i in range(0, len(X) - seq_len, 1): # 此处的1表示步长,每隔一步滑一下
data = X.iloc[i:i + seq_len] # 序列数据;前闭后开
label = y.iloc[i + seq_len] # 标签数据
# 保存到features和labels
features.append(data)
targets.append(label)
trainX = np.array(features).astype('float64')
return trainX, np.array(targets).reshape(-1, 1)
def build_model(neurons1, neurons2, dropout):
X_train, y_train, X_test, y_test = process_data()
X_train, y_train = create_dataset(X_train, y_train, steps)
X_test, y_test = create_dataset(X_test, y_test, steps)
nb_features = X_train.shape[2]
input1 = X_train.shape[1]
model1 = Sequential()
model1.add(Conv1D(neurons1, kernel_size=4, strides=2, input_shape=(input1, nb_features)))
model1.add(Dropout(dropout))
model1.add(Conv1D(neurons2, kernel_size=4, strides=2, input_shape=(input1, nb_features)))
model1.add(Dropout(dropout))
model1.add(Dense(units=1))
model1.add(Activation("linear"))
model1.compile(loss='mse', optimizer='Adam', metrics='mae')
return model1, X_train, y_train, X_test, y_test
def training(X):
neurons1 = int(X[0])
neurons2 = int(X[1])
dropout = round(X[2], 6)
batch_size = int(X[3])
print(X)
model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
model.fit(
X_train,
y_train,
batch_size=batch_size,
epochs=22,
validation_split=0.1,
verbose=1,
callbacks=[EarlyStopping(monitor='val_loss', patience=22, restore_best_weights=True)])
pred = model.predict(X_test)
le = len(pred)
y_t = y_test.reshape(-1, 1)
return pred, le, y_t
def function(ps, test, le):
ss = sum(((abs(test - ps)) / test) / le)
return ss
# (1) PSO Parameters
MAX_EPISODES = 22
MAX_EP_STEPS = 22
c1 = 2
c2 = 2
w = 0.5
pN = 100 # 粒子数量
# (2) LSTM Parameters
dim = 4 # 搜索维度
X = np.zeros((pN, dim) ) # 所有粒子的位置和速度
V = np.zeros((pN, dim))
pbest = np.zeros((pN, dim)) # 个体经历的最佳位置和全局最佳位置
gbest = np.zeros(dim)
p_fit = np.zeros(pN) # 每个个体的历史最佳适应值
print(p_fit.shape)
print(p_fit.shape)
t1 = time.time()
'''
神经网络第一层神经元个数
神经网络第二层神经元个数
dropout比率
batch_size
'''
UP = [150, 15, 0.5, 16]
DOWN = [50, 5, 0.05, 8]
# (4) 开始搜索
for i_episode in range(MAX_EPISODES):
"""初始化s"""
random.seed(8)
fit = -1e5 # 全局最佳适应值
# 初始粒子适应度计算
print("计算初始全局最优")
for i in range(pN):
for j in range(dim):
V[i][j] = random.uniform(0, 1)
if j == 2:
X[i][j] = random.uniform(DOWN[j], UP[j])
else:
X[i][j] = round(random.randint(DOWN[j], UP[j]), 0)
pbest[i] = X[i]
le, pred, y_t = training(X[i])
NN = 1
# 计算适应值
tmp = function(pred, y_t, le)
p_fit[i] = tmp
if tmp > fit:
fit = tmp
gbest = X[i]
print("初始全局最优参数:{:}".format(gbest))
fitness = [] # 适应度函数
for j in range(MAX_EP_STEPS):
fit2 = []
plt.title("第{}次迭代".format(i_episode))
for i in range(pN):
le, pred, y_t = training(X[i])
temp = function(pred, y_t, le)
fit2.append(temp / 1000)
if temp > p_fit[i]: # 更新个体最优
p_fit[i] = temp
pbest[i] = X[i]
if p_fit[i] > fit: # 更新全局最优
gbest = X[i]
fit = p_fit[i]
print("搜索步数:{:}".format(j))
print("个体最优参数:{:}".format(pbest))
print("全局最优参数:{:}".format(gbest))
for i in range(pN):
V[i] = w * V[i] + c1 * random.uniform(0, 1) * (pbest[i] - X[i]) + c2 * random.uniform(0, 1) * (
gbest - X[i])
ww = 1
for k in range(dim):
if DOWN[k] < X[i][k] + V[i][k] < UP[k]:
continue
else:
ww = 0
X[i] = X[i] + V[i] * ww
fitness.append(fit)
print('Running time: ', time.time() - t1)
# 训练模型 使用PSO找到的最好的神经元个数
neurons1 = int(gbest[0])
neurons2 = int(gbest[1])
dropout = gbest[2]
batch_size = int(gbest[3])
model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
history1 = model.fit(X_train, y_train, epochs=222, batch_size=batch_size, validation_split=0.2, verbose=1,
callbacks=[EarlyStopping(monitor='val_loss', patience=9, restore_best_weights=True)])
y_score = model.predict(X_test)
scaler_y_score = scalery.inverse_transform(y_score)
scaler_y_test = scalery.inverse_transform(y_test)
writeOneCsv(scaler_y_score, src + filename + 'pred_real.csv')
writeOneCsv(scaler_y_test, src + filename + 'pred_real.csv')
plt.figure(figsize=(10, 10))
plt.plot(scaler_y_score)
plt.plot(scaler_y_test)
plt.title('real vs pred test')
plt.ylabel('V')
plt.xlabel('X')
plt.legend(['pred', 'real'], loc='lower right')
plt.savefig(src1 + filename + 'pred_real.png')
plt.show()
代码比较复杂,如需帮忙请私聊
5、学习链接
PSO粒子群优化-LSTM-pyswarms框架-实现期货价格预测
PSO优化GRU-LSTM超参数
PSO粒子群优化-LSTM-优化神经网络神经元个数dropout和batch_size