Using Python Crawlers to Prepare Data Sources for BI

When implementing business intelligence for a company, modeling and visualization are mostly built on internal data. In the past, very few companies employed crawler engineers to prepare external data, but with Python crawlers becoming extremely popular over the last year, companies have started hiring crawler engineers to enrich their data sources.

I have used Python to scrape data from several sites, such as Meituan, Dianping, Yimutian (一亩田), and rental-housing sites. None of that data was used commercially; I crawled it out of personal interest, as practice. Here I take Yimutian as the example and use the Scrapy framework to scrape its data.

Yimutian (一亩田)

Yimutian is an agricultural-products website that aggregates market quotes for most producing areas in China. It was founded in its early days by people with a Baidu background, and it initially hired large numbers of field agents to go into the countryside and teach farmers to publish their product information on the site.

Yimutian started as a website, but because of heavy crawler traffic, and because farmers working in the fields found the web version inconvenient, it moved to an app and retired the web version. The Yimutian app's anti-crawling defenses are very strong. However, the producing-area quotes and market quotes are still available as web pages and carry a great deal of information, so I chose to crawl the producing-area quote data.


I crawl Yimutian with the Scrapy framework. I won't cover the framework's internals or a demo here; instead I go straight to the analysis approach and the source code.

Yimutian crawler: analysis approach

First, open the Yimutian producing-area quotes page at http://hangqing.ymt.com/chandi, where you can see the agricultural-product categories.

Clicking the fruit category reveals its many subcategories. Clicking "pear" opens the quote page for pears, which lists all varieties plus a region selector; choosing a province shows the current day's quotes and a one-month trend.


This chain of pages is the structure I followed when scraping the data.


Yimutian crawler: source code

1. First, create a Spider
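As a rough sketch, the project and its spiders can be generated with the standard Scrapy CLI; the project name mySpider here is an assumption inferred from the imports in the code below:

# Sketch: create the Scrapy project and spiders ("mySpider" inferred from the imports below)
scrapy startproject mySpider
cd mySpider
scrapy genspider hangqing hangqing.ymt.com
scrapy genspider chandi hangqing.ymt.com
scrapy genspider pricedata hangqing.ymt.com
scrapy genspider location_char hangqing.ymt.com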


2. Market quote data

Scrape the big, medium, and small categories plus varieties: hangqing.py

# -*- coding: utf-8 -*-
import scrapy
from mySpider.items import MyspiderItem
from copy import deepcopy
import time


class HangqingSpider(scrapy.Spider):
    name = "hangqing"
    allowed_domains = ["hangqing.ymt.com"]
    start_urls = (
        'http://hangqing.ymt.com/',
    )

    # Big categories
    def parse(self, response):
        a_list = response.xpath("//div[@id='purchase_wrapper']/div//a[@class='hide']")

        for a in a_list:
            items = MyspiderItem()
            items["ymt_bigsort_href"] = a.xpath("./@href").extract_first()
            items["ymt_bigsort_id"] = items["ymt_bigsort_href"].replace("http://hangqing.ymt.com/common/nav_chandi_", "")
            items["ymt_bigsort_name"] = a.xpath("./text()").extract_first()

            # Request the detail page
            yield scrapy.Request(
                items["ymt_bigsort_href"],
                callback=self.parse_medium_detail,
                meta={"item": deepcopy(items)}
            )

            # Request the next page (get the next-page URL via XPath)
            # next_url = response.xpath("next-page xpath")
            # if next_url is not None:
            #     yield scrapy.Request(
            #         next_url,
            #         callback=self.parse
            #     )

    # Medium categories (the small categories are embedded in the same page)
    def parse_medium_detail(self, response):
        items = response.meta["item"]
        li_list = response.xpath("//div[@class='cate_nav_wrap']//a")
        for li in li_list:
            items["ymt_mediumsort_id"] = li.xpath("./@data-id").extract_first()
            items["ymt_mediumsort_name"] = li.xpath("./text()").extract_first()
            yield scrapy.Request(
                items["ymt_bigsort_href"],
                callback=self.parse_small_detail,
                meta={"item": deepcopy(items)},
                dont_filter=True
            )

    # Small categories
    def parse_small_detail(self, response):
        item = response.meta["item"]
        mediumsort_id = item["ymt_mediumsort_id"]
        if int(mediumsort_id) > 0:
            nav_product_id = "nav-product-" + mediumsort_id
            a_list = response.xpath("//div[@class='cate_content_1']//div[contains(@class,'{}')]//ul//a".format(nav_product_id))
            for a in a_list:
                item["ymt_smallsort_id"] = a.xpath("./@data-id").extract_first()
                item["ymt_smallsort_href"] = a.xpath("./@href").extract_first()
                item["ymt_smallsort_name"] = a.xpath("./text()").extract_first()
                yield scrapy.Request(
                    item["ymt_smallsort_href"],
                    callback=self.parse_variety_detail,
                    meta={"item": deepcopy(item)}
                )

    # Varieties
    def parse_variety_detail(self, response):
        item = response.meta["item"]
        li_list = response.xpath("//ul[@class='all_cate clearfix']//li")
        if len(li_list) > 0:
            for li in li_list:
                item["ymt_breed_href"] = li.xpath("./a/@href").extract_first()
                item["ymt_breed_name"] = li.xpath("./a/text()").extract_first()
                item["ymt_breed_id"] = item["ymt_breed_href"].split("_")[2]
                # time.sleep(1)
                yield item
                # print(item)
        else:
            # No varieties under this small category: emit the item with placeholder values
            item["ymt_breed_href"] = ""
            item["ymt_breed_name"] = ""
            item["ymt_breed_id"] = -1
            # time.sleep(1)
            yield item
            # print(item)
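A note on the meta={"item": deepcopy(items)} pattern used above: Scrapy schedules requests asynchronously, so if every request carried a reference to the same item, later loop iterations would overwrite fields before earlier callbacks got to read them. A minimal standalone illustration of the aliasing problem this avoids (not part of the spider):

from copy import deepcopy

shared = {"name": "pear"}
queued = [shared, shared]                         # both "requests" reference the same dict
shared["name"] = "apple"
print(queued[0]["name"])                          # prints "apple": the earlier value is lost

snapshots = [deepcopy(shared) for _ in range(2)]  # independent copies per request
shared["name"] = "peach"
print(snapshots[0]["name"])                       # still prints "apple"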


3. Producing-area data

Scrape provinces, cities, and counties: chandi.py

# -*- coding: utf-8 -*-
import scrapy
from mySpider.items import MyspiderChanDi
from copy import deepcopy


class ChandiSpider(scrapy.Spider):
    name = 'chandi'
    allowed_domains = ['hangqing.ymt.com']
    start_urls = ['http://hangqing.ymt.com/chandi_8031_0_0']

    # Provinces
    def parse(self, response):
        # Producing-area list
        li_list = response.xpath("//div[@class='fl sku_name']/ul//li")
        for li in li_list:
            items = MyspiderChanDi()
            items["ymt_province_href"] = li.xpath("./a/@href").extract_first()
            items["ymt_province_id"] = items["ymt_province_href"].split("_")[-1]
            items["ymt_province_name"] = li.xpath("./a/text()").extract_first()

            yield scrapy.Request(
                items["ymt_province_href"],
                callback=self.parse_city_detail,
                meta={"item": deepcopy(items)}
            )

    # Cities
    def parse_city_detail(self, response):
        item = response.meta["item"]
        option = response.xpath("//select[@class='location_select'][1]//option")

        if len(option) > 0:
            for op in option:
                name = op.xpath("./text()").extract_first()
                if name != "全部":
                    item["ymt_city_name"] = name
                    item["ymt_city_href"] = op.xpath("./@data-url").extract_first()
                    item["ymt_city_id"] = item["ymt_city_href"].split("_")[-1]
                    yield scrapy.Request(
                        item["ymt_city_href"],
                        callback=self.parse_area_detail,
                        meta={"item": deepcopy(item)}
                    )
        else:
            # No city list for this province: record empty city/area fields and
            # emit the item directly (a request to an empty URL would fail)
            item["ymt_city_name"] = ""
            item["ymt_city_href"] = ""
            item["ymt_city_id"] = 0
            item["ymt_area_name"] = ""
            item["ymt_area_href"] = ""
            item["ymt_area_id"] = 0
            yield item

    # Counties
    def parse_area_detail(self, response):
        item = response.meta["item"]
        area_list = response.xpath("//select[@class='location_select'][2]//option")

        if len(area_list) > 0:
            for area in area_list:
                name = area.xpath("./text()").extract_first()
                if name != "全部":
                    item["ymt_area_name"] = name
                    item["ymt_area_href"] = area.xpath("./@data-url").extract_first()
                    item["ymt_area_id"] = item["ymt_area_href"].split("_")[-1]
                    yield item
        else:
            item["ymt_area_name"] = ""
            item["ymt_area_href"] = ""
            item["ymt_area_id"] = 0
            yield item


4. Price distribution by location

location_char.py

# -*- coding: utf-8 -*-
import scrapy
import pymysql
import json
from copy import deepcopy
from mySpider.items import MySpiderSmallProvincePrice
import datetime


class LocationCharSpider(scrapy.Spider):
    name = 'location_char'
    allowed_domains = ['hangqing.ymt.com']
    start_urls = ['http://hangqing.ymt.com/']

    i = datetime.datetime.now()
    dateKey = str(i.year) + str(i.month) + str(i.day)
    db = pymysql.connect(
        host="127.0.0.1", port=3306,
        user='root', password='mysql',
        db='ymt_db', charset='utf8'
    )

    # For every small category that has a price today, request its national distribution
    def parse(self, response):
        cur = self.db.cursor()
        location_char_sql = "select small_id from ymt_price_small where dateKey = {} and day_avg_price > 0".format(self.dateKey)

        cur.execute(location_char_sql)
        location_chars = cur.fetchall()
        for ch in location_chars:
            item = MySpiderSmallProvincePrice()
            item["small_id"] = ch[0]
            location_char_url = "http://hangqing.ymt.com/chandi/location_charts"
            small_id = str(item["small_id"])
            form_data = {
                "locationId": "0",
                "productId": small_id,
                "breedId": "0"
            }
            yield scrapy.FormRequest(
                location_char_url,
                formdata=form_data,
                callback=self.location_char,
                meta={"item": deepcopy(item)}
            )

    # Province-level prices, then drill down into each province
    def location_char(self, response):
        item = response.meta["item"]

        html_str = json.loads(response.text)
        status = html_str["status"]
        if status == 0:
            item["unit"] = html_str["data"]["unit"]
            item["dateKey"] = self.dateKey
            dataList = html_str["data"]["dataList"]
            for data in dataList:
                if isinstance(data, list):
                    item["province_name"] = data[0]
                    item["province_price"] = data[1]
                elif isinstance(data, dict):
                    item["province_name"] = data["name"]
                    item["province_price"] = data["y"]

                location_char_url = "http://hangqing.ymt.com/chandi/location_charts"
                small_id = str(item["small_id"])
                province_name = str(item["province_name"])
                province_id_sql = "select province_id from ymt_1_dim_cdProvince where province_name = \"{}\"".format(province_name)
                cur = self.db.cursor()
                cur.execute(province_id_sql)
                province_id = cur.fetchone()

                item["province_id"] = province_id[0]

                province_id = str(province_id[0])
                form_data = {
                    "locationId": province_id,
                    "productId": small_id,
                    "breedId": "0"
                }
                yield scrapy.FormRequest(
                    location_char_url,
                    formdata=form_data,
                    callback=self.location_char_province,
                    meta={"item": deepcopy(item)}
                )

    # City-level prices within a province, then drill down into each city
    def location_char_province(self, response):
        item = response.meta["item"]

        html_str = json.loads(response.text)
        status = html_str["status"]

        if status == 0:
            dataList = html_str["data"]["dataList"]
            for data in dataList:
                if isinstance(data, list):
                    item["city_name"] = data[0]
                    item["city_price"] = data[1]
                elif isinstance(data, dict):
                    item["city_name"] = data["name"]
                    item["city_price"] = data["y"]

                location_char_url = "http://hangqing.ymt.com/chandi/location_charts"
                small_id = str(item["small_id"])
                city_name = str(item["city_name"])
                city_id_sql = "select city_id from ymt_1_dim_cdCity where city_name = \"{}\"".format(city_name)
                cur = self.db.cursor()
                cur.execute(city_id_sql)
                city_id = cur.fetchone()

                item["city_id"] = city_id[0]

                city_id = str(city_id[0])
                form_data = {
                    "locationId": city_id,
                    "productId": small_id,
                    "breedId": "0"
                }
                yield scrapy.FormRequest(
                    location_char_url,
                    formdata=form_data,
                    callback=self.location_char_province_city,
                    meta={"item": deepcopy(item)}
                )

    # County-level prices within a city, then request each variety's distribution
    def location_char_province_city(self, response):
        item = response.meta["item"]

        html_str = json.loads(response.text)
        status = html_str["status"]

        if status == 0:
            dataList = html_str["data"]["dataList"]
            for data in dataList:
                if isinstance(data, list):
                    item["area_name"] = data[0]
                    item["area_price"] = data[1]
                elif isinstance(data, dict):
                    item["area_name"] = data["name"]
                    item["area_price"] = data["y"]
                area_name = item["area_name"]
                area_id_sql = "select area_id from ymt_1_dim_cdArea where area_name = \"{}\"".format(area_name)
                cur1 = self.db.cursor()
                cur1.execute(area_id_sql)
                area_id = cur1.fetchone()

                item["area_id"] = area_id[0]

                breed_id_sql = "select breed_id from ymt_all_info_sort where small_id = {} and breed_id > 0".format(item["small_id"])
                cur1.execute(breed_id_sql)
                breed_ids = cur1.fetchall()
                # print(len(breed_ids))
                location_char_url = "http://hangqing.ymt.com/chandi/location_charts"
                for breed_id in breed_ids:
                    item["breed_id"] = breed_id[0]
                    form_data = {
                        "locationId": str(item["city_id"]),
                        "productId": str(item["small_id"]),
                        "breedId": str(breed_id[0])
                    }
                    # print(form_data, breed_id)
                    yield scrapy.FormRequest(
                        location_char_url,
                        formdata=form_data,
                        callback=self.location_char_province_city_breed,
                        meta={"item": deepcopy(item)}
                    )

    # City-level prices per variety
    def location_char_province_city_breed(self, response):
        item = response.meta["item"]

        html_str = json.loads(response.text)
        status = html_str["status"]

        if status == 0:
            dataList = html_str["data"]["dataList"]
            for data in dataList:
                if isinstance(data, list):
                    item["breed_city_name"] = data[0]
                    item["breed_city_price"] = data[1]
                elif isinstance(data, dict):
                    item["breed_city_name"] = data["name"]
                    item["breed_city_price"] = data["y"]

                # print(item)
                yield item
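For reference, this is the response shape these callbacks assume the /chandi/location_charts endpoint returns, reconstructed from the parsing code above; the concrete values are made up for illustration:

# Reconstructed response shape; values are illustrative, not real data
location_charts_example = {
    "status": 0,                         # 0 indicates success
    "data": {
        "unit": "元/斤",
        "dataList": [
            ["山东", 1.25],              # list entries: [name, price]
            {"name": "河北", "y": 0.98}  # dict entries: {"name": ..., "y": price}
        ]
    }
}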




5. Price trends

pricedata.py

# -*- coding: utf-8 -*-
import scrapy
import pymysql.cursors
from copy import deepcopy
from mySpider.items import MySpiderSmallprice

import datetime
import json


class PricedataSpider(scrapy.Spider):
    name = 'pricedata'
    allowed_domains = ['hangqing.ymt.com']
    start_urls = ['http://hangqing.ymt.com/chandi_8031_0_0']
    i = datetime.datetime.now()

    def parse(self, response):
        db = pymysql.connect(
            host="127.0.0.1", port=3306,
            user='root', password='mysql',
            db='ymt_db', charset='utf8'
        )
        cur = db.cursor()

        all_small_sql = "select distinct small_id, small_name, small_href from ymt_all_info_sort"

        cur.execute(all_small_sql)
        small_all = cur.fetchall()

        for small in small_all:
            item = MySpiderSmallprice()
            item["small_href"] = small[2]
            # item["small_name"] = small[1]
            item["small_id"] = small[0]
            yield scrapy.Request(
                item["small_href"],
                callback=self.small_breed_info,
                meta={"item": deepcopy(item)}
            )

    def small_breed_info(self, response):
        item = response.meta["item"]
        item["day_avg_price"] = response.xpath("//dd[@class='c_origin_price']/p[2]//span[1]/text()").extract_first()
        item["unit"] = response.xpath("//dd[@class='c_origin_price']/p[2]//span[2]/text()").extract_first()
        item["dateKey"] = str(self.i.year) + str(self.i.month) + str(self.i.day)

        if item["day_avg_price"] is None:
            item["day_avg_price"] = 0
            item["unit"] = ""

        yield item
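One caveat with the date key: str(self.i.year) + str(self.i.month) + str(self.i.day) yields an unpadded value such as 201771 for 2017-07-01, which makes string comparisons on dateKey unreliable. If zero-padded keys are preferred, strftime is the usual alternative (this would also need to change in location_char.py, which filters on the same key):

import datetime

# Zero-padded alternative, e.g. "20170701" instead of "201771"
dateKey = datetime.datetime.now().strftime("%Y%m%d")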



6. Define the item fields

items.py

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/items.html

import scrapy


# Fields for the market-quotes (hangqing) spider
class MyspiderItem(scrapy.Item):
    ymt_bigsort_href = scrapy.Field()
    ymt_bigsort_id = scrapy.Field()
    ymt_bigsort_name = scrapy.Field()
    ymt_mediumsort_id = scrapy.Field()
    ymt_mediumsort_name = scrapy.Field()
    ymt_smallsort_id = scrapy.Field()
    ymt_smallsort_href = scrapy.Field()
    ymt_smallsort_name = scrapy.Field()
    ymt_breed_id = scrapy.Field()
    ymt_breed_name = scrapy.Field()
    ymt_breed_href = scrapy.Field()


# Fields for the producing-area (chandi) spider
class MyspiderChanDi(scrapy.Item):
    ymt_province_id = scrapy.Field()
    ymt_province_name = scrapy.Field()
    ymt_province_href = scrapy.Field()
    ymt_city_id = scrapy.Field()
    ymt_city_name = scrapy.Field()
    ymt_city_href = scrapy.Field()
    ymt_area_id = scrapy.Field()
    ymt_area_name = scrapy.Field()
    ymt_area_href = scrapy.Field()


# Daily producing-area price per small category
class MySpiderSmallprice(scrapy.Item):
    small_href = scrapy.Field()
    small_id = scrapy.Field()
    day_avg_price = scrapy.Field()
    unit = scrapy.Field()
    dateKey = scrapy.Field()


# Province/city/county prices per small category
class MySpiderSmallProvincePrice(scrapy.Item):
    small_id = scrapy.Field()
    unit = scrapy.Field()
    province_name = scrapy.Field()
    province_price = scrapy.Field()  # average price in the province
    province_id = scrapy.Field()
    city_name = scrapy.Field()
    city_price = scrapy.Field()      # average price in the city
    city_id = scrapy.Field()
    area_name = scrapy.Field()
    area_price = scrapy.Field()      # average price in the county
    area_id = scrapy.Field()

    breed_city_name = scrapy.Field()
    breed_city_price = scrapy.Field()
    breed_id = scrapy.Field()

    dateKey = scrapy.Field()




7. Writing the data to the database

pipelines.py

# -*- coding: utf-8 -*-

from pymongo import MongoClient
import pymysql.cursors


class MyspiderPipeline(object):
    def open_spider(self, spider):
        # client = MongoClient(host=spider.settings["MONGO_HOST"], port=spider.settings["MONGO_PORT"])
        # self.collection = client["ymt"]["hangqing"]
        pass

    def process_item(self, item, spider):
        db = pymysql.connect(
            host="127.0.0.1", port=3306,
            user='root', password='mysql',
            db='ymt_db', charset='utf8'
        )
        cur = db.cursor()

        if spider.name == "hangqing":
            # All category data
            all_sort_sql = "insert into ymt_all_info_sort(big_id, big_name, big_href, " \
                           "medium_id, medium_name, " \
                           "small_id, small_name, small_href, " \
                           "breed_id, breed_name, breed_href) " \
                           "VALUES({},\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\")".format(
                item["ymt_bigsort_id"], item["ymt_bigsort_name"], item["ymt_bigsort_href"],
                item["ymt_mediumsort_id"], item["ymt_mediumsort_name"],
                item["ymt_smallsort_id"], item["ymt_smallsort_name"], item["ymt_smallsort_href"],
                item["ymt_breed_id"], item["ymt_breed_name"], item["ymt_breed_href"])

            try:
                cur.execute(all_sort_sql)
                db.commit()
            except Exception as e:
                db.rollback()
            finally:
                cur.close()
                db.close()

            return item

        elif spider.name == "chandi":
            # All producing-area data
            all_cd_sql = "insert into ymt_all_info_cd(" \
                         "province_id, province_name, province_href, " \
                         "city_id, city_name, city_href, " \
                         "area_id, area_name, area_href) " \
                         "VALUES({},\"{}\",\"{}\",{},\"{}\",\"{}\",{},\"{}\",\"{}\")".format(
                item["ymt_province_id"], item["ymt_province_name"], item["ymt_province_href"],
                item["ymt_city_id"], item["ymt_city_name"], item["ymt_city_href"],
                item["ymt_area_id"], item["ymt_area_name"], item["ymt_area_href"])
            try:
                # Producing-area rows
                cur.execute(all_cd_sql)
                db.commit()
            except Exception as e:
                db.rollback()
            finally:
                cur.close()
                db.close()

            return item

        elif spider.name == "pricedata":
            avg_day_price_sql = "insert into ymt_price_small(small_href, small_id, day_avg_price, unit, dateKey) " \
                                "VALUES(\"{}\",{},{},\"{}\",\"{}\")".format(item["small_href"], item["small_id"], item["day_avg_price"], item["unit"], item["dateKey"])
            try:
                cur.execute(avg_day_price_sql)
                db.commit()
            except Exception as e:
                db.rollback()
            finally:
                cur.close()
                db.close()

        elif spider.name == "location_char":
            location_char_sql = "insert into ymt_price_provice(small_id, province_name, provice_price, city_name, city_price, area_name, area_price, unit, dateKey, area_id, city_id, provice_id, breed_city_name, breed_city_price, breed_id) " \
                                "VALUES({},\"{}\",{},\"{}\",{},\"{}\",{},\"{}\",{},{},{},{},\"{}\",{},{})".format(
                item["small_id"], item["province_name"], item["province_price"], item["city_name"], item["city_price"],
                item["area_name"], item["area_price"], item["unit"], item["dateKey"],
                item["area_id"], item["city_id"], item["province_id"],
                item["breed_city_name"], item["breed_city_price"], item["breed_id"])
            try:
                cur.execute(location_char_sql)
                db.commit()
            except Exception as e:
                db.rollback()
            finally:
                cur.close()
                db.close()

        else:
            cur.close()
            db.close()
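For this pipeline to receive items, it has to be enabled in the project's settings.py; a minimal sketch, assuming the default layout of the mySpider project:

# settings.py (sketch): route all items through the pipeline
ITEM_PIPELINES = {
    'mySpider.pipelines.MyspiderPipeline': 300,  # lower numbers run earlier in the chain
}

Because pricedata and location_char read from the MySQL tables that hangqing and chandi populate, the spiders are run in order: scrapy crawl hangqing, then chandi, then pricedata, and finally location_char.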



Final result

Out of personal interest, I eventually turned the crawled agricultural-product data into a web system.

