PYTHON多线程行情抓取工具实现

思路

借助python当中threading模块与Queue模块组合可以方便的实现基于生产者-消费者模型的多线程模型。Jimmy大神的tushare一直是广大python数据分析以及业余量化爱好者喜爱的免费、开源的python财经数据接口包。

平时一直有在用阿里云服务器通过tushare的接口自动落地相关财经数据,但日复权行情数据以往在串行下载的过程当中,速度比较慢,有时遇到网络原因还需要重下。每只股票的行情下载过程中都需要完成下载、落地2个步骤,一个可能需要网络开销、一个需要数据库mysql的存取开销。2者原本就可以独立并行执行,是个典型的“生产者-消费者”模型。

基于queue与threading模块的线程使用一般采用以下的套路:


producerQueue=Queue()
consumerQueue=Queue()
lock = threading.Lock()
class producerThead(threading.Thread):
    def __init__(self, producerQueue,consumerQueue):
        self.producerQueue=producerQueue
        self.consumerQueue=consumerQueue



    def run(self):
        while not self.thread_stop:
            try:
                #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行
                item=self.producerQueue.get(block=True, timeout=20)
                #阻塞调用进程直到有数据可用。如果timeout是个正整数,
                #阻塞调用进程最多timeout秒,
                #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)
            except Queue.Empty:
                print("Nothing to do!thread exit!")
                self.thread_stop=True
                break
            #实现生产者逻辑,生成消费者需要处理的内容 consumerQueue.put(someItem)
            #还可以边处理,边生成新的生产任务
            doSomethingAboutProducing()
            self.producerQueue.task_done()
    def stop(self):
        self.thread_stop = True

class consumerThead(threading.Thread):
    def __init__(self,lock, consumerQueue):
        self.consumerQueue=consumerQueue
    def run(self):
        while true:
            try:
                #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行
                item=self.consumerQueue.get(block=True, timeout=20)
                #阻塞调用进程直到有数据可用。如果timeout是个正整数,
                #阻塞调用进程最多timeout秒,
                #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)
            except Queue.Empty:
                print("Nothing to do!thread exit!")
                self.thread_stop=True
                break
            doSomethingAboutConsuming(lock)# 处理消费者逻辑,必要时使用线程锁 ,如文件操作等
            self.consumerQueue.task_done()
#定义主线程
def main():
    for i in range(n):#定义n个i消费者线程
        t = ThreadRead(producerQueue, consumerQueue)
        t.setDaemon(True)
        t.start()
    producerTasks=[] #定义初始化生产者任务队列
    producerQueue.put(producerTasks)
    for i in range(n):#定义n个生产者钱程
        t = ThreadWrite(consumerQueue, lock)
        t.setDaemon(True)
        t.start()    
    stock_queue.join()
    data_queue.join()

相关接口

1,股票列表信息接口

  • 作用
    获取沪深上市公司基本情况。属性包括:
code,代码
name,名称
industry,所属行业
area,地区
pe,市盈率
outstanding,流通股本(亿)
totals,总股本(亿)
totalAssets,总资产(万)
liquidAssets,流动资产
fixedAssets,固定资产
reserved,公积金
reservedPerShare,每股公积金
esp,每股收益
bvps,每股净资
pb,市净率
timeToMarket,上市日期
undp,未分利润
perundp, 每股未分配
rev,收入同比(%)
profit,利润同比(%)
gpr,毛利率(%)
npr,净利润率(%)
holders,股东人数
  • 调用方法
import tushare as ts
ts.get_stock_basics()
  • 返回效果
        name    industry    area       pe   outstanding     totals  totalAssets
code
600606   金丰投资     房产服务   上海     0.00     51832.01   51832.01    744930.44
002285    世联行     房产服务   深圳    71.04     76352.17   76377.60    411595.28
000861   海印股份     房产服务   广东   126.20     83775.50  118413.84    730716.56
000526   银润投资     房产服务   福建  2421.16      9619.50    9619.50     20065.32
000056    深国商     房产服务   深圳     0.00     14305.55   26508.14    787195.94
600895   张江高科     园区开发   上海   171.60    154868.95  154868.95   1771040.38
600736   苏州高新     园区开发   江苏    48.68    105788.15  105788.15   2125485.75
600663    陆家嘴     园区开发   上海    47.63    135808.41  186768.41   4562074.50
600658    电子城     园区开发   北京    19.39     58009.73   58009.73    431300.19
600648    外高桥     园区开发   上海    65.36     81022.34  113534.90   2508100.75
600639   浦东金桥     园区开发   上海    57.28     65664.88   92882.50   1241577.00
600604   市北高新     园区开发   上海   692.87     33352.42   56644.92    329289.50

