关键词:
BaoStock Pandas sqlite3 to_sql
GitHub: StockIndicatorAnalyzer
网站入口
- BaoStock 使用
BaoStock 的文档写的非常详细,从安装到接口使用基本不需要额外介绍了。
这里我们主要拿两个数据,一个是所有的股票代码数据,另一个是所有股票的每日K线数据。
a)获得所有股票的代码数据
接口query_all_stock(),这里直接用的Baostock的例子,只是参数为空,默认当前日期
import baostock as bs
import pandas as pd
lg = bs.login()
print('login respond error_code:'+lg.error_code)
print('login respond error_msg:'+lg.error_msg)
rs = bs.query_all_stock()
print('query_all_stock respond error_code:'+rs.error_code)
print('query_all_stock respond error_msg:'+rs.error_msg)
data_list = []
while (rs.error_code == '0') & rs.next():
data_list.append(rs.get_row_data())
result = pd.DataFrame(data_list, columns=rs.fields)
result.to_csv("all_stock.csv", encoding="gbk", index=False)
print(result)
bs.logout()
b)获得某只股票的K线数据
接口query_history_k_data_plus
参数和输出见API手册。这里我用的是前复权
import baostock as bs
import pandas as pd
lg = bs.login()
print('login respond error_code:'+lg.error_code)
print('login respond error_msg:'+lg.error_msg)
# 分钟线指标:date,time,code,open,high,low,close,volume,amount,adjustflag
rs = bs.query_history_k_data_plus("sh.600000",
"date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST",
start_date='2017-07-01', end_date='2017-12-31',
frequency="d", adjustflag="2")
print('query_history_k_data_plus respond error_code:'+rs.error_code)
print('query_history_k_data_plus respond error_msg:'+rs.error_msg)
data_list = []
while (rs.error_code == '0') & rs.next():
data_list.append(rs.get_row_data())
result = pd.DataFrame(data_list, columns=rs.fields)
result.to_csv("history_A_stock_k_data.csv", index=False)
print(result)
bs.logout()
- 创建数据库
经过思考,我只需要存储股票代码数据,股票日K线数据,然后计算一些指标,表不会特别多,所以直接用Python自带的Sqlite3.
我们需要建三张表:
a) allstock 保存所有股票代码和名字
b) stock_day_k 保存所有股票的日K线数据
c) stock_spec 保存计算好的股票的指标
Sqlite3的数据类型也比较简单,只有5种数据类型
Sqlite3数据类型
这里我们直接参考BaoStock的返回数据来创建表
####createDB.py####
import baostock as bs
import pandas as pd
import sqlite3
def createDB():
conn = sqlite3.connect("mystock.db")
cursor = conn.cursor()
# create three tables, allstock stock_day_k and stock_spec
sql_stock = "CREATE TABLE allstock(code TEXT PRIMARY KEY, tradeStatus INT not null, code_name TEXT not null)"
sql_day_K = "CREATE TABLE stock_day_k(date date, code TEXT, open REAL, high REAL, low REAL, close REAL,"\
"preclose REAL, volume INT, amount INT, adjustflag INT, turn REAL, tradestatus INT,"\
"pctChg REAL, peTTM REAL, pbMRQ REAL, psTTM REAL, pcfNcfTTM REAL, isST INT, primary key (date, code))"
sql_stock_spec = "CREATE TABLE stock_spec(date date, code TEXT, name TEXT, relacode TEXT, alpha_y REAL, beta_y REAL, r_y REAL,"\
"alpha_m REAL, beta_m REAL, r_m REAL, corr_y REAL, cov_y REAL, corr_m REAL, cov_m REAL,"\
"amplitude_y REAL, amplitude_m REAL,amplitude_10 REAL,amplitude_5 REAL, primary key (date, code))"
#为stock_day_k的code列加索引
sql_day_K_index = "CREATE INDEX code_index ON stock_day_k (code)"
cursor.execute(sql_stock)
cursor.execute(sql_day_K)
cursor.execute(sql_stock_spec)
cursor.execute(sql_day_K_index)
cursor.close()
conn.commit()
conn.close()
if __name__=='__main__':
createDB()
这里stock_day_k和stock_spec都用了 code和date作为联合主键。
后续在使用过程中,stock_day_k数据比较多,从数据库中获取某只股票的K线数据时会有点慢,所以给stock_day_k的code列加一个索引。
- 存储数据
这一段在实现过程中遇到过一些问题。
最初的时候,因为BaoStock的例子用的是pandas的to_csv方法保存到csv,所以参考直接用pandas的to_sql将数据存到数据库。第一次抓数据的时候没有问题,但是测试发现新数据的时候,要不然就是之前的数据没有了,要不然就是有重复数据。
研究了pandas的手册to_sql 方法,里面的if_exists参数
if_exists{‘fail’, ‘replace’, ‘append’}, default ‘fail’
How to behave if the table already exists.
fail: Raise a ValueError.
replace: Drop the table before inserting new values.
append: Insert new values to the existing table.
用replace的话,会把整个table删掉然后创建新表并插入新数据,这个肯定不考虑了因为我们每天需要更新增量。
用append的话,会把新的数据加入现有的表,看上去没有问题,但是如果有重复数据的话(比如表之前存到2020-04-20,新抓的数据又包含2020-04-20)表内就会出现重复数据。这样也不行, 所以最后采用第二步,自己创建带有主键的表。然后继续尝试to_sql,
当有重复数据的时候报错
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: stock_day_k.date, stock_day_k.code
date和code是联合主键,要求唯一,失败了。
所以to_sql有两个问题,一是不能创建带主键的表,二是不能向带有主键的表插入重复数据。
最后发现还是不能犯懒,老老实实搜索sqlite3怎样插入数据到数据库,用insert or replace可以实现,并且可以不用for循环批量更新。
import baostock as bs
import pandas as pd
from sqlalchemy import create_engine
def refresh_all_stock(current_date = "2020-03-27"):
db_conn = create_engine('sqlite:///mystock.db')
lg = bs.login()
rs = bs.query_all_stock(day=current_date)
data_list = []
while (rs.error_code == '0') & rs.next():
data_list.append(rs.get_row_data())
db_conn.execute(r'''
INSERT OR REPLACE INTO allstock VALUES (?, ?, ?)
''', data_list)