# coding=utf-8
from __future__ import division
import time, random
import numpy as np
import numba
import pyximport
pyximport.install()
import ma
# Benchmark parameters.
data_length = 100000  # number of random ticks to generate
ma_length = 500       # moving-average window size
test_times = 10       # how many times the benchmark run is repeated

# Input series: data_length random integers drawn from 1..100 (inclusive).
data = [random.randint(1, 100) for _ in range(data_length)]
# Method 1: plain Python, re-summing the whole window for every tick
def ma_basic(data, ma_length):
    """Return the moving averages of ``data`` using plain Python loops.

    The window starts as the first ``ma_length`` items of ``data``. For each
    later item the oldest value is dropped, the new one is appended and the
    whole window is summed again. One average is produced per item after the
    initial window; the initial window itself produces no output.

    ``data`` is expected to be a list: the window is a slice of it that is
    updated with ``pop``/``append``.
    """
    averages = []
    window = data[:ma_length]
    for incoming in data[ma_length:]:
        # Slide the window one step: drop the oldest tick, add the newest.
        window.pop(0)
        window.append(incoming)
        # Deliberately re-sum the full window; this is the naive baseline.
        total = 0
        for value in window:
            total += value
        averages.append(total / ma_length)
    return averages
# Method 2: NumPy used the wrong way (a new array is built for every tick)
def ma_numpy_wrong(data, ma_length):
    """Return moving averages, converting the window to an array every tick.

    The window is kept as a Python list (a slice of ``data``). After each
    slide it is copied into a fresh NumPy array just to call ``mean()`` on
    it. The result is a list holding one ``mean()`` value per item after the
    initial window.
    """
    window = data[:ma_length]
    averages = []
    for incoming in data[ma_length:]:
        # Slide the list-based window by one tick.
        window.pop(0)
        window.append(incoming)
        # A new array is created on every iteration.
        averages.append(np.array(window).mean())
    return averages
# Method 3: NumPy used the right way (one array is reused as the window)
def ma_numpy_right(data, ma_length):
    """Return moving averages using a single NumPy array as the window.

    The first ``ma_length`` items of ``data`` are copied into an array once.
    For each later item the array contents are shifted one slot to the left
    in place, the new item is written into the last slot, and ``mean()`` of
    the array is recorded. The result is a list with one ``mean()`` value
    per item after the initial window.
    """
    window = np.array(data[:ma_length])
    last_kept = ma_length - 1
    averages = []
    for incoming in data[ma_length:]:
        # Shift left by one position, discarding the oldest tick.
        window[0:last_kept] = window[1:ma_length]
        window[-1] = incoming
        averages.append(window.mean())
    return averages
# Method 4: the Method 1 algorithm compiled with numba.jit
@numba.jit
def ma_numba(data, ma_length):
    """Return moving averages; same algorithm as the naive loop, under numba.

    The window starts as the first ``ma_length`` items of ``data``. For each
    later item the oldest value is popped, the new one is appended and the
    whole window is summed again. One average is produced per item after the
    initial window.
    """
    window = data[:ma_length]
    pending = data[ma_length:]
    averages = []
    for incoming in pending:
        # Slide the window one step: drop the oldest tick, add the newest.
        window.pop(0)
        window.append(incoming)
        running = 0
        for value in window:
            running += value
        averages.append(running / ma_length)
    return averages
# Method 5: online algorithm (the running sum is updated incrementally)
def ma_online(data, ma_length):
    """Return the moving averages of ``data`` with an O(1) update per tick.

    The sum of the first ``ma_length`` items is computed once. For every
    later item the running sum is adjusted by subtracting the item that
    leaves the window and adding the one that enters it.

    Args:
        data: Sequence of numbers supporting slicing (e.g. a list). It is
            only read, never modified.
        ma_length: Window size; must be a positive integer.

    Returns:
        A list with one average per item after the initial window, i.e.
        ``max(len(data) - ma_length, 0)`` values. The average of the initial
        window itself is not included.

    Raises:
        ValueError: If ``ma_length`` is not positive.
    """
    if ma_length <= 0:
        raise ValueError("ma_length must be a positive integer")
    averages = []
    # Prime the running sum up front instead of treating 0 as "not yet
    # initialised": a window whose sum is legitimately 0 must not trigger a
    # full re-sum on every tick.
    window_sum = sum(data[:ma_length])
    # zip pairs each tick leaving the window with the tick entering it, so no
    # separate window list (and no O(ma_length) list.pop(0)) is needed.
    for old_tick, new_tick in zip(data, data[ma_length:]):
        window_sum = window_sum - old_tick + new_tick
        averages.append(window_sum / ma_length)
    return averages
