陶刚的博客
与你分享我的点滴

Django与Elasticsearch交互打造搜索引擎网站(一)

说明:该篇博客是博主一字一码编写的,实属不易,请尊重原创,谢谢大家!


一丶叙述

1.项目介绍

通过scrapy爬取伯乐在线网站的全部文章数据通过elasticsearch_dsl工具将数据写入到elasticsearch中;再使用django开发一个搜索网站并且与elasticsearch进行数据交互来打造出一个高大上的搜索引擎网站。

2.项目展示

  • 搜索引擎网站首页页面展示,当输入不完整的关键字时,会自动补全以及会显示出搜索建议

  • 搜索结果页面显示,会对搜索的内容进行分词处理,并且在文章列表中会对分词后的字段进行高亮标红显示

  • 动态效果展示,点击在搜索框输入python时的自动补全提示内容跳转到搜索结果列表页面

  • 动态效果展示,直接在搜索框输入python后点击搜索按钮,跳转到搜索结果列表页,并在主页我的搜索项中显示搜索历史

  • 在搜索结果页面同样有个搜索框,同样跟主页一样进行关键字搜索

3.什么是ElasticSearch

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

我们建立一个网站或应用程序,并要添加搜索功能,但是想要完成搜索工作的创建是非常困难的。我们希望搜索解决方案要运行速度快,我们希望能有一个零配置和一个完全免费的搜索模式,我们希望能够简单地使用JSON通过HTTP来索引数据,我们希望我们的搜索服务器始终可用,我们希望能够从一台开始并扩展到数百台,我们要实时搜索,我们要简单的多租户,我们希望建立一个云的解决方案。因此我们利用Elasticsearch来解决所有这些问题及可能出现的更多其它问题。

4.什么是Elasticsearch-RTF

RTF是Ready To Fly的缩写,在航模里面,表示无需自己组装零件即可直接上手即飞的航空模型,Elasticsearch-RTF是针对中文的一个发行版,即使用最新稳定的elasticsearch版本,并且帮你下载测试好对应的插件,如中文分词插件等,目的是让你可以下载下来就可以直接的使用(虽然es已经很简单了,但是很多新手还是需要去花时间去找配置,中间的过程其实很痛苦),当然等你对这些都熟悉了之后,你完全可以自己去diy了,跟linux的众多发行版是一个意思。

二丶elasticsearch-rtf的安装与测试

1. 安装elasticsearch-rtf

  • 在github上搜索elasticsearch-rtf,选择第一个点击进入如下页面下载即可

  • 将下载好的elasticsearch-5.1.1.zip包,进行解压,需要注意的是解压的目录名称不能带中文,否则运行elasticsearch则会报错

  • 因为elasticsearch是由java进行开发的,所以要运行elasticsearch则需要先安装java jdk8+;因为博主以前就安装过,所以这里不进行演示,网上有很多教程,安装完jdk后,在cmd窗口输入java -version则显示出安装成功后的jdk版本信息

  • 在cmd中进入解压的elasticsearch文件bin目录下,执行elasticsearch.bat批处理文件

2. 测试

三丶elasticsearch-head插件以及kibana的安装

1. elasticsearch-head插件安装

  • 在github上搜索elasticsearch-head,选择第一个点击进入如下页面下载即可

  • 博主这里直接使用git命令将elassearch-head下载下来,也可以直接在如上页面点击download进行下载

  • cd进入elassearch-head目录后,使用npm命令进行安装,需要注意的是npm命令就好比python中的pip一样,要想使用npm命令则需要下载安装windows版本的Node.js,安装Node.js成功后就可以直接使用npm命令了

  • 在elasticsearch-head插件页面无法连接http://127.0.0.1:9200/地址,如上图显示集群健康值:未连接,需要在elasticsearch解压文件config目录下elasticsearch.yml配置文件添加如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, X-User"

2.kibana的安装

  • 下载与elastsearch一致版本的kibana,否则会出错

  • 下载成功后进行解压,然后打开cmd窗口进入解压文件bin目录下,执行kibana.bat批处理文件

四丶elasticsearch搜索引擎的使用

**说明:**什么是Kibana:Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示;举例说明就好比mysql数据库的工具navicat一样