2,日复权行情接口

  • 作用
    提供股票上市以来所有历史数据,默认为前复权,读取后存到本地,作为后续分析的基础
  • 调用方法
ts.get_h_data('002337', start='2015-01-01', end='2015-03-16') #两个日期之间的前复权数据

parameter:
code:string,股票代码 e.g. 600848
start:string,开始日期 format:YYYY-MM-DD 为空时取当前日期
end:string,结束日期 format:YYYY-MM-DD 为空时取去年今日
autype:string,复权类型,qfq-前复权 hfq-后复权 None-不复权,默认为qfq
index:Boolean,是否是大盘指数,默认为False
retry_count : int, 默认3,如遇网络等问题重复执行的次数
pause : int, 默认 0,重复请求数据过程中暂停的秒数,防止请求间隔时间太短出现的问题

return:
date : 交易日期 (index)
open : 开盘价
high : 最高价
close : 收盘价
low : 最低价
volume : 成交量
amount : 成交金额

  • 返回结果
            open   high  close    low     volume      amount
date
2015-03-16  13.27  13.45  13.39  13.00   81212976  1073862784
2015-03-13  13.04  13.38  13.37  13.00   40548836   532739744
2015-03-12  13.29  13.95  13.28  12.96   71505720   962979904
2015-03-11  13.35  13.48  13.15  13.00   59110248   780300736
2015-03-10  13.16  13.67  13.59  12.72  105753088  1393819776
2015-03-09  13.77  14.73  14.13  13.70  139091552  1994454656
2015-03-06  12.17  13.39  13.39  12.17   89486704  1167752960
2015-03-05  12.79  12.80  12.17  12.08   26040832   966927360
2015-03-04  13.96  13.96  13.30  12.58   26636174  1060270720
2015-03-03  12.17  13.10  13.10  12.05   19290366   733336768

实现

废话不多说,直接上代码,

  • 生产者线程,读取行情
class ThreadRead(threading.Thread):
    def __init__(self, queue, out_queue):
        '''
        用于根据股票代码、需要读取的日期,读取增量的日行情数据,
        :param queue:用于保存需要读取的股票代码、起始日期的列表
        :param out_queue:用于保存需要写入到数据库表的结果集列表
        :return:
        '''
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue
    def run(self):
        while true:
            item = self.queue.get()
            time.sleep(0.5)
            try:
                df_h_data = ts.get_h_data(item['code'], start=item['startdate'], retry_count=10, pause=0.01)
                if df_h_data is not None and len(df_h_data)>0:
                    df_h_data['secucode'] = item['code']
                    df_h_data.index.name = 'date'
                    print df_h_data.index,item['code'],item['startdate']
                    df_h_data['tradeday'] = df_h_data.index.strftime('%Y-%m-%d')
                    self.out_queue.put(df_h_data)
            except Exception, e:
                print str(e)
                self.queue.put(item) # 将没有爬取成功的数据放回队列里面去,以便下次重试。
                time.sleep(10)
                continue

            self.queue.task_done()
  • 消费者线程,本地存储

class ThreadWrite(threading.Thread):
    def __init__(self, queue, lock, db_engine):
        '''
        :param queue: 某种形式的任务队列,此处为tushare为每个股票返回的最新日复权行情数据
        :param lock:  暂时用连接互斥操作,防止mysql高并发,后续可尝试去掉
        :param db_engine:  mysql数据库的连接对象
        :return:no
        '''
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.db_engine = db_engine

    def run(self):
        while True:
            item = self.queue.get()
            self._save_data(item)
            self.queue.task_done()

    def _save_data(self, item):
            with self.lock:
                try:
                    item.to_sql('cron_dailyquote', self.db_engine, if_exists='append', index=False)
                except Exception, e:  # 如果是新股,则有可能df_h_data是空对象,因此需要跳过此类情况不处理
                    print str(e)

  • 定义主线程
