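背景：数据平台的数据存储使用的是 Elasticsearch，之前采用 scroll 分页查询，每次取 1000 条（网上文档大多采用这种查询方式）。当查询返回量达到几十万条时，请求直接失败。因此考虑用多线程来优化：scroll 分页要求按游标依次连续读取，单个 scroll 无法直接多线程并发，所以这里改用 from-size 分页 + 多线程。需要注意，from + size 默认不能超过索引设置 index.max_result_window（默认 10000），结果量达到几十万条时需要调大该设置；Elasticsearch 也提供 sliced scroll 和 search_after 等支持并行或深分页的原生方式。代码如下。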
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

from elasticsearch import Elasticsearch
import pandas as pd
from pandas.io.json import json_normalize
# Module-level state shared by the page fetchers and the driver code below.
q = queue.Queue()  # pages of hits are put here by get_search_re
# NOTE(review): subject, ids, threads1 and tb are not read anywhere in the
# visible code; kept only in case other code imports them.
results, threads, subject, ids, threads1 = [], [], [], [], []
tb = subject = pd.DataFrame()
# Connect to Elasticsearch. The host was redacted and the original port
# placeholder (*****) was not valid Python; 9200 is Elasticsearch's default
# HTTP port. Replace both with the real connection details.
es = Elasticsearch(['*****'], port=9200)
index = 'xxxxx'  # name of the index to query (placeholder)
def es_to_df(results):
    """Flatten a list of Elasticsearch hits into a single DataFrame.

    The column layout is inferred from the ``_source`` of the FIRST hit only:

    * a top-level list field is exploded into one row per element
      (passed to ``json_normalize`` as ``record_path``);
    * a top-level dict field contributes one ``parent.child`` column per key;
    * any other top-level field is repeated on every exploded row.

    Finally the ``parent.`` prefix is dropped from dotted column names
    (``m.k`` -> ``k``). Approach described at
    https://www.jianshu.com/p/a84772b994a0

    Args:
        results: list of hits as found in ``response['hits']['hits']``;
            every hit must carry a ``_source`` dict.

    Returns:
        The flattened ``pandas.DataFrame``; empty when ``results`` is empty.
    """
    if not results:
        # No hit to infer a layout from (previously a KeyError on row 0).
        return pd.DataFrame()

    source_list = [hit['_source'] for hit in results]

    list_fields = []  # record_path for json_normalize
    meta_fields = []  # scalar keys and [parent, child] paths into dict fields
    for key, value in source_list[0].items():
        if isinstance(value, list):
            list_fields.append(key)
        elif isinstance(value, dict):
            meta_fields.extend([key, child] for child in value)
        else:
            meta_fields.append(key)

    if list_fields:
        # NOTE(review): json_normalize reads record_path as a *nested* path
        # (source[a][b]...), so several top-level list fields are not exploded
        # independently -- confirm documents carry at most one.
        # errors='ignore' gives NaN for meta fields missing from later hits,
        # because the layout was taken from the first hit only.
        big_df = pd.json_normalize(
            source_list,
            record_path=list_fields,
            meta=meta_fields,
            errors='ignore',
        )
    else:
        # An empty record_path makes json_normalize raise IndexError; with no
        # list field, flattening exactly one level of nesting yields the same
        # parent.child columns the meta paths would.
        big_df = pd.json_normalize(source_list, max_level=1)

    def strip_prefix(column):
        # Record columns may be non-strings (e.g. 0 for a list of scalars).
        parts = str(column).split('.')
        return parts[1] if len(parts) > 1 else column

    big_df.columns = [strip_prefix(column) for column in big_df.columns]
    return big_df
def _build_query(offset, size=1000, track_total_hits=False):
    """Build the search body shared by get_total and get_search_re.

    Only the ``query`` section needs to change to search for something else;
    both callers pick it up from here.

    Args:
        offset: value for ``from`` (index of the first hit to return).
        size: number of hits to return.
        track_total_hits: when True, ask Elasticsearch for an exact hit count.

    Returns:
        A dict usable as ``es.search(body=...)``.
    """
    body = {
        "from": offset,
        "size": size,
        "query": {
            "bool": {
                "must": [{"range": {"xxxx": {"gt": "7"}}}]
            }
        },
    }
    if track_total_hits:
        body["track_total_hits"] = True
    return body


def get_total():
    """Return the exact number of documents matching the shared query.

    ``size`` is 0 because only ``hits.total`` is read; the original fetched
    (and discarded) a full page of 1000 hits just to get the count.
    """
    response = es.search(index=index, body=_build_query(0, size=0, track_total_hits=True))
    return response['hits']['total']['value']


def get_search_re(i):
    """Fetch the page of hits starting at offset ``i`` and queue it.

    The page's hit list is put on the module-level queue ``q`` and also
    returned, so callers may use either.

    NOTE(review): the query has no ``sort``; with from/size paging, pages
    fetched by independent requests may overlap or skip documents unless the
    ordering is deterministic -- consider sorting on a unique field.
    """
    response = es.search(index=index, body=_build_query(i))
    hits = response['hits']['hits']
    q.put(hits)
    return hits
# Driver: count the matches, fetch every page concurrently, then flatten all
# hits into one DataFrame (kept in the module-level name ``re``).
#
# NOTE(review): Elasticsearch rejects from/size requests whose from + size
# exceeds the index setting index.max_result_window (10000 by default), so for
# result sets in the hundreds of thousands that setting must be raised, or a
# sliced-scroll / search_after approach used instead -- confirm before relying
# on this for large exports.
page_size = 1000  # must match the "size" each get_search_re request uses
max_workers = 8  # cap on concurrent requests (was one thread per page); tune for the cluster
total = get_total()
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # Draining map() re-raises the first worker exception, so a failed page
    # stops the run instead of silently leaving a hole in the data (an
    # exception in a plain threading.Thread target is only printed).
    for _ in pool.map(get_search_re, range(0, total, page_size)):
        pass
# Pages were queued in completion order, so row order across pages is not
# deterministic.
while not q.empty():
    results.extend(q.get())  # in place; `results = results + page` is quadratic
re = es_to_df(results)
⛽️