1. elasticsearch基本的索引和文档CRUD操作

  • 启动Kibana服务

  • 在浏览器中访问http://127.0.0.1:5601则进行Kibana可视化页面,在页面中选择Dev-Tools可以创建索引以及对查询索引的配置

  • 如上图创建lagou索引后,回到elasticsearch-head工具页面,刷新后则出现创建的lagou索引

  • 修改lagou索引配置

  • 获取所有的索引信息

  • 向索引中保存文档(向数据库添加表数据),注重说明一下可以将索引看做成数据库,type看作成数据表下的表后面跟着表的id,即PUT lagou/job/1,当添加数据时不指明id则也能添加成功能,系统会默认将id生成一个uuid的值

  • 回到elasticsearch-head页面,点击数据浏览选择lagou就能看到在Kibana中添加的数据信息

  • 获取lagou下的job表数据

  • 获取lagou下的job表某个字段以及多个字段的数据,需要注意的是,后面不能留空格

  • 修改lagou下的job表id为1数据,也是使用PUT进行修改,这种修改时覆盖式的修改

  • 使用POST修改lagou下的job表id为1的数据

  • 删除lagou/job表下id为1的所有数据

  • 删除lagou索引

2. elasticsearch的mget和bulk批量操作

  • 在使用mget查询之前需要添加些查询数据
PUT testdb/job1/1
{
  "title":"test_job1_1"
}
PUT testdb/job1/2
{
  "title":"test_job1_2"
}
PUT testdb/job2/1
{
  "title":"test_job2_1"
}
PUT testdb/job2/2
{
  "title":"test_job2_2"
}
  • 查看添加后的索引数据信息

  • 使用mget查询多个条件数据

  • 当index一致时,就可以进行如下查询
GET testdb/_mget
{
  "docs":[
    {
      "_type":"job1",
      "_id":1
    },
    {
      "_type":"job2",
      "_id":2
    }
    ]
}
  • 以此类推当index和type相同都是job1时只有id不同的情况下,可以进行如下查询

  • 使用bulk批量操作,添加lagou索引并在其下创建job1和job2两个type以及对应的id

3. elasticsearch的mapping映射管理

**说明:**关于elasticsearch的mapping的概念可以百度进行了解

  • 为了方便演示创建索引的映射,博主需要将之前创建的索引全部删除

  • 创建索引映射

  • 向创建好的索引映射中插入数据

  • 获取索引下的mapping

  • 获取集群下的所有mapping

4. elasticsearch的简单查询

  • 删除lagou索引,然后创建新的lagou索引并添加映射

  • 向索引中添加查询数据

  • 使用match匹配查询title字段包含python开发的数据,在创建索引映射时title字段使用的是ik_max_word,该工具会对此字段不管大小写都会转换成小写并且会对title字段进行中英文分词处理,所以在使用match进行匹配查询时,就会查询到如下两条数据

  • 而使用term进行查询时,会根据传入的title的值也就是python开发进行全局匹配不会进行分词处理,通俗来说在数据title字段内容必须不多不少是”python开发”才能查询到数据,所以如下没有查询到数据

  • 使用terms进行查询,对满足数组中的任何内容的title字段数据就会查询出来

  • 控制查询数据返回数量,实际匹配出两条数据,但form从0开始取只往后取一个,即结果只显示出一条数据

  • 使用match_all查询所有数据

  • 使用match_phrase查询,会将查询的内容进行分词,slop表示的是分词之间的长度,也就是说python和总监之间的长度,要满足该长度内才能查询出数据

  • multi_match查询,指定在title和desc字段中查询包含python的数据

  • 指定返回title以及company_name字段数据

  • 通过sort把结果排序

  • range范围查询comments大于等于1000小于等于2000的数据,boost表示权重

  • 使用range对时间范围进行查询

5. elasticsearch的bool组合查询

说明 bool组合查询包括:must丶should丶must_not丶filter来完成

  • bool组合查询的格式如下
bool:{
  "must":[],
  "should":[],
  "must_not":[],
  "filter":[]
}
  • 删除之前的索引,重新创建测试数据

  • 从查询到的所有数据中筛选出salary为2000的数据

  • 使用ik_max_word查看分析器解析的结果

  • 使用ik_smart查看分析器解析结果

  • 组合过滤查询薪资等于2000或者工作为Python的数据并且满足薪水不为3000的数据

  • 嵌套查询工作为python或者工作为django并且工资为3000的数据

  • 过滤空和非空数据,首先创建测试数据

  • 查询tags字段不为null的数据

  • 整个在Kibana中的操作命令如下
