交通流量预测在智能交通(ITS)系统中占有重要地位,是实现交通诱导的前提。准确实时的短时交通流预测有助于更好的分析路网交通状况,对路网交通规划和交通优化控制有非常重要的作用。随着交通数据采集技术的不断发展,及时获取路网中实时的交通数据已成为可能,大量的交通信息为基于深度学习的预测模型提供了数据保障。
基于神经网络的交通流预测的相关研究如下列论文所示。由于与我的研究方向相关,在本文中我实现了基于SAEs、RNN的交通流量预测模型。
Paper:
Traffic Flow Prediction With Big Data: A Deep Learning Approach
Using LSTM and GRU neural network methods for traffic flow prediction
Github:https://github.com/xiaochus/TrafficFlowPrediction
环境
- Python 3.5
- Tensorflow-gpu 1.2.0
- Keras 2.1.3
- scikit-learn 0.18
数据处理
实验数据是从Caltrans Performance Measurement System (PeMS)获取的。原始的流量数据是一个长度为n的一维数据。我们首先使用训练集的数据实现一个标准化对象scaler,然后使用scaler分别对训练集与测试集进行标准化。
由于时序预测任务需要使用历史数据对未来数据进行预测,我们使用时滞变量lags对数据进行划分,最后获得大小为(samples, lags)的数据集。
划分好的数据集在排列顺序上依旧带有时序特性,虽然keras在训练时可以选择对数据进行混洗,但是其执行顺序是先对val数据进行采样再进行混洗,采样过程依旧是按照顺序来的。因此我们事先使用np.random.shuffle对数据进行混洗,打乱数据的顺序。
def process_data(train, test, lags):
"""Process data
Reshape and split train\test data.
# Arguments
train: String, name of .csv train file.
test: String, name of .csv test file.
lags: integer, time lag.
# Returns
X_train: ndarray.
y_train: ndarray.
X_test: ndarray.
y_test: ndarray.
scaler: StandardScaler.
"""
attr = 'Lane 1 Flow (Veh/5 Minutes)'
df1 = pd.read_csv(train, encoding='utf-8').fillna(0)
df2 = pd.read_csv(test, encoding='utf-8').fillna(0)
# scaler = StandardScaler().fit(df1[attr].values)
scaler = MinMaxScaler(feature_range=(0, 1)).fit(df1[attr].values)
flow1 = scaler.transform(df1[attr].values)
flow2 = scaler.transform(df2[attr].values)
train, test = [], []
for i in range(lags, len(flow1)):
train.append(flow1[i - lags: i + 1])
for i in range(lags, len(flow2)):
test.append(flow2[i - lags: i + 1])
train = np.array(train)
test = np.array(test)
np.random.shuffle(train)
X_train = train[:, :-1]
y_train = train[:, -1]
X_test = test[:, :-1]
y_test = test[:, -1]
return X_train, y_train, X_test, y_test, scaler
模型
LSTM
2隐层LSTM网络。
def get_lstm(units):
"""LSTM(Long Short-Term Memory)
Build LSTM Model.
# Arguments
units: List(int), number of input, output and hidden units.
# Returns
model: Model, nn model.
"""
model = Sequential()
model.add(LSTM(units[1], input_shape=(units[0], 1), return_sequences=True))
model.add(LSTM(units[2]))
model.add(Dropout(0.2))
model.add(Dense(units[3], activation='linear'))
return model
GRU
2隐层GRU网络。
def get_gru(units):
"""GRU(Gated Recurrent Unit)
Build GRU Model.
# Arguments
units: List(int), number of input, output and hidden units.
# Returns
model: Model, nn model.
"""
model = Sequential()
model.add(GRU(units[1], input_shape=(units[0], 1), return_sequences=True))
model.add(GRU(units[2]))
model.add(Dropout(0.2))
model.add(Dense(units[3], activation='linear'))
return model
SAEs
Auto-Encoders的原理是先通过一个encode层对输入进行编码,这个编码就是特征,然后利用encode乘第2层参数(也可以是encode层的参数的转置乘特征并加偏执),重构(解码)输入,然后用重构的输入和实际输入的损失训练参数。
这里我们构建了三个单独的自动编码器,并按照相同的隐层结构构建了一个三层的SAEs。
def _get_sae(inputs, hidden, output):
"""SAE(Auto-Encoders)
Build SAE Model.
# Arguments
inputs: Integer, number of input units.
hidden: Integer, number of hidden units.
output: Integer, number of output units.
# Returns
model: Model, nn model.
"""
model = Sequential()
model.add(Dense(hidden, input_dim=inputs, name='hidden'))
model.add(Activation('sigmoid'))
model.add(Dropout(0.2))
model.add(Dense(output))
return model
def get_saes(layers):
"""SAEs(Stacked Auto-Encoders)
Build SAEs Model.
# Arguments
layers: List(int), number of input, output and hidden units.
# Returns
models: List(Model), List of SAE and SAEs.
"""
sae1 = _get_sae(layers[0], layers[1], layers[-1])
sae2 = _get_sae(layers[1], layers[2], layers[-1])
sae3 = _get_sae(layers[2], layers[3], layers[-1])
saes = Sequential()
saes.add(Dense(layers[1], input_dim=layers[0], name='hidden1'))
saes.add(Activation('sigmoid'))
saes.add(Dense(layers[2], name='hidden2'))
saes.add(Activation('sigmoid'))
saes.add(Dense(layers[3], name='hidden3'))
saes.add(Activation('sigmoid'))
saes.add(Dropout(0.2))
saes.add(Dense(layers[4]))
models = [sae1, sae2, sae3, saes]
return models
训练
LSTM、GRU按照正常的RNN网络进行训练。使用train_model()函数训练。
SAEs的训练过程:多个SAE分别训练,第一个SAE训练完之后,其encode的输出作为第二个SAE的输入,接着训练。最后训练完后,将所有SAE的中间隐层连接起来组成一个SAEs网络,使用之前的权值作为初始化权值,再对整个网络进行fine-tune。使用train_seas()函数训练。
使用RMSprop(lr=0.001, rho=0.9, epsilon=1e-06)作为优化器,batch_szie为256,lags为12(即时滞长度为一个小时)。
def train_model(model, X_train, y_train, name, config):
"""train
train a single model.
# Arguments
model: Model, NN model to train.
X_train: ndarray(number, lags), Input data for train.
y_train: ndarray(number, ), result data for train.
name: String, name of model.
config: Dict, parameter for train.
"""
model.compile(loss="mse", optimizer="rmsprop", metrics=['mape'])
# early = EarlyStopping(monitor='val_loss', patience=30, verbose=0, mode='auto')
hist = model.fit(
X_train, y_train,
batch_size=config["batch"],
epochs=config["epochs"],
validation_split=0.05)
model.save('model/' + name + '.h5')
df = pd.DataFrame.from_dict(hist.history)
df.to_csv('model/' + name + ' loss.csv', encoding='utf-8', index=False)
def train_seas(models, X_train, y_train, name, config):
"""train
train the SAEs model.
# Arguments
models: List, list of SAE model.
X_train: ndarray(number, lags), Input data for train.
y_train: ndarray(number, ), result data for train.
name: String, name of model.
config: Dict, parameter for train.
"""
temp = X_train
# early = EarlyStopping(monitor='val_loss', patience=30, verbose=0, mode='auto')
for i in range(len(models) - 1):
if i > 0:
p = models[i - 1]
hidden_layer_model = Model(input=p.input,
output=p.get_layer('hidden').output)
temp = hidden_layer_model.predict(temp)
m = models[i]
m.compile(loss="mse", optimizer="rmsprop", metrics=['mape'])
m.fit(temp, y_train, batch_size=config["batch"],
epochs=config["epochs"],
validation_split=0.05)
models[i] = m
saes = models[-1]
for i in range(len(models) - 1):
weights = models[i].get_layer('hidden').get_weights()
saes.get_layer('hidden%d' % (i + 1)).set_weights(weights)
train_model(saes, X_train, y_train, name, config)
实验
评估
在这里使用MAE、MSE、RMSE、MAPE、R2、explained_variance_score几个指标对回归预测结果进行评估。
def MAPE(y_true, y_pred):
"""Mean Absolute Percentage Error
Calculate the mape.
# Arguments
y_true: List/ndarray, ture data.
y_pred: List/ndarray, predicted data.
# Returns
mape: Double, result data for train.
"""
y = [x for x in y_true if x > 0]
y_pred = [y_pred[i] for i in range(len(y_true)) if y_true[i] > 0]
num = len(y_pred)
sums = 0
for i in range(num):
tmp = abs(y[i] - y_pred[i]) / y[i]
sums += tmp
mape = sums * (100 / num)
return mape
def eva_regress(y_true, y_pred):
"""Evaluation
evaluate the predicted resul.
# Arguments
y_true: List/ndarray, ture data.
y_pred: List/ndarray, predicted data.
"""
mape = MAPE(y_true, y_pred)
vs = metrics.explained_variance_score(y_true, y_pred)
mae = metrics.mean_absolute_error(y_true, y_pred)
mse = metrics.mean_squared_error(y_true, y_pred)
r2 = metrics.r2_score(y_true, y_pred)
print('explained_variance_score:%f' % vs)
print('mape:%f%%' % mape)
print('mae:%f' % mae)
print('mse:%f' % mse)
print('rmse:%f' % math.sqrt(mse))
print('r2:%f' % r2)
预测
我们使用训练好的模型对测试集进行预测。
def main():
lstm = load_model('model/lstm.h5')
gru = load_model('model/gru.h5')
saes = load_model('model/saes.h5')
models = [lstm, gru, saes]
names = ['LSTM', 'GRU', 'SAEs']
lag = 12
file1 = 'data/train.csv'
file2 = 'data/test.csv'
_, _, X_test, y_test, scaler = process_data(file1, file2, lag)
y_test = scaler.inverse_transform(y_test)
y_preds = []
for name, model in zip(names, models):
if name == 'SAEs':
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1]))
else:
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))
file = 'images/' + name + '.png'
plot_model(model, to_file=file, show_shapes=True)
predicted = model.predict(X_test)
predicted = scaler.inverse_transform(predicted)
y_preds.append(predicted[:300])
print(name)
eva_regress(y_test, predicted)
plot_results(y_test[: 300], y_preds, names)
预测精度对比如下所示:
Metrics | MAE | MSE | RMSE | MAPE | R2 | Explained variance score |
---|---|---|---|---|---|---|
LSTM | 7.16 | 94.20 | 9.71 | 21.25% | 0.9420 | 0.9421 |
GRU | 7.18 | 95.01 | 9.75 | 17.42% | 0.9415 | 0.9415 |
SAEs | 7.71 | 106.46 | 10.32 | 25.62% | 0.9344 | 0.9352 |
预测结果对比如下所示: