scrapy打造分布式带自动登陆的企业信息爬虫

题记:

  最近再做一个征信的项目,当查询企业征信时需要输入公司名称全称和企业所在的城市,而且你还不能输错,我就不想多吐槽了。反正,我就是感觉不爽,怎么办,自己动手,丰衣足食。就自己来获取下这些数据吧。

正文:

  为了获取全国企业的简要信息,我开始从网上找有这些信息的网站,最后找到了几个: 一个是企查查,一个是知企业,还有就是各个城市自己的工商系统。最后,我就选择先从知企业这里获取到公司的基本信息和公司所在地信息。

  分析完需求,开始进行技术选型: 爬虫就选用功能强大的scrapy,爬取的数据结构类似文档类型,选用mongodb比较合适。考虑到数据量比较大,大致计算下单机跑要很多天,程序中断等异常情况很容易发生,所以记录下程序爬取的记录很有必要,分析目标网站,每个公司由32位16进制字符标识,这样的话,存储几百万数据到内存中占用的空间也不大,就直接丢到redis中吧。然后爬虫本身加一个布隆过滤,已经爬过的就不在爬取。

  首先配置mongo,一开始我希望把redis装在开放到公网的树莓派上(路由器刷华硕固件后,开启ngrok将内网的树莓派等设备映射到公网),但是即使是树莓派3了,arm也已经是v7了,支持mongodb3时还是有点力所不能及(因为希望用mongodb3以上的WiredTiger引擎,看官方数据,这个比默认的MMAP引擎性能上提升了将近30%)。折腾了一番,无果,就放到阿里云上吧,相关配置如下:

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log
storage:
  dbPath: /var/lib/mongo
  journal:
    enabled: true
  engine: wiredTiger
  wiredTiger:
    engineConfig:
      cacheSizeGB: 1
      #cache_size: 300M
      directoryForIndexes: true
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true
processManagement:
  fork: true  # fork and run in background
  pidFilePath: /var/run/mongodb/mongod.pid
net:
  port: 27017
  bindIp: 0.0.0.0
security:
  authorization: enabled

。之后还发生了个小插曲,爬虫跑起来一段时间后我的主机竟然不能连接上去了,看监控信息内存爆掉了,看mongo文档,wiredTiger 引擎如果不配置cacheSizeGB 这个参数,则默认使用40% 系统内存,不过如果40%内存的值小于1G,就直接使用1G。果然很霸道,我这个机器的内存只有2G呀,mongo 引擎都要1G,再加上mongo自身需要的内存,大约1.3G就这样没了,而我这个机器上还跑的有别的程序,所以内存很容易就跑光了。最不可思议的是cacheSizeGB 这个参数是个整型,这样一来,你至少也得配1G,看来只能妥协了,加点内存吧,于是开启swap,添加1G的磁盘容量为swap内存,暂时解决了这个问题。

爬虫基本结构构造如下: 从首页初始化后,选择各个省份,然后进入到城市、地区,然后把这个页面上企业的url加入到爬取队列中,因为这个页面是分页的,所以分析完这个页面继续进入到下个页面进行分析,核心代码如下:

    """
    分析省份详情
    """
    def parse_province(self, response):
        # 获取省份名称,回调进入城市分析

    """
    分析城市详情
    """
    def parse_city(self, response):
        # 获取城市名称,回调进入区域分析

    """
    分析区域详情
    """
    def parse_area(self, response):
        # 获取区域名称,回调页面分析

    """
    分析页面上企业名单, 如果有下一页, 则在下一页继续执行本方法
    """
    def parse_page(self, response):
        xxx
        ar_corp = corp.get_corp_list(response)
        for corp_url in ar_corp:
            corp_id = corp.get_corp_id_by_url(corp_url)
            yield Request("%s%s" % (self.url_prefix, corp_url),
                          meta={"province": province,
                                "city": city,
                                "area": area,
                                "corp_id": corp_id}, callback=self.parse_corp)
        # 将下一页加入到爬取队列
        next_page = corp.get_next_page_url(response)
        if next_page:
            yield Request("%s%s" % (self.url_prefix, next_page),
                          meta={"province": province,
                                "city": city,
                                "area": area}, callback=self.parse_page)

    """
    分析企业详情
    """
    def parse_corp(self, response, meta=False):
        # 拼装item

  当我的爬虫愉快地爬了几天后,突然报异常了:页面上一些元素不能找到。赶紧到网站上一看,管理员在此期间,加了登陆限制,非登录用户只能查看很少一部分数据。还好登陆机制不太复杂,赶紧加了自动登陆的代码,在爬虫初始化后触发自动登陆,拿到授权cookie后再进行爬取。示例如下:

def start_requests(self):
        logging.info("start to login ...")
        return [Request("http://www.zhiqiye.com/index.html",
                    callback=self.post_login)]

def post_login(self, response):
    current = int(time.time() * 1000)
        return [FormRequest.from_response(response,
                                        url="http://www.zhiqiye.com/account/login/",
                                        method="POST",
                                        formdata={
                                            'time': "%s" % current,
                                            'username': self.user_name,
                                            'pass': self.password,
                                            'f': 'true'
                                        },
                                        dont_filter=True,
                                        callback=self.after_login)]

不过,爬取了几个小时候,发现又断了,看日志,继续报元素找不到异常。分析下来,原因是session过期了,看来需要做登陆态检查了。这里花了一番功夫,本想检查到登出后重新触发初始化就行了,然后发现并不行,后来了解到parse函数中做了return操作,这样就会触发数据保存并中断回调的操作,所以并不能在这里加回调操作。最终找到解决办法(v站讨论地址:https://www.v2ex.com/t/302345)就是定义一个重新登陆的函数,检测到登出态(通过捕捉爬虫元素报IndexError异常的方式)时调用这个函数(其实这里少了一个步骤:把本次失败的请求重新加入到爬虫队列中)。 示例如下:

try:
            corp = CorpParseModel()
            corp.get_corp_tips(response, item)
            corp.get_contact(response, item)
            corp.get_commercial(response, item)
            corp.get_corp_info(response, item)
            corp.get_relate_corp(response, item)
            return item
except IndexError:
            self.log("------------- start to login again -------------")
            if is_retry:
                self.log("------ retry login and fails ------")
            else:
                return self.post_login(response, True)

最后把mongo、redis 添加密码访问和ip限制,然后开放到公网,这样就可以多个设备一起愉快地爬取了。

心得:

1. 开发过程中注意对异常的处理,确保系统的健壮性。前期一定要有比较完整的日志收集,出问题时可以快速定位问题、分析问题。

2. scrapy 中可以使用 inspect_response 调试正则、附带的参数、cookie等信息,方便观察请求的细节。

3. 开发爬虫时可以定义一个测试用的爬虫,新的功能开发和测试可以现在测试爬虫完成后再放入到目标爬虫。

4. 监控很重要,要实时监控我们的系统运行的状态。开发这个爬虫项目时我加了个获取爬虫状态的功能到我的微信公众号,只要输入相关指令即可获取爬虫当前的状态(爬取总数、各省份爬取数量、模糊搜索等)

附:

1. 爬虫地址:https://github.com/liyj144/corporation_spider

2. 微信机器人地址(包含爬虫状态监控):https://github.com/liyj144/weixin_robot

发表评论

电子邮件地址不会被公开。 必填项已用*标注