# 创建lagou索引
PUT lagou
{
  "settings": {
    "index":{
      "number_of_shards":5,
      "number_of_replicas":1
    }
  }
}
# 获取lagou的配置
GET lagou/_settings
# 获取所有索引的配置
GET _all/_settings
# 获取kibana和lagou索引配置
GET .kibana,lagou/_settings
# 修改lagou索引的配置,shards一旦设置了值就无法修改
PUT lagou/_settings
{
  "number_of_replicas": "3"
}
# 获取所有的索引信息
GET _all
# 向索引中保存文档
PUT lagou/job/1
{
  "title":"Python分布式爬虫打造搜索引擎",
  "min_salary":15000,
  "city":"成都",
  "company":{
    "name":"百度",
    "company_add":"天府软件园"
  },
  "publish_date":"2019-06-01",
  "comments":20
}
# 获取lagou下的job表数据
GET lagou/job/1
# 获取lagou下的job表某个字段的数据
GET lagou/job/1?_source=title
# 获取lagou下的job表多个字段的数据
GET lagou/job/1?_source=title,city
#使用PUT修改lagou下的job表id为1的数据
PUT lagou/job/1
{
  "title":"Python分布式爬虫打造搜索引擎",
  "min_salary":20000,
  "city":"成都",
  "company":{
    "name":"百度",
    "company_add":"天府软件园"
  },
  "publish_date":"2019-06-01",
  "comments":20
}
# 使用POST修改lagou下的job表id为1的数据
POST lagou/job/1/_update
{
  "doc": {
   "comments":50 
  }
}
 
# 删除lagou/job表下id为1的所有数据
DELETE lagou/job/1
# 删除lagou索引 
DELETE lagou
 
PUT testdb/job1/1
{
  "title":"test_job1_1"
}
PUT testdb/job1/2
{
  "title":"test_job1_2"
}
PUT testdb/job2/1
{
  "title":"test_job2_1"
}
PUT testdb/job2/2
{
  "title":"test_job2_2"
}
 
# 使用mget查询多个条件数据
GET _mget
{
  "docs":[
    {
      "_index":"testdb",
      "_type":"job1",
      "_id":1
    },
    {
      "_index":"testdb",
      "_type":"job2",
      "_id":2
    }
    ]
}
# 当index一致时,就可以进行如下查询
GET testdb/_mget
{
  "docs":[
    {
      "_type":"job1",
      "_id":1
    },
    {
      "_type":"job2",
      "_id":2
    }
    ]
}
# 当index和type相同都是job1时只有id不同
GET testdb/job1/_mget
{
  "docs":[
    {
      "_id":1
    },
    {
      "_id":2
    }
    ]
}
 
 
# 使用bulk批量操作
POST _bulk
{"index":{"_index":"lagou", "_type":"job1", "_id":1}}
{"title":"Python分布式爬虫打造搜索引擎", "min_salary":16000,"city":"成都","comany":{"name":"百度","company_addr":"天府软件A园"},"publish_date":"2019-06-01","comments":15}
{"index":{"_index":"lagou", "_type":"job2", "_id":2}}
{"title":"Django Web开发", "min_salary":20000,"city":"成都","comany":{"name":"腾讯","company_addr":"天府软件B园"},"publish_date":"2019-05-20","comments":30}
 
# 创建索引映射   
PUT lagou
{
  "mappings": {
    "job":{
      "properties": {
        "title":{
          "type": "text"
        },
        "min_salary":{
          "type": "integer"
        },
        "city":{
          "type": "keyword"
        },
        "company":{
          "properties": {
            "name":{
              "type":"text"
            },
            "company_addr":{
              "type":"text"
            },
            "employee_count":{
              "type":"integer"
            }
          }
        },
        "publish_date":{
          "type": "date",
          "format": "yyyy-MM-dd"
        },
        "comments":{
          "type": "integer"
        }
      }
    }
  }
}
 
# 向索引插入数据
PUT lagou/job/1
{
  "title":"Python分布式爬虫打造搜索引擎",
  "min_salary":20000,
  "city":"成都",
  "company":{
    "name":"百度",
    "company_add":"天府软件园",
    "employee":200
  },
  "publish_date":"2019-06-01",
  "comments":20
}
 
# 获取索引下的mapping
GET lagou/_mapping
 
# 获取集群下的所有mapping
GET _all/_mapping
 
