Ways to implement multitasking
Multithreading
Multiprocessing
Coroutines (see the sketch after this list)
Multithreading + multiprocessing
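Coroutines are listed above but not demonstrated later in these notes. A minimal sketch using the standard asyncio module (the fetch_page name and the sleep stand-in for real I/O are assumptions made for this illustration):

import asyncio

async def fetch_page(page):
    # Simulate an I/O-bound operation (e.g. a network request) without blocking the event loop
    await asyncio.sleep(1)
    print('page', page, 'done')
    return page

async def main():
    # Schedule several coroutines concurrently on a single thread
    results = await asyncio.gather(*(fetch_page(i) for i in range(1, 5)))
    print(results)

if __name__ == '__main__':
    asyncio.run(main())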
Why can multitasking be achieved?
Parallelism: tasks are launched at the same time and run at the same time; achieved with multiple processes (a process is the unit to which the CPU/OS allocates resources).
Concurrency: tasks are launched at the same time but only one runs at any instant (they take turns); achieved with threads.
In Python, multithreading cannot achieve true parallelism, because the CPython interpreter has a global interpreter lock (GIL) that guarantees only one thread executes Python bytecode at any given moment.
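As a rough illustration of the GIL (a sketch, not a rigorous benchmark; the count_down function is made up and assumed to be purely CPU-bound): running it in several threads usually takes about as long as running it serially, while processes can use multiple cores.

import time
from threading import Thread
from multiprocessing import Process

def count_down(n):
    # Pure CPU-bound loop; threads running this contend for the GIL
    while n > 0:
        n -= 1

if __name__ == '__main__':
    for worker_cls in (Thread, Process):
        start = time.time()
        workers = [worker_cls(target=count_down, args=(5_000_000,)) for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(worker_cls.__name__, 'took', round(time.time() - start, 2), 'seconds')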
Threads:
A thread is the basic unit of CPU execution. It occupies very few resources, threads within the same process share resources, and a thread exists only as part of a process. Multithreading is generally suited to I/O-bound operations, and threads execute in no guaranteed order.
Creating and using threads
from threading import Thread
import threading, time

data = []

def download_image(url, num):
    """
    Download an image
    :param url:
    :param num:
    :return:
    """
    global data
    time.sleep(2)
    print(url, num)
    data.append(num)

def read_data():
    global data
    for i in data:
        print(i)

if __name__ == '__main__':
    # Get the current thread's name: threading.currentThread().name
    print('main thread start', threading.currentThread().name)
    # Create child threads
    """
    target=None: the target function the thread will run
    name=None: the thread name, assigned when the thread is created
    args=(): positional arguments for the target function, as a tuple
    """
    thread_sub1 = Thread(
        target=download_image,
        name='download thread',
        args=('https://f10.baidu.com/it/u=3931984114,750350835&fm=72', 1))
    thread_sub2 = Thread(
        target=read_data,
        name='read thread'
    )
    # Whether to mark the thread as a daemon thread
    # daemon = False: when the main thread ends, it checks whether the child threads
    #   have finished; if not, the child threads are allowed to finish normally
    # daemon = True: if the child thread's task has not finished, it is terminated
    #   together with the main thread
    # thread_sub1.daemon = True
    # Start the threads
    thread_sub1.start()
    # join(): block, waiting for the child thread's task to finish before the main thread continues
    thread_sub1.join()
    thread_sub2.start()
    thread_sub2.join()
    print('main thread end', threading.currentThread().name)
- Queues
# Queue
import queue

# Create a queue with a maximum size
dataqueue = queue.Queue(maxsize=40)
for i in range(0, 50):
    # Put a value in, as long as the queue is not full
    if not dataqueue.full():
        dataqueue.put(i)
# Check whether the queue is empty
dataqueue.empty()
# Check whether the queue is full
dataqueue.full()
# Current length
dataqueue.qsize()
# Take a value out. FIFO: first in, first out -- whatever was stored first comes out first
dataqueue.get()
# (Excerpt from the crawler example below: create threads that run the download task)
for i in range(1, 10):
    taskQueue.put(i)
threadName = ['download thread 1', 'download thread 2', 'download thread 3', 'download thread 4']
crawl_thread = []
for name in threadName:
    # Create a thread
    thread_crawl = threading.Thread(target=download_page_data,
                                    name=name,
                                    args=(taskQueue, dataQueue)
                                    )
    crawl_thread.append(thread_crawl)
    # Start the thread
    thread_crawl.start()
# Wait for all crawl threads to finish before continuing in the main thread
for thread in crawl_thread:
    thread.join()
# Thread lock
lock = threading.Lock()
lock.acquire()  # acquire the lock
lock.release()  # release the lock
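The acquire()/release() pair above leaves the lock held if an exception is raised in between; a small sketch (the shared counter and the add_many function are made up for this example) of the more robust with-statement form:

import threading

counter = 0
lock = threading.Lock()

def add_many(times):
    global counter
    for _ in range(times):
        # The with-statement acquires the lock and always releases it,
        # even if an exception is raised inside the block
        with lock:
            counter += 1

if __name__ == '__main__':
    threads = [threading.Thread(target=add_many, args=(100000,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(counter)  # expected 400000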
- A simple queue-based crawler -- jobbole
import queue, requests, threading, json
from lxml.html import etree
# Note: a queue is the mechanism for exchanging data between threads; queues are thread-safe
"""
1. Create a task queue holding the page numbers (URLs) to crawl
2. Create crawl threads that run the download task
3. Create a data queue holding the page source fetched by the crawl threads
4. Create parse threads that parse the HTML source, extract the target data and persist it
"""
# Get the jobbole article list
# http://blog.jobbole.com/all-posts/page/1/
# http://blog.jobbole.com/all-posts/page/2/
def download_page_data(taskQueue, dataQueue):
    """
    Run the download task
    :param taskQueue: take tasks out of the task queue
    :param dataQueue: put the fetched page source into the data queue
    :return:
    """
    while not taskQueue.empty():
        page = taskQueue.get()
        print('downloading page ' + str(page), threading.currentThread().name)
        full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(str(page))
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0'
        }
        response = requests.get(full_url, headers=req_header)
        if response.status_code == 200:
            # Put the fetched page source into the dataQueue queue
            dataQueue.put(response.text)
        else:
            # Put the page back into the task queue so it is retried
            taskQueue.put(page)
def parse_data(dataQueue, lock):
    """
    Parse the data taken out of dataQueue
    :param dataQueue:
    :return:
    """
    while not dataQueue.empty():
        print('parsing', threading.currentThread().name)
        html = dataQueue.get()
        html_element = etree.HTML(html)
        articles = html_element.xpath('//div[@class="post floated-thumb"]')
        for article in articles:
            articleInfo = {}
            # Title
            articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
            # Cover image
            img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
            if len(img_element) > 0:
                articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
            else:
                articleInfo['coverImage'] = 'no image'
            p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
            if len(p_as) > 2:
                # Tag
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # Comment count
                articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
            else:
                # Tag
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # Comment count
                articleInfo['commentNum'] = '0'
            # Summary
            articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')
            # Publish time
            # //text() takes all text under the current tag, including child tags
            articleInfo['publishTime'] = ''.join(
                article.xpath('.//div[@class="post-meta"]/p[1]/text()')
            ).replace('\n', '').replace(' ', '').replace('\r', '').replace('.', '')
            # lock.acquire()  # acquire the lock
            # with open('jobbole.json', 'a+', encoding='utf-8') as file:
            #     json_str = json.dumps(articleInfo, ensure_ascii=False) + '\n'
            #     file.write(json_str)
            # lock.release()  # release the lock
            # print(articleInfo)
if __name__ == '__main__':
    # Create the task queue
    taskQueue = queue.Queue()
    for i in range(1, 10):
        taskQueue.put(i)
    # Create the data queue
    dataQueue = queue.Queue()
    # Create threads to run the download task
    threadName = ['download thread 1', 'download thread 2', 'download thread 3', 'download thread 4']
    crawl_thread = []
    for name in threadName:
        # Create a thread
        thread_crawl = threading.Thread(target=download_page_data,
                                        name=name,
                                        args=(taskQueue, dataQueue)
                                        )
        crawl_thread.append(thread_crawl)
        # print(crawl_thread)
        # Start the thread
        thread_crawl.start()
    # Wait for all crawl threads to finish before continuing in the main thread
    for thread in crawl_thread:
        thread.join()
    # Thread lock
    lock = threading.Lock()
    # Create parse threads that take page source out of the dataQueue queue and parse it
    threadName = ['parse thread 1', 'parse thread 2', 'parse thread 3', 'parse thread 4']
    parse_thread = []
    for name in threadName:
        # Create a thread
        thread_parse = threading.Thread(target=parse_data,
                                        name=name,
                                        args=(dataQueue, lock)
                                        )
        parse_thread.append(thread_parse)
        # Start the thread
        thread_parse.start()
    # Wait for all parse threads to finish before continuing in the main thread
    for thread in parse_thread:
        thread.join()
- Thread pools
from concurrent.futures import ThreadPoolExecutor
# max_workers: the number of threads in the pool
pool = ThreadPoolExecutor(max_workers=1000)
# Add a task to the pool (target_function and arg are placeholders)
handler = pool.submit(target_function, arg)
# Set a callback that runs when the task finishes
handler.add_done_callback(download_done)
def download_done(futures):
    # The callback receives the Future; result() returns the task's return value
    print(futures.result())
# Like join(): wait for all tasks in the pool to finish
pool.shutdown()
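For reference, a self-contained runnable sketch of the same ThreadPoolExecutor pattern, with a made-up square task standing in for a real download:

from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    # Stand-in for an I/O-bound task
    time.sleep(0.1)
    return n * n

def on_done(future):
    # Called once the task finishes
    print('result:', future.result())

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=4)
    for i in range(10):
        handler = pool.submit(square, i)
        handler.add_done_callback(on_done)
    # Wait for all submitted tasks to finish
    pool.shutdown()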
- Thread pool crawler
from concurrent.futures import ThreadPoolExecutor
import requests, threading, json
from lxml.html import etree

# Purpose of a thread pool: create a pool holding a fixed number of threads and let those threads run the tasks
def download_data(page):
    print(page)
    print('downloading page ' + str(page), threading.currentThread().name)
    full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(str(page))
    req_header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0'
    }
    response = requests.get(full_url, headers=req_header)
    if response.status_code == 200:
        # Return the page source so the callback can parse it
        print('request succeeded')
        return response.text, response.status_code

def download_done(futures):
    # The callback receives the Future's result
    print(futures.result())
    # The parsing can be done here
    html = futures.result()[0]
    html_element = etree.HTML(html)
    articles = html_element.xpath('//div[@class="post floated-thumb"]')
    for article in articles:
        articleInfo = {}
        # Title
        articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
        # Cover image
        img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
        if len(img_element) > 0:
            articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
        else:
            articleInfo['coverImage'] = 'no image'
        p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
        if len(p_as) > 2:
            # Tag
            articleInfo['tag'] = p_as[1].xpath('./text()')[0]
            # Comment count
            articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
        else:
            # Tag
            articleInfo['tag'] = p_as[1].xpath('./text()')[0]
            # Comment count
            articleInfo['commentNum'] = '0'
        # Summary
        articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')
        # Publish time
        # //text() takes all text under the current tag, including child tags
        articleInfo['publishTime'] = ''.join(
            article.xpath('.//div[@class="post-meta"]/p[1]/text()')
        ).replace('\n', '').replace(' ', '').replace('\r', '').replace('.', '')
        with open('jobbole.json', 'a+', encoding='utf-8') as file:
            json_str = json.dumps(articleInfo, ensure_ascii=False) + '\n'
            file.write(json_str)

if __name__ == '__main__':
    # Create the thread pool
    # max_workers: the number of threads in the pool
    pool = ThreadPoolExecutor(max_workers=10)
    for i in range(1, 201):
        # Add a task to the pool
        handler = pool.submit(download_data, i)
        # Set a callback that runs when the task finishes
        handler.add_done_callback(download_done)
    # shutdown() internally performs a join(): wait for all tasks to finish
    pool.shutdown()
Processes
- Queues
from multiprocessing import Process, Queue
import os

# maxsize: the maximum number of elements the queue can hold; -1 (any negative value) means unbounded
data_queue = Queue(maxsize=10)

def write_data(num, data_queue):
    print(num)
    # global data_queue
    for i in range(0, num):
        data_queue.put(i)
    print(os.getpid(), data_queue.full())

def read_data(data_queue):
    print('reading', os.getpid())
    # global data_queue
    print(data_queue.qsize())
    for i in range(0, data_queue.qsize()):
        print(data_queue.get())

if __name__ == '__main__':
    # os.getpid() returns the process id
    print('main process start', os.getpid())
    # Create child processes
    """
    target=None: the function the process will run
    name=None: the process name
    args=(): positional arguments for the function (tuple)
    kwargs={}: keyword arguments for the function (dict)
    """
    process1 = Process(target=write_data, args=(10, data_queue))
    # Start the process with start()
    process1.start()
    # timeout=5: join() accepts an optional blocking timeout
    process1.join()
    process2 = Process(target=read_data, args=(data_queue,))
    # Start the process with start()
    process2.start()
    # timeout=5: join() accepts an optional blocking timeout
    process2.join()
    print('main process end', os.getpid())
- Queue-based crawler (multiprocessing)
"""
1. Create a task queue
2. Create crawl processes that run the crawl task
3. Create a data queue
4. Create parse processes that parse the fetched data
"""
# Example site: Jiayuan (世纪佳缘, date.jiayuan.com)
# Events in the Wuhan area (the first page is a static page; from the second page on, data is loaded dynamically)
# http://date.jiayuan.com/eventslist_new.php?
# page=1&city_id=4201&shop_id=33  (page 1)
# http://date.jiayuan.com/eventslist_new.php?
# page=2&city_id=4201&shop_id=33  (page 2)
# http://date.jiayuan.com/eventslist_new.php?
# page=3&city_id=4201&shop_id=33  (page 3)
"""
_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079;
jy_refer=www.baidu.com; _gscbrs_1380850711=1;
PHPSESSID=9202a7e752f801a49a5747832520f1da;
plat=date_pc; DATE_FROM=daohang;
SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d;
user_access=1; uv_flag=124.64.18.38;
DATE_SHOW_LOC=4201; DATE_SHOW_SHOP=33
"""
# http://date.jiayuan.com/eventslist_new.php?
# page=2&city_id=31&shop_id=15
"""
_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079;
jy_refer=www.baidu.com; _gscbrs_1380850711=1;
PHPSESSID=9202a7e752f801a49a5747832520f1da;
plat=date_pc; DATE_FROM=daohang;
SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d;
user_access=1; uv_flag=124.64.18.38;
DATE_SHOW_LOC=31; DATE_SHOW_SHOP=15
"""
from multiprocessing import Process,Queue
import requests,re,json
from lxml.html import etree
import time
def down_load_page_data(taskQueue, dataQueue):
    """
    Run the download task
    :param taskQueue:
    :param dataQueue:
    :return:
    """
    sumTime = 0
    isContinue = True
    while isContinue:
        if not taskQueue.empty():
            sumTime = 0
            url = taskQueue.get()
            response, cur_page = download_page_data(url)
            data_dict = {'data': response.text, 'page': cur_page}
            dataQueue.put(data_dict)
            # Work out the next page
            if cur_page != 1:
                print('====', cur_page)
                if isinstance(response.json(), list):
                    next_page = cur_page + 1
                    next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
                    taskQueue.put(next_url)
                else:
                    print('fetched up to page ' + str(cur_page), 'no more data', response.json())
                    pass
            elif cur_page == 1:
                next_page = cur_page + 1
                next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
                taskQueue.put(next_url)
        else:
            # The task queue is empty; wait a little before giving up
            time.sleep(0.001)
            sumTime = sumTime + 1
            if sumTime > 5000:
                print('leaving the loop')
                isContinue = False
                break
def download_page_data(url):
    """
    Download one page of results
    :param url: the URL of one page
    :return:
    """
    # http://date.jiayuan.com/eventslist_new.php?
    # page=1&city_id=4201&shop_id=33
    pattern = re.compile(r'.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
    result = re.findall(pattern, url)[0]
    cur_page = result[0]
    DATE_SHOW_LOC = result[1]
    DATE_SHOW_SHOP = result[2]
    print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
    cookie = """_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s""" % (DATE_SHOW_LOC, DATE_SHOW_SHOP)
    # print(cookie)
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': cookie,
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
    # print(cookie_dict)
    # cookies can also be passed separately (cookiejar object or dict)
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('page ' + cur_page + ' fetched', DATE_SHOW_SHOP, DATE_SHOW_LOC)
        return response, int(cur_page)
def parse_page_data(dataQueue):
    """
    The parse process parses data taken out of dataQueue
    :param dataQueue:
    :return:
    """
    while not dataQueue.empty():
        data = dataQueue.get()
        page = data['page']
        html = data['data']
        if page == 1:
            print('parsing page 1, static page')
            html_element = etree.HTML(html)
            hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
            for hot_div in hot_active:
                # URL of the event detail page
                full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
                response = download_detail_data(full_detail_url)
                parse_detail_data(response)
            more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
            for more_li in more_active:
                # URL of the event detail page
                full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
                response = download_detail_data(full_detail_url)
                parse_detail_data(response)
        else:
            print('parsing page ' + str(page), 'not a static page')
            # Use json.loads() to turn the JSON string into a Python object
            json_obj = json.loads(html)
            if isinstance(json_obj, list):
                # A list means we received valid data
                print('parsing data')
                for sub_dict in json_obj:
                    id = sub_dict['id']
                    # http://date.jiayuan.com/activityreviewdetail.php?id=11706
                    full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                    response = download_detail_data(full_detail_url)
                    parse_detail_data(response)
def download_detail_data(url):
    """
    Request an event detail page by its URL
    :param url:
    :return:
    """
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': '_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=50; DATE_SHOW_SHOP=5',
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('detail page fetched', response.url)
        return response
def parse_detail_data(response):
    """
    Parse an event detail page
    :param response:
    :return:
    """
    html_element = etree.HTML(response.text)
    # Create a dict to hold the extracted data
    item = {}
    # Event title
    item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()')[0])
    # Event time
    item['time'] = ','.join(
        html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()')[0])
    # Event address
    item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
    # Number of participants
    item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
    # Number of reservations
    item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
    # Introduction
    item['intreduces'] = html_element.xpath('//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
    # Notes
    item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
    # Store introduction
    item['introductionStore'] = ''.join(
        html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
    # Image link
    item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    with open('shijijiyua.json', 'a+', encoding='utf-8') as file:
        json_str = json.dumps(item, ensure_ascii=False) + '\n'
        file.write(json_str)
if __name__ == '__main__':
    # Create the task queue
    taskQueue = Queue()
    # Seed it with the starting tasks
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42')
    taskQueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5')
    # Create the data queue
    dataQueue = Queue()
    # Create the crawl processes
    for i in range(0, 3):
        process_crawl = Process(
            target=down_load_page_data,
            args=(taskQueue, dataQueue)
        )
        process_crawl.start()
    # Give the crawl processes a head start so the data queue is not empty
    time.sleep(10)
    # Create the parse processes
    for i in range(0, 3):
        process_parse = Process(
            target=parse_page_data,
            args=(dataQueue,)
        )
        process_parse.start()
- Process pools
from concurrent.futures import ProcessPoolExecutor
import os

# Approach 1: ProcessPoolExecutor (kept here commented out)
"""
def download_page_data(page):
    print(page, os.getpid())
    return 'download finished ' + str(page), page

def download_done(futures):
    result = futures.result()
    print(result)
    next_page = int(result[1]) + 1
    handler = pool.submit(download_page_data, next_page)
    handler.add_done_callback(download_done)

if __name__ == '__main__':
    # Create the process pool
    pool = ProcessPoolExecutor(4)
    for page in range(0, 200):
        handler = pool.submit(download_page_data, page)
        # Setting a callback is optional
        handler.add_done_callback(download_done)
    # cannot schedule new futures after shutdown
    # pool.shutdown()
"""
# Approach 2: multiprocessing.Pool
from multiprocessing import Pool

def download_page_data(page):
    print(page, os.getpid())
    return 'download finished ' + str(page), page

def done(futures):
    print(futures)

if __name__ == '__main__':
    # Create the process pool
    pool = Pool(4)
    for page in range(0, 200):
        # pool.apply_async(): add a task asynchronously (non-blocking)
        # pool.apply(): add a task synchronously
        # func: the function to run
        # args=(): arguments passed to the function
        # callback=None: callback on success
        # error_callback=None: callback on error
        pool.apply_async(download_page_data, args=(page,), callback=done)
    pool.close()  # after close() no more tasks may be added
    pool.join()
- Process pool crawler
from concurrent.futures import ProcessPoolExecutor
import requests
import time, re, json
from lxml.html import etree

def down_load_page_data(url):
    """
    Run the download task for one page
    :param url:
    :return:
    """
    response, cur_page = download_page_data(url)
    data_dict = {'data': response.text, 'page': cur_page}
    # Work out the next page
    if cur_page != 1:
        if isinstance(response.json(), list):
            next_page = cur_page + 1
            next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
        else:
            print('fetched up to page ' + str(cur_page), 'no more data', response.json())
            next_url = None
            pass
    elif cur_page == 1:
        next_page = cur_page + 1
        next_url = re.sub(r'page=\d+', 'page=' + str(next_page), url)
        print('====', cur_page)
    return data_dict, next_url
def download_page_data(url):
    """
    Download one page of results
    :param url: the URL of one page
    :return:
    """
    # http://date.jiayuan.com/eventslist_new.php?
    # page=1&city_id=4201&shop_id=33
    pattern = re.compile(r'.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
    result = re.findall(pattern, url)[0]
    cur_page = result[0]
    DATE_SHOW_LOC = result[1]
    DATE_SHOW_SHOP = result[2]
    print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
    cookie = """_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s""" % (DATE_SHOW_LOC, DATE_SHOW_SHOP)
    # print(cookie)
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': cookie,
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
    # print(cookie_dict)
    # cookies can also be passed separately (cookiejar object or dict)
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('page ' + cur_page + ' fetched', DATE_SHOW_SHOP, DATE_SHOW_LOC)
        return response, int(cur_page)
def parse_page_data(futures):
    """
    Step 1: take the next-page URL from the result and submit it back to the process pool
    Step 2: take the page source from the result and parse it
    :param futures:
    :return:
    """
    result = futures.result()
    data = result[0]
    next_page_url = result[1]
    print(data, next_page_url)
    if next_page_url:
        print('adding task', next_page_url)
        handler = page_pool.submit(down_load_page_data, next_page_url)
        handler.add_done_callback(parse_page_data)
    page = data['page']
    html = data['data']
    # Create a process pool for fetching the event detail pages
    detail_pool = ProcessPoolExecutor(3)
    if page == 1:
        print('parsing page 1, static page')
        html_element = etree.HTML(html)
        hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
        for hot_div in hot_active:
            # URL of the event detail page
            full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
            detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
            detail_handler.add_done_callback(parse_detail_data)
        more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
        for more_li in more_active:
            # URL of the event detail page
            full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
            detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
            detail_handler.add_done_callback(parse_detail_data)
    else:
        print('parsing page ' + str(page), 'not a static page')
        # Use json.loads() to turn the JSON string into a Python object
        json_obj = json.loads(html)
        if isinstance(json_obj, list):
            # A list means we received valid data
            print('parsing data')
            for sub_dict in json_obj:
                id = sub_dict['id']
                # http://date.jiayuan.com/activityreviewdetail.php?id=11706
                full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                detail_handler = detail_pool.submit(download_detail_data, full_detail_url)
                detail_handler.add_done_callback(parse_detail_data)
def download_detail_data(url):
    """
    Request an event detail page by its URL
    :param url:
    :return:
    """
    req_header = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
        'Cookie': '_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=50; DATE_SHOW_SHOP=5',
        'Referer': 'http://date.jiayuan.com/eventslist.php',
    }
    response = requests.get(url, headers=req_header)
    if response.status_code == 200:
        print('detail page fetched', response.url)
        return response
def parse_detail_data(futures):
    """
    Parse an event detail page
    :param futures:
    :return:
    """
    response = futures.result()
    html_element = etree.HTML(response.text)
    # Create a dict to hold the extracted data
    item = {}
    # Event title
    item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()')[0])
    # Event time
    item['time'] = ','.join(
        html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()')[0])
    # Event address
    item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
    # Number of participants
    item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
    # Number of reservations
    item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
    # Introduction
    item['intreduces'] = html_element.xpath('//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
    # Notes
    item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
    # Store introduction
    item['introductionStore'] = ''.join(
        html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
    # Image link
    item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    with open('shijijiyua.json', 'a+', encoding='utf-8') as file:
        json_str = json.dumps(item, ensure_ascii=False) + '\n'
        file.write(json_str)
if __name__ == '__main__':
    # Create a process pool to run the page download tasks
    page_pool = ProcessPoolExecutor(4)
    start_urls = [
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42',
        'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5',
    ]
    for url in start_urls:
        handler = page_pool.submit(down_load_page_data, url)
        handler.add_done_callback(parse_page_data)