- Study how fang.com (房天下) urls are constructed; this crawl covers the new-house and second-hand-house sections and extracts their detail fields.
- Topics covered: url concatenation (see the short sketch after this list), parsing and cleaning the individual fields, and how to extract data when the page layout is irregular.
- The steps needed to deploy the spider as a distributed crawler.
- When crawling, it is advisable to use a fairly large download delay.
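To make the url concatenation easier to follow, here is a minimal sketch of the splitting logic used in the spider below, assuming a city link of the form http://cq.fang.com/ (the real links come from the SoufunFamily page):

city_url = 'http://cq.fang.com/'
prefix, domain = city_url.split('fang')                          # 'http://cq.' and '.com/'
newhouse_url = prefix + 'newhouse.fang' + domain + 'house/s/'    # http://cq.newhouse.fang.com/house/s/
esf_url = prefix + 'esf.fang' + domain                           # http://cq.esf.fang.com/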
The fangtianxia.py file:
import scrapy, re
from fang.items import NewHouseItem, ESFHouseItem


class FangtianxiaSpider(scrapy.Spider):
    name = 'fangtianxia'
    allowed_domains = ['fang.com']
    start_urls = ['http://www.fang.com/SoufunFamily.htm']

    def parse(self, response):
        trs = response.xpath('//div[@class="outCont"]//tr')
        province = None  # start with no value; assigned below once a row carries a province
        for tr in trs:
            tds = tr.xpath('.//td[not(@class)]')
            province_td = tds[0]  # the province cell; not every row has one, so filter it
            province_text = province_td.xpath('.//text()').get()  # rows without a province contain only whitespace
            province_text = re.sub(r'\s', '', province_text)  # strip whitespace so the check below is reliable
            if province_text:
                province = province_text  # a non-empty value means a new province starts here
            if '其它' in province:  # skip the "other" (overseas) section
                continue
            city_id = tds[1]  # the cell holding the city links and names
            city_links = city_id.xpath('.//a')
            for city_link in city_links:
                city_url = city_link.xpath('.//@href').get()
                city = city_link.xpath('.//text()').get()
                # build the new-house and second-hand-house urls
                url_module = city_url.split('fang')
                prefix = url_module[0]
                domain = url_module[1]
                # Beijing is a special case, handle it separately
                if 'bj' in prefix:
                    newhouse_url = 'http://' + 'newhouse.fang' + domain + 'house/s/'
                    esf_url = 'http://' + 'esf.fang' + domain
                else:
                    # build the new-house url
                    newhouse_url = prefix + 'newhouse.fang' + domain + 'house/s/'
                    # build the second-hand-house url
                    esf_url = prefix + 'esf.fang' + domain
                # meta can carry extra data along with the Request; the callback reads it back from response.meta
                yield scrapy.Request(url=newhouse_url, callback=self.parse_newhouse, meta={'info': (province, city)})
                yield scrapy.Request(url=esf_url, callback=self.parse_esf, meta={'info': (province, city)})
    def parse_newhouse(self, response):
        # parse the detail fields for new houses
        # (province, city) was attached to the Request via meta and is read back here
        province, city = response.meta.get('info')
        lis = response.xpath('//div[contains(@class,"nl_con")]/ul/li')
        for li in lis:
            name = li.xpath(".//div[contains(@class,'house_value')]//div[@class='nlcd_name']/a/text()").get()
            if name:
                name = re.sub(r"\s", "", name)
            house_type_list = li.xpath('.//div[contains(@class,"house_type")]/a/text()').getall()
            house_type_list = list(map(lambda x: re.sub(r'\s', '', x), house_type_list))
            rooms = list(filter(lambda x: x.endswith('居'), house_type_list))
            area = ''.join(li.xpath('.//div[contains(@class,"house_type")]/text()').getall())
            area = re.sub(r'\s|-|/', '', area)
            address = li.xpath('.//div[@class="address"]/a/@title').get()
            # district_text = ''.join(li.xpath('.//div[@class="address"]/a//text()').getall())
            # district = re.search(r'.*\[(.+)\].*', district_text).group(1)
            sale = li.xpath(".//div[contains(@class,'fangyuan')]/span/text()").get()
            price = "".join(li.xpath(".//div[@class='nhouse_price']//text()").getall())
            price = re.sub(r"\s|广告", "", price)
            # detail page url
            origin_url = li.xpath(".//div[@class='nlcd_name']/a/@href").get()
            item = NewHouseItem(name=name, rooms=rooms, area=area, address=address,
                                sale=sale, price=price, origin_url=origin_url, province=province, city=city)
            yield item
        # next page
        # next_url = response.xpath("//div[@class='page']//a[@class='next']/@href").get()
        # if next_url:
        #     yield scrapy.Request(url=response.urljoin(next_url),
        #                          callback=self.parse_newhouse,
        #                          meta={'info': (province, city)})
    def parse_esf(self, response):
        # parse the detail fields for second-hand houses
        province, city = response.meta.get('info')
        dls = response.xpath("//div[@class='shop_list shop_list_4']/dl")
        for dl in dls:
            item = ESFHouseItem(province=province, city=city)
            name = dl.xpath(".//span[@class='tit_shop']/text()").get()
            if name:
                infos = dl.xpath(".//p[@class='tel_shop']/text()").getall()
                infos = list(map(lambda x: re.sub(r"\s", "", x), infos))
                for info in infos:
                    if "厅" in info:
                        item["rooms"] = info
                    elif '层' in info:
                        item["floor"] = info
                    elif '向' in info:
                        item['toward'] = info
                    elif '㎡' in info:
                        item['area'] = info
                    elif '年建' in info:
                        item['year'] = re.sub("年建", "", info)
                item['address'] = dl.xpath(".//p[@class='add_shop']/span/text()").get()
                # total price
                item['price'] = "".join(dl.xpath(".//span[@class='red']//text()").getall())
                # unit price
                item['unit'] = dl.xpath(".//dd[@class='price_right']/span[2]/text()").get()
                item['name'] = name
                detail = dl.xpath(".//h4[@class='clearfix']/a/@href").get()
                item['origin_url'] = response.urljoin(detail)
                yield item
        # next page
        # next_url = response.xpath("//div[@class='page_al']/p/a/@href").get()
        # if next_url:
        #     yield scrapy.Request(url=response.urljoin(next_url),
        #                          callback=self.parse_esf,
        #                          meta={'info': (province, city)})
The items.py file:
import scrapy
from scrapy import Field


class NewHouseItem(scrapy.Item):
    # province
    province = Field()
    # city
    city = Field()
    # name of the development
    name = Field()
    # price
    price = Field()
    # number of rooms (this is a list)
    rooms = Field()
    # area
    area = Field()
    # address
    address = Field()
    # sale status
    sale = Field()
    # detail page url on fang.com
    origin_url = Field()


class ESFHouseItem(scrapy.Item):
    # province
    province = Field()
    # city
    city = Field()
    # name of the development
    name = Field()
    # rooms and halls, e.g. 3室2厅
    rooms = Field()
    # floor
    floor = Field()
    # orientation
    toward = Field()
    # year built
    year = Field()
    # address
    address = Field()
    # built area
    area = Field()
    # total price
    price = Field()
    # unit price
    unit = Field()
    # detail page url
    origin_url = Field()
The settings.py file:
ROBOTSTXT_OBEY = False
DOWNLOAD_DELAY = 1

from fake_useragent import UserAgent
ua = UserAgent().random

DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
    'User-Agent': ua,
}

ITEM_PIPELINES = {
    'fang.pipelines.FangPipeline': 300,
    'fang.pipelines.MongoPipeline': 400,
}

MONGO_URI = 'localhost'
MONGO_DB = 'fangtianxia'
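Note that UserAgent().random is evaluated only once when the settings are loaded, so every request reuses the same User-Agent. If you want a fresh one per request, a downloader middleware along the following lines can be used (a minimal sketch; the class name and its placement in fang/middlewares.py are assumptions, not part of the original project):

# fang/middlewares.py (hypothetical helper)
from fake_useragent import UserAgent

class RandomUserAgentMiddleware(object):
    def __init__(self):
        self.ua = UserAgent()

    def process_request(self, request, spider):
        # pick a freshly generated User-Agent for every outgoing request
        request.headers['User-Agent'] = self.ua.random

# enable it in settings.py:
# DOWNLOADER_MIDDLEWARES = {
#     'fang.middlewares.RandomUserAgentMiddleware': 543,
# }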
The pipelines.py file:
import pymongo
from scrapy.exporters import JsonLinesItemExporter
from fang.items import NewHouseItem, ESFHouseItem


class FangPipeline(object):
    def __init__(self):
        self.newhouse_fp = open('newhouse.json', 'wb')
        self.esfhouse_fp = open('esfhouse.json', 'wb')
        self.newhouse_exporter = JsonLinesItemExporter(self.newhouse_fp, ensure_ascii=False)
        self.esfhouse_exporter = JsonLinesItemExporter(self.esfhouse_fp, ensure_ascii=False)

    def process_item(self, item, spider):
        # write each item to the json file that matches its type
        if isinstance(item, NewHouseItem):
            self.newhouse_exporter.export_item(item)
        elif isinstance(item, ESFHouseItem):
            self.esfhouse_exporter.export_item(item)
        return item

    def close_spider(self, spider):
        self.newhouse_fp.close()
        self.esfhouse_fp.close()
# save items to MongoDB
class MongoPipeline(object):
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        # use the item class name as the collection name
        name = item.__class__.__name__
        self.db[name].insert_one(dict(item))
        return item

    def close_spider(self, spider):
        self.client.close()
Converting the project into a distributed crawler
First install scrapy-redis: pip install scrapy-redis
Turning a Scrapy project into a Scrapy-Redis project only requires the following three changes:
- Change the spider's base class from scrapy.Spider to scrapy_redis.spiders.RedisSpider (or from scrapy.CrawlSpider to scrapy_redis.spiders.RedisCrawlSpider). For the example above, this means editing fangtianxia.py:
from scrapy_redis.spiders import RedisSpider


class FangtianxiaSpider(RedisSpider):
    name = 'fangtianxia'
    allowed_domains = ['fang.com']
    # start_urls = ['http://www.fang.com/SoufunFamily.htm']
    redis_key = "fang:start_urls"
- Delete start_urls from the spider and add redis_key = "xxx". This redis_key is how the crawl is started later from redis: the spider's first url is pushed out through this key.
- Switch Scrapy's scheduler to the redis-backed one. URL de-duplication is then also handled by redis, and the scraped items are shared by storing them in redis (a sketch of reading them back out follows the settings below). Add the following to the settings file:
# Scrapy-Redis settings
# make sure requests are stored in redis via the redis-backed scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# make sure all spiders share the same de-duplication fingerprints
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# use the redis item pipeline
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300
}
# keep the scrapy-redis queues in redis instead of clearing them, which allows pausing and resuming the crawl
SCHEDULER_PERSIST = True
# redis connection info
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
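With RedisPipeline enabled, each scraped item is serialized to JSON and pushed onto a redis list, which by default is named after the spider ("fangtianxia:items" here). A minimal consumer sketch for draining that list from any machine, assuming the default key name and a local redis:

import json
import redis

r = redis.StrictRedis(host='127.0.0.1', port=6379)
while True:
    # blpop blocks until an item is available, then returns (key, value)
    _, data = r.blpop('fangtianxia:items')
    item = json.loads(data)
    print(item.get('name'), item.get('price'))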
Running the crawler:
- On each crawler server, change into the directory containing the spider file and run: scrapy runspider [spider file], e.g. scrapy runspider fangtianxia.py. The spiders start up and wait for a start url.
- On the Redis server, push the initial url to kick off the crawl: redis-cli> lpush [redis_key] [start_url] (shown below with this project's values).
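For this project, with redis_key = "fang:start_urls" and the SoufunFamily page as the first url, the push looks like this (assuming redis is running locally on the default port):

redis-cli lpush fang:start_urls http://www.fang.com/SoufunFamily.htm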