# 添加映射
PUT lagou
{
  "mappings": {
    "job":{
      "properties": {
        "title":{
          "store": true,
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "company_name":{
          "store": true,
          "type": "keyword"
        },
        "desc":{
          "type": "text"
        },
        "comments":{
          "type": "integer"
        },
        "add_time":{
          "type": "date",
          "format": "yyyy-MM-dd"
        }
      }
    }
  }
}
# 向索引中添加查询数据
POST lagou/job/
{
  "title":"Django Web开发工程师",
  "company_name":"阿里巴巴网络技术有限公司",
  "desc":"精通Python",
  "comments":1200,
  "add_time":"2019-02-14"
}
POST lagou/job/
{
  "title":"数据挖掘工程师",
  "company_name":"百度在线网络技术有限公司",
  "desc":"10年工作经验",
  "comments":1000,
  "add_time":"2019-03-16"
}
POST lagou/job/
{
  "title":"Python人工智能技术总监",
  "company_name":"华为技术有限公司",
  "desc":"能分析和解决疑难问题",
  "comments":3000,
  "add_time":"2019-06-18"
}
 
# 使用match查询
GET lagou/_search
{
  "query": {
    "match": {
      "title": "python开发"
    }
  }
}
 
# 使用term查询
GET lagou/_search
{
  "query": {
    "term": {
      "title": "python开发"
    }
  }
}
 
# 使用terms查询
GET lagou/_search
{
  "query": {
    "terms": {
      "title": ["智能", "挖掘", "web"]
    }
  }
}
 
# 控制查询数据返回数量
GET lagou/_search
{
  "query": {
    "match": {
      "title": "工程师"
    }
  },
  "from": 0,
  "size": 1
}
 
# match_all查询所有数据
GET lagou/_search
{
  "query": {
    "match_all": {}
  }
}
 
# match_phrase查询
GET lagou/_search
{
  "query": {
    "match_phrase": {
      "title":{
        "query": "python总监",
        "slop":6
      }
    }
  }
}
# multi_match查询,指定在title和desc字段中查询
#包含python的数据
GET lagou/_search
{
  "query": {
    "multi_match": {
      "query": "python",
      "fields": ["title", "desc"]
    }
  }
}
# 指定返回的哪些字段数据
GET lagou/_search
{
  "stored_fields": ["title", "company_name"],
  "query": {
    "match": {
      "title": "pythona"
    }
  }
}
# 通过sort把结果排序
GET lagou/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "comments": {
        "order": "desc"
      }
    }
  ]
}
# range范围查询
GET lagou/_search
{
  "query": {
    "range": {
      "comments": {
        "gte": 1000,
        "lte": 2000,
        "boost": 1
      }
    }
  }
}
 
# 使用range对时间范围进行查询
GET lagou/_search
{
  "query": {
    "range": {
      "add_time": {
        "gte": "2019-01-01",
        "lte": "now"
      }
    }
  }
}
 
# bool查询 
#bool:{
#  "must":[],
#  "should":[],
#  "must_not":[],
#  "filter":[]
#}
# 建立测试数据
POST lagou/testjob/_bulk
{"index":{"_id":1}}
{"salary":1000, "title":"Python"}
{"index":{"_id":2}}
{"salary":2000, "title":"Django"}
{"index":{"_id":3}}
{"salary":3000, "title":"Flask"}
{"index":{"_id":4}}
{"salary":4000, "title":"Scrapy"}
 
# 从查询到的所有数据中筛选出salary为2000的数据
# 对应数据库查询语句为
# select * from testjob where salary=2000;
GET lagou/testjob/_search
{
  "query": {
    "bool": {
      "must": [
        {"match_all": {}}
      ],
      "filter": {
        "term": {
          "salary": "2000"
        }
      }
    }
  }
}
# 查看分析器解析的结果
GET _analyze
{
  "analyzer": "ik_max_word",
  "text": "Python Web开发工程师"
}
# 查看分析器解析的结果
GET _analyze
{
  "analyzer": "ik_smart",
  "text": "Python Web开发工程师"
}
 
# 组合过滤查询薪资等于2000或者工作为Python的数据并且满足薪水不为3000的数据
# select * from testjob where (salary=2000 or title="Python") and (salary !=3000);
 
GET lagou/testjob/_search
{
  "query": {
    "bool": {
      "should": [
        {"term": {
          "salary": {
            "value": "2000"
          }
        }},
        {"term": {
          "title": {
            "value": "python"
          }
        }}
      ],
      "must_not": [
        {"term": {
          "salary": {
            "value": "3000"
          }
        }}
      ]
    }
  }
}
 
# 嵌套查询工作为python或者工作为django并且工资为3000的数据
# select * from testjob where title="python" or (title="django" and salary=3000)
 
GET lagou/testjob/_search
{
  "query": {
    "bool": {
      "should": [
        {"term": {
          "title": {
            "value": "python"
          }
        }},
        {"bool": {
          "must": [
            {"term": {
              "title": {
                "value": "django"
              }
            }},
            {"term": {
              "salary": {
                "value": "3000"
              }
            }}
          ]
        }}
      ]
    }
  }
}
# 过滤空和非空数据,首先创建测试数据
POST lagou/testjob2/_bulk
{"index":{"_id":1}}
{"tags":["search"]}
{"index":{"_id":2}}
{"tags":["search", "python"]}
{"index":{"_id":3}}
{"other_field":["some data"]}
{"index":{"_id":4}}
{"tags":null}
{"index":{"_id":5}}
{"tags":["search", null]}
 