from Queue import Queue
stock_queue = Queue()
data_queue = Queue()
lock = threading.Lock()
def main():
    '''
    用于测试多线程读取数据
    :return:
    '''
    #获取环境变量,取得相应的环境配置,上线时不需要再变更代码
    global stock_queue
    global data_queue
    config=os.getenv('FLASK_CONFIG')
    if config == 'default':
        db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4'
    else:
        db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4'
    db_engine = create_engine(db_url, echo=True)
    conn = db_engine.connect()
    #TODO 增加ts.get_stock_basics()报错的处理,如果取不到信息则直接用数据库中的股票代码信息,来获取增量信息
    #TODO 增加一个标志,如果一个股票代码的最新日期不是最新日期,则需标记该代码不需要重新获取数据,即记录该股票更新日期到了最新工作日,
    df = ts.get_stock_basics()
    df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code': CHAR(6)})
    # 计算距离当前日期最大的工作日,以便每日定时更新
    today=time.strftime('%Y-%m-%d',time.localtime(time.time()))
    s1=("select max(t.date) from cron_tradeday t where flag=1 and t.date <='"+ today+"'")
    selectsql=text(s1)
    maxTradeay = conn.execute(selectsql).first()
    # 计算每只股票当前加载的最大工作日期,支持重跑
    s = ("select secucode,max(t.tradeday) from cron_dailyquote t group by secucode ")
    selectsql = text(s)
    result = conn.execute(selectsql)  # 执行查询语句
    df_result = pd.DataFrame(result.fetchall())
    df_result.columns=['stockcode','max_tradeday']
    df_result.set_index(df_result['stockcode'],inplace=True)
    # 开始归档前复权历史行情至数据库当中,以便可以方便地计算后续选股模型

    for i in range(3):#使用3个线程
        t = ThreadRead(stock_queue, data_queue)
        t.setDaemon(True)
        t.start()
    for code in set(list(df.index)):
        try:
            #如果当前股票已经是最新的行情数据,则直接跳过,方便重跑。
            #print maxTradeay[0],df_result.loc[code].values[1]
            if df_result.loc[code].values[1] == maxTradeay[0]:
                continue
            startdate=getLastNdate(df_result.loc[code].values[1],1)
        except Exception, e:
            #如果某只股票没有相关的行情,则默认开始日期为2015年1月1日
            startdate='2015-01-01'
        item={}
        item['code']=code
        item['startdate']=startdate
        stock_queue.put(item) # 生成生产者任务队列
    for i in range(3):
        t = ThreadWrite(data_queue, lock, db_engine)
        t.setDaemon(True)
        t.start()
    stock_queue.join()
    data_queue.join()

  • 执行效果
    原本需要2,3个小时才能执行完成的每日复权行情增量落地,有效缩短至了1小时以内,这里线程数并不上越多越好,由于复权行情读的是新浪接口,在高并发情况下会返回HTTP 503服务器过载的错误,另外高并发下可能需要使用IP代理池,下载的时段也需要尝试多个时段进行。初次尝试,如果有更好的方法或者哪里有考虑不周的地方欢迎留言建议或者指正。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容

  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    aimaile阅读 26,425评论 6 428
  • GitHub 上有一个 Awesome - XXX 系列的资源整理,资源非常丰富,涉及面非常广。awesome-p...
    若与阅读 18,595评论 4 418
  • 魏晋玄学是继两汉经学之后的中国古代哲学思想的一个发展阶段和形态,它提出并讨论了有,无,独化等本原,本体类问题。魏晋...
    张静年阅读 709评论 0 6
  • 喜欢你的双眸 像月光照亮泉水 喜欢你的温柔 像荷叶间的氤氲 我会叮嘱南风 希望下一次的吹过 ...
    冷川远树阅读 188评论 0 0
  • 一般说道演讲,大家都会潜意识认为,哇塞,好遥远的事情,我没有机会上台演讲。但是却是在日常生活中,工作上,我们需要和...
    江子牙的鱼塘阅读 172评论 0 0