# Method 6: the Method 5 online algorithm compiled with numba.jit
@numba.jit
def ma_online_numba(data, ma_length):
    """Return the moving averages of ``data`` with an O(1) update per tick.

    The sum of the first ``ma_length`` items is computed once. For every
    later item the running sum is adjusted by subtracting the item that
    leaves the window and adding the one that enters it. Only explicit loops
    and indexing are used so the body stays simple for the JIT compiler.

    Args:
        data: Sequence of numbers supporting ``len``, indexing and slicing
            (e.g. a list). It is only read, never modified.
        ma_length: Window size; must be a positive integer.

    Returns:
        A list with one average per item after the initial window. The
        average of the initial window itself is not included.

    Raises:
        ValueError: If ``ma_length`` is not positive.
    """
    if ma_length <= 0:
        raise ValueError("ma_length must be a positive integer")
    averages = []
    # Prime the running sum up front instead of treating 0 as "not yet
    # initialised": a window whose sum is legitimately 0 must not trigger a
    # full re-sum on every tick.
    window_sum = 0
    for tick in data[:ma_length]:
        window_sum += tick
    # data[i] enters the window while data[i - ma_length] leaves it, so no
    # separate window list (and no O(ma_length) list.pop(0)) is needed.
    for i in range(ma_length, len(data)):
        window_sum = window_sum - data[i - ma_length] + data[i]
        averages.append(window_sum / ma_length)
    return averages
# Benchmark driver: run the selected implementation test_times times and
# report the average wall-clock cost per run and per produced data point.
start_time = time.time()
for i in range(test_times):
    result = ma.ma_cython_online(data, ma_length)
time_per_test = (time.time() - start_time) / test_times
# Each run yields one average per tick after the initial window.
time_per_point = time_per_test / (data_length - ma_length)
# Output labels are in Chinese: "time per run (seconds)",
# "time per data point (microseconds)", "last 10 moving averages".
print('单次耗时: %s秒' % time_per_test)
# The unit label previously read "微妙" (a typo); "微秒" means microseconds.
print('单个数据点耗时: %s微秒' % (time_per_point * 1000000))
print('最后10个移动平均值: ', result[-10:])
def ma_cython(data, ma_length):
    """Return the moving averages of ``data`` with the naive window re-sum.

    The window starts as the first ``ma_length`` items of ``data``. For each
    later item the oldest value is popped, the new one is appended and the
    whole window is summed again. One average is produced per item after the
    initial window; the initial window itself produces no output.
    """
    window = data[:ma_length]
    remaining = data[ma_length:]
    averages = []
    for latest in remaining:
        # Slide the window one step: drop the oldest tick, add the newest.
        window.pop(0)
        window.append(latest)
        window_total = 0
        for item in window:
            window_total += item
        averages.append(window_total / ma_length)
    return averages
def ma_cython_online(data, ma_length):
    """Return the moving averages of ``data`` with an O(1) update per tick.

    The sum of the first ``ma_length`` items is computed once. For every
    later item the running sum is adjusted by subtracting the item that
    leaves the window and adding the one that enters it.

    Args:
        data: Sequence of numbers supporting ``len``, indexing and slicing
            (e.g. a list). It is only read, never modified.
        ma_length: Window size; must be a positive integer.

    Returns:
        A list with one average per item after the initial window. The
        average of the initial window itself is not included.

    Raises:
        ValueError: If ``ma_length`` is not positive.
    """
    # C doubles rather than single-precision floats: a C float carries only
    # about 7 significant digits, which degrades both the running sum and
    # the averages returned to Python.
    cdef double window_sum, old_tick, new_tick, tick
    cdef Py_ssize_t i, window_len, total_len
    if ma_length <= 0:
        raise ValueError("ma_length must be a positive integer")
    window_len = ma_length
    total_len = len(data)
    averages = []
    # Prime the running sum up front instead of treating 0 as "not yet
    # initialised": a window whose sum is legitimately 0 must not trigger a
    # full re-sum on every tick.
    window_sum = 0
    for tick in data[:window_len]:
        window_sum += tick
    # data[i] enters the window while data[i - window_len] leaves it, so no
    # separate window list (and no O(ma_length) list.pop(0)) is needed.
    for i in range(window_len, total_len):
        old_tick = data[i - window_len]
        new_tick = data[i]
        window_sum = window_sum - old_tick + new_tick
        averages.append(window_sum / window_len)
    return averages