# 查询tags字段不为null的数据
# select tags from testjob2 where tags is not null;
GET lagou/testjob2/_search
{
  "query": {
    "bool": {
      "filter": {
        "exists": {
          "field": "tags"
        }
      }
    }
  }
}

五丶使用scrapy爬取知名技术文章网站

1. 创建爬虫项目虚拟环境

2. 进入虚拟环境安装scrapy以及requests

3. 创建scrapy项目

  • 在cmd终端中执行scrapy startproject ArticleSpider即创建scrapy 爬虫项目ArticleSpider

  • 在Pycharm中打开ArticleSpider项目

  • 创建jobbole爬虫文件

  • 即在项目spider目录下生成jobbole.py文件

4. 运行scrapy项目测试是否能够运行成功

  • 在终端执行scrapy crawl jobbole命令,结果报错了

  • 对于windows系统而言报这个错是正常的,使用pip命令安装pypiwin32包即可

  • 安装成功后再次运行则启动scrapy成功

5. 爬取伯乐在线网站的某篇文章

  • 在网站全部文章分类中爬取该篇文章

  • 博主这里使用xpath来匹配要爬取的文章的标题丶发布时间丶标签丶文章内容丶点赞数丶收藏数以及评论数

  • 因为Pycharm未提供scrapy模板,要想使用Debug进行断点测试,需要在项目根目录下创建main.py文件,文件代码如下,也就是执行爬虫的命令scrapy crawl jobbole
from scrapy.cmdline import execute
import sys
import os
 
 
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
execute(["scrapy", "crawl", "jobbole"])
  • 在jobbole.py文件parse方法中使用xpath匹配获取标题丶发布时间丶标签丶文章内容丶点赞数丶收藏数以及评论数数据
def parse(self, response):
    # 标题
    title = response.xpath("//div[@class='entry-header']/h1/text()").extract()[0]
    # 发表时间
    create_date = response.xpath("//p[@class='entry-meta-hide-on-mobile']/text()").extract()[0].strip().replace(" ·", "")
    # 点赞数
    praise_num = response.xpath("//h10/text()").extract()[0]
    # 收藏数
    fav_nums = response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]
    # 使用re正则匹配获取收藏数
    match_re = re.match(".*(\d+).*", fav_nums)
    if match_re:
        fav_nums = match_re.group(1)
    # 评论数
    comment_nums = response.xpath("//span[contains(@class, 'hide-on-480')]/text()").extract()[0]
    # 使用re正则匹配获取评论数
    match_re = re.match(".*(\d+).*", comment_nums)
    if match_re:
        comment_nums = match_re.group(1)
    # 文章内容
    content = response.xpath("//div[@class='entry']").extract()[0]
    # 获取文章的tag标签
    tag_list = response.xpath("//p[@class='entry-meta-hide-on-mobile']/a/text()").extract()
    # 使用列表生成式过滤评论标签
    tag_list = [element for element in tag_list if not element.strip().endswith('评论')]
    tags = ','.join(tag_list)
    pass
  • 因为是爬取某一篇文章,所以在jobbole.py中的start_urls需要修改成要爬取的文章地址
start_urls = ['http://blog.jobbole.com/114690/']
  • Debug运行main.py文件,断点测试获取数据成功且正确

6. 爬取伯乐在线网站中的所有文章

  • 在Terminal终端中通过scrapy shell 命令测试获取所有文章页面的数据,并使用xpath来获取页面中文章url地址,命令为scrapy shell http://blog.jobbole.com/all-posts/

  • 当获取到列表页中20篇文章的url后,当scrapy进行页面数据下载后,需要回调给parse_detail方法进行页面字段的提取操作,这是第一步,第二步就是获取列表下一页的页面数据在parse函数中只要next_url提取存在,则会将此页面数据交给parse函数进行处理依次类推重复循环直到news_url不存在
class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']
 
    def parse(self, response):
        """
        1.获取所有文章列表页中文章的url并交给解析函数对具体字段进行解析
        2.获取列表页下一页的url地址并交给scrapy进行页面数据下载,下载完后交给parse函数进行字段获取
        :param response:
        :return:
        """
        # 1.获取列表页中所有文章url
        post_urls = response.xpath("//div[@class='post floated-thumb']/div[@class='post-thumb']/a/@href").extract()
        for post_url in post_urls:
            # 为了防止有些文章链接不带域名前缀,所以需要对对链接地址进行域名凭借,需使用urllib提供的parse工具中的urljoin方法
            # 使用scrapy提供的Request类,将获取的文章内容通过callback回调函数交给parse_detail方法对页面内容字段进行获取
            yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse_detail)
 
        # 2.获取列表页下一页的url地址并交给scrapy进行页面数据下载
        next_url = response.xpath("//a[@class='next page-numbers']/@href").extract_first("")
        if next_url:
            yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
 
    def parse_detail(self, response):
        """获取文章详情页中的字段数据"""
        # 标题
        title = response.xpath("//div[@class='entry-header']/h1/text()").extract()[0]
        # 发表时间
        create_date = response.xpath("//p[@class='entry-meta-hide-on-mobile']/text()").extract()[0].strip().replace(" ·", "")
        # 点赞数
        praise_num = response.xpath("//h10/text()").extract()[0]
        # 收藏数
        fav_nums = response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]
        # 使用re正则匹配获取收藏数
        match_re = re.match(".*(\d+).*", fav_nums)
        if match_re:
            fav_nums = match_re.group(1)
        # 评论数
        comment_nums = response.xpath("//span[contains(@class, 'hide-on-480')]/text()").extract()[0]
        # 使用re正则匹配获取评论数
        match_re = re.match(".*(\d+).*", comment_nums)
        if match_re:
            comment_nums = match_re.group(1)
        # 文章内容
        content = response.xpath("//div[@class='entry']").extract()[0]
        # 获取文章的tag标签
        tag_list = response.xpath("//p[@class='entry-meta-hide-on-mobile']/a/text()").extract()
        # 使用列表生成式过滤评论标签
        tag_list = [element for element in tag_list if not element.strip().endswith('评论')]
        tags = ','.join(tag_list)
  • Debug测试是否成功爬取文章指定字段数据

  • 在上图中红框提取的评论数显示的不是数字而是评论,访问该篇文章,发现该文章并没有评论

  • 回到代码中对评论数和收藏数正则匹配不成功时需要进行逻辑判断,不存在默认为0
match_re = re.match(".*(\d+).*", fav_nums)
if match_re:
    fav_nums = int(match_re.group(1))
else:
    fav_nums = 0
 
match_re = re.match(".*(\d+).*", comment_nums)
if match_re:
    comment_nums = int(match_re.group(1))
else:
    comment_nums = 0
  • 再次测试则显示评论数为0

7. 将爬取的数据保存到本地项目中

**分析:**除了需要提取文章详情里面的字段内容外,在文章列表页中每篇文章的封面图的URL地址也是需要进行提取的

  • 因为文章的链接地址与封面图片地址都在a标签节点之下,所以需要修改之前的代码获取a节点列表,再通过遍历获取每一个a节点,在根据a节点来匹配获取文章地址以及封面图地址

  • 在parse_detail方法中获取response中meta字典属性中cover_image_url的值

  • 在items文件中定义数据模型
class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Field()
    create_date = scrapy.Field()
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    cover_image_url = scrapy.Field()
    cover_image_path = scrapy.Field()
    praise_nums = scrapy.Field()
    comment_nums = scrapy.Field()
    fav_nums = scrapy.Field()
    tags = scrapy.Field()
    content = scrapy.Field()
  • 在jobbole文件parse_detail方法中将提取的数据保存到items对应字段中
# 将提取到的字段数据传递给items中
article = JobBoleArticleItem()
article['title'] = title
article['create_date'] = create_date
article['url'] = response.url
article['cover_image_url'] =  cover_image_url
article['praise_nums'] = praise_nums
article['comment_nums'] = comment_nums
article['fav_nums'] = fav_nums
article['tags'] = tags
article['content'] = content
yield article
  • 在settings中配置使用pipelines数据管道文件
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
}
  • Debug测试在pipelines中获取item数据

  • 需要将封面图片下载到项目根目录下的images目录下,首先需要在根目录下新建images目录,用于存放下载的封面图;然后需要在settings中pipeline配置中添加scrapy提供的图片下载的管道类ImagesPipeline;紧接着需要settings中指定items中的cover_image_url字段为图片下载地址,最后一步就是settings中指定图片下载后保存的目录路径images

  • 直接run运行main.py文件启动scrapy爬虫项目,结果提示如下错误

  • 因为PIL属于pillow包下的模块,所以安装pillow即可

  • 重新run运行项目,结果又报错了,原因是使用scrapy的ImagesPipeline类,会将settings中配置的IMAGES_URLS_FIELD的值也就是封面图地址数据当作列表数据,但在代码中设置的article[‘cover_image_url’]的值不是列表数据,所以就出现如下异常错误

  • 即在parse_detail函数中将提取的cover_image_url变量修改列表数据即可
article['cover_image_url'] =  [cover_image_url]
  • 重新启动项目,则成功下载文章封面图到images目录下

  • 在下载图片的时候也需要将图片保存的路径保存起来,需要继承scrapy提供的ImagesPipeline类,并重新父类的item_completed方法在方法中将遍历获取的图片路径保存到items中cover_image_path字段中,首先需要在pipelines文件中定义ArticleImagePipeline类完成逻辑
class ArticleImagePipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        if "cover_image_url" in item:
            for ok, value in results:
                image_file_path = value["path"]
            item["cover_image_path"] = image_file_path
        return item
  • 紧接着在settings中修改管道文件配置为上面定义的管道类
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.ArticleImagePipeline': 1,
}
  • Debug断点测试在执行ArticlespiderPipeline管道类中时是否获取到cover_image_path图片存放路径,说明因为在以上settings中配置的继承scrapy的ImagesPipeline类的管道类ArticleImagePipeline的优先级为1,所以程序在执行管道文件时是先将image_file_path的值也就是图片路径保存到item实例对象的cover_image_path字段中的

  • 接下来在items字段中只差url_object_id的值了,这个值是将response.url也就是文章的地址通过hashlib库中的md5对象的update方法将文章url转换成唯一且长度一致的值,在项目目录下创建utils包并在该包下创建common.py文件,文件代码如下
import hashlib
 
 
def get_md5(url):
    if isinstance(url, str):
        url = url.encode("utf-8")
    m = hashlib.md5()
    m.update(url)
    return m.hexdigest()
 
if __name__ == "__main__":
    print (get_md5("http://jobbole.com"))
  • 执行以上代码,则将http://jobbole.com转换成唯一标识符

  • 在parse_detail函数中设置url_object_id的值
article['url_object_id'] = get_md5(response.url)
  • Debug测试items中是否存在url_object_id的值

  • 将文章详情页中提取的字段数据保存到本地,需要在pipelines文件中定义管道类将item数据保存到本地文件中
class JsonWithEncodingPipeline(object):
    #自定义json文件的导出
    def __init__(self):
        self.file = codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item
    def spider_closed(self, spider):
        self.file.close()
  • 在settings中配置以上管道文件
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.JsonWithEncodingPipeline':2,
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.ArticleImagePipeline': 1,
}
  • 运行项目,在下项目根目录成功创建article.json文件,并将爬取到的字段数据写入到文件中

  • 还有一种方法就是使用scrapy提供的JsonItemExporter类实例对象的exporting方法将item数据写入到文件对象中
class JsonExporterPipleline(object):
    #调用scrapy提供的json export导出json文件
    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()
 
    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()
 
    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item
  • 在settings配置文件中配置以上管道类
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.JsonExporterPipleline':2,
   # 'ArticleSpider.pipelines.JsonWithEncodingPipeline':2,
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.ArticleImagePipeline': 1,
}
  • 运行项目,在下项目根目录成功创建articleexport.json文件,并将爬取到的字段数据写入到文件中

8. 将爬取的数据保存到数据库中

  • 在mysql数据库中创建数据库以及表

  • 需要在parse_detail方法中对create_date字段数据转换成date格式的数据
try:
    create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
except Exception as e:
    create_date = datetime.datetime.now().date()
  • 安装mysqlclient驱动包

  • 在管道文件中编写存储数据到mysql的管道类
class MysqlPipeline(object):
    #采用同步的机制写入mysql
    def __init__(self):
        self.conn = MySQLdb.connect('localhost', 'root', 'mysql', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()
 
    def process_item(self, item, spider):
        insert_sql = """insert into article(url_object_id, title, url, create_date, fav_nums)VALUES (%s, %s, %s, %s, %s)"""
        self.cursor.execute(insert_sql, (item["url_object_id"], item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()
  • 在settings中配置以上管道类,为了演示将数据保存到mysql数据库所以博主这里将图片管道文件注释掉了
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.JsonExporterPipleline':2,
   # 'ArticleSpider.pipelines.JsonWithEncodingPipeline':2,
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
   # 'ArticleSpider.pipelines.ArticleImagePipeline': 1,
   'ArticleSpider.pipelines.MysqlPipeline': 1,
}
  • Debug断点测试是否将要保存到数据库的字段数据成功保存到数据库

  • 使用Navicat工具打开article表数据

  • 退出Debug模式直接run运行后,在Navicat工具中查看爬取的字段数据,由于文章太多所以只爬取一定时间的文字字段数据

  • 当爬取的数据量比较大时,可以使用异步方式将数据写入到数据库中,首先需要在pipelines文件中定义MysqlTwistedPipeline类,逻辑代码如下
class MysqlTwistedPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool
 
    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings["MYSQL_USER"],
            passwd = settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)
 
    def process_item(self, item, spider):
        # 使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # 处理异常
 
    def handle_error(self, failure, item, spider):
        # 处理异步插入的异常
        print(failure)
 
    def do_insert(self, cursor, item):
        insert_sql = """insert into article(url_object_id, title, url, create_date, fav_nums)VALUES (%s, %s, %s, %s, %s)"""
        cursor.execute(insert_sql, (item["url_object_id"], item["title"], item["url"], item["create_date"], item["fav_nums"]))
  • 在settings配置文件中配置连接mysql数据库的配置项
MYSQL_HOST = "localhost"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "mysql"
  • 在settings中配置定义异步写入数据库数据的管道类
ITEM_PIPELINES = {
   # 'ArticleSpider.pipelines.JsonExporterPipleline':2,
   'ArticleSpider.pipelines.JsonWithEncodingPipeline':2,
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
   # 'ArticleSpider.pipelines.ArticleImagePipeline': 1,
   # 'ArticleSpider.pipelines.MysqlPipeline': 1,
   'ArticleSpider.pipelines.MysqlTwistedPipeline': 1,
}

六丶将scrapy爬取到的数据写入到elasticsearch中

重点:因为博主使用的Elasticsearch版本为5.1.1,所以在安装elasticsearch-dsl工具的版本应该在5.0.0,<6.0.0之间,最新的elasticsearch-dsl版本为7.0,该最新版本与前面版本做了一些修改所涉及的代码以及用法都不一样,参考github上的文档即可https://github.com/elastic/elasticsearch-dsl-py

# Elasticsearch 7.x
elasticsearch-dsl>=7.0.0,<8.0.0
 
# Elasticsearch 6.x
elasticsearch-dsl>=6.0.0,<7.0.0
 
# Elasticsearch 5.x
elasticsearch-dsl>=5.0.0,<6.0.0
 
# Elasticsearch 2.x
elasticsearch-dsl>=2.0.0,<3.0.0
  • 安装elasticsearch-dsl

  • 在项目models下创建es_types.py文件,在文件中创建索引mapping
# -*- coding: utf-8 -*-
__author__ = 'cdtaogang'
__date__ = '2019/6/2 12:45'
from elasticsearch_dsl import DocType, Date, Nested, Boolean, analyzer,Completion, Keyword, Text, Integer
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=["localhost"])
 
 
class ArticleType(DocType):
    title = Text(analyzer="ik_max_word")
    create_date = Date()
    url = Keyword()
    url_object_id = Keyword()
    cover_image_url = Keyword()
    cover_image_path = Keyword()
    praise_nums = Integer()
    comment_nums = Integer()
    fav_nums = Integer()
    tags = Text(analyzer="ik_max_word")
    content = Text(analyzer="ik_max_word")
 
    class Meta:
        index = "jobbole"
        doc_type = "article"
 
if __name__ == '__main__':
    ArticleType.init()
  • 运行es_types.py文件

  • 回到elasticsearch-head可视化页面,成功生成jobbole索引

  • 在pipelines文件中定义管道类ElasticsearchPipeline将数据写入到es中
class ElasticsearchPipeline(object):
    # 将数据写入到es中
    def process_item(self, item, spider):
        article = ArticleType()
        article.title = item['title']
        article.create_date = item['create_date']
        article.url = item['url']
        article.meta.id = item['url_object_id']
        article.cover_image_url = item['cover_image_url']
        if "cover_image_path" in item:
            article.cover_image_path = item['cover_image_path']
        article.praise_nums = item['praise_nums']
        article.comment_nums = item['comment_nums']
        article.fav_nums = item['fav_nums']
        article.tags = item['tags']
        article.content = remove_tags(item['content'])
 
        article.save()
        return item
  • Debug断点测试没有任何报错

  • 在elasticsearch-head页面中查看jobbole索引的doc数据,成功将爬取的数据写入到es中

  • 在Kibana页面中对jobbole索引doc内容的tags标签进行匹配搜索,是否能够搜索出来此篇文章内容

  • 退出Debug,运行项目一段时间,爬取大量数据并写入到es中

赞(3) 打赏 源码下载
版权声明:本文为CSDN博主「cdtaogang」的原创文章,遵循CC 4.0 BY-NC-SA版权协议,转载请附上原文出处链接及本声明:记录学习生活 » Django与Elasticsearch交互打造搜索引擎网站(一)

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