分布式搜索(2)

分布式搜索(2)

数据聚合

对数据进行分类

参加聚合的字段必须是keyword、日期、数值、布尔类型

聚合种类

常见聚合:

  • 桶(Bucket)聚合:用来对文档做分组
    • TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其它聚合的结果为基础做聚合

DSL聚合语法

桶聚合(Bucket)

  • TermAggregation实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    GET /hotel/_search
    {
      "size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果
      "aggs": { // 定义聚合
        "brandAgg": { //给聚合起个名字
          "terms": { // 聚合的类型,按照品牌值聚合,所以选择term
            "field": "brand", // 参与聚合的字段
            "size": 20 // 希望获取的聚合结果数量
          }
        }
      }
    }
  • 排序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    GET /hotel/_search
    {
    "size": 0,
    "aggs": {
    "brandAgg": {
    "terms": {
    "field": "brand",
    "order": {
    "_count": "asc" # 按照数量升序排序
    },
    "size": 10
    }
    }
    }
    }
  • 指定聚合范围,默认对所有文档聚合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    # bucket
    GET /hotel/_search
    {
    "query": { # 只对查询到的结果进行聚合
    "range": {
    "price": {
    "lte": 200
    }
    }
    },
    "size": 0,
    "aggs": {
    "brandAgg": {
    "terms": {
    "field": "brand",
    "order": {
    "_count": "asc"
    },
    "size": 10
    }
    }
    }
    }

度量(Metric)

对聚合后的桶里的文档进行运算

  • Stats实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    GET /hotel/_search
    {
      "size": 0, 
      "aggs": {
        "brandAgg": { 
          "terms": { 
            "field": "brand", 
            "size": 20
          },
          "aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
            "score_stats": { // 聚合名称
              "stats": { // 聚合类型,这里stats可以计算min、max、avg等
                "field": "score" // 聚合的字段,这里是对score进行运算
              }
            }
          }
        }
      }
    }

RestAPI聚合语法

  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Test
    public void testBucket() throws IOException {
    //1.创建request请求对象
    SearchRequest searchRequest = new SearchRequest("hotel");
    //2.组织DSL参数
    //2.1 query
    searchRequest.source().query(QueryBuilders.rangeQuery("price").lte(200));
    //2.2 size
    searchRequest.source().size(0);
    //2.3 aggs
    searchRequest.source().aggregation(AggregationBuilders
    .terms("brandAgg") //聚合类型和聚合名
    .field("brand") //聚合的字段
    .order(BucketOrder.count(true)) //排序
    .size(10)); //希望获取的聚合结果数量

    //3.发送查询请求,获取结果
    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
    //解析结果
    Aggregations aggregations = response.getAggregations(); //获得聚合集
    Terms brandAgg = aggregations.get("brandAgg"); //获取指定的聚合
    List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); //获取桶
    for (Terms.Bucket bucket : buckets) {
    String keyAsString = bucket.getKeyAsString();
    System.out.println(keyAsString);
    }
    }

自动补全

安装拼音分词器插件

自定义拼音分词器

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    PUT /test
    {
      "settings": {
        "analysis": {
          "analyzer": { // 自定义分词器
            "my_analyzer": {  // 分词器名称
              "tokenizer": "ik_max_word",
              "filter": "py" //过滤器名
            }
          },
          "filter": { // 自定义tokenizer filter
            "py": { // 过滤器名称
              "type": "pinyin", // 过滤器类型,这里是pinyin
    "keep_full_pinyin": false,
              "keep_joined_full_pinyin": true,
              "keep_original": true,
              "limit_first_letter_length": 16,
              "remove_duplicated_term": true,
              "none_chinese_pinyin_tokenize": false
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "name": {
            "type": "text",
            "analyzer": "my_analyzer", //创建时使用的分词器
            "search_analyzer": "ik_smart" //查询时使用的分词器,防止搜汉字时搜到同音字
          }
        }
      }
    }

自动补全查询

参与补全查询的字段必须是completion类型。

字段的内容一般是用来补全的多个词条形成的数组。

  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // 创建索引库
    PUT test
    {
      "mappings": {
        "properties": {
          "title":{
            "type": "completion"
          }
        }
      }
    }

    // 示例数据
    POST test/_doc
    {
      "title": ["Sony", "WH-1000XM3"]
    }
    POST test/_doc
    {
      "title": ["SK-II", "PITERA"]
    }
    POST test/_doc
    {
      "title": ["Nintendo", "switch"]
    }

    // 自动补全查询
    GET /test/_search
    {
      "suggest": {
        "title_suggest": {
          "text": "s", // 关键字
          "completion": {
            "field": "title", // 补全查询的字段
            "skip_duplicates": true, // 跳过重复的
            "size": 10 // 获取前10条结果
          }
        }
      }
    }

RestAPI补全语法

  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Test
    public void testSuggest() throws IOException {
    // 创建request请求对象
    SearchRequest searchRequest = new SearchRequest("hotel");
    searchRequest.source().suggest(new SuggestBuilder()
    .addSuggestion("suggestions", SuggestBuilders
    .completionSuggestion("suggestion")
    .prefix("h")
    .skipDuplicates(true)
    .size(10))
    );
    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
    //解析结果
    Suggest suggest = response.getSuggest();
    CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");

    for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
    System.out.println(option.getText().string());
    }
    }

数据同步

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:直接调接口,业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog(类似主从复制)增加数据库负担、实现复杂度高

通过mq来实现数据同步

步骤:

  1. 导入依赖,写配置,在消费者端创建交换机、消息队列
  2. 生产者端导入依赖,写配置,通过RabbitTemplate发送消息
  3. 消费者端监听消息队列,通过获取的消息更新索引表
  4. 测试

实例

消费者端

  • 导入依赖

    1
    2
    3
    4
    5
    <!--amqp-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置文件

    1
    2
    3
    4
    5
    6
    7
    spring: 
    rabbitmq:
    host: 192.168.229.128
    port: 5672
    virtual-host: /
    username: root
    password: 123456
  • 创建交换机、队列和key

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package cn.itcast.hotel.config;

    @Configuration
    public class MqConfiguration {

    /**
    * 交换机
    * @return
    */
    @Bean
    public TopicExchange topicExchange() {
    return new TopicExchange(HOTEL_EXCHANGE, true, false);
    }

    /**
    * 修改消息队列
    * @return
    */
    @Bean
    public Queue insertQueue() {
    return new Queue(HOTEL_INSERT_QUEUE, true);
    }

    /**
    * 删除消息队列
    * @return
    */
    @Bean
    public Queue deleteQueue() {
    return new Queue(HOTEL_DELETE_QUEUE, true);
    }

    /**
    * 绑定队列和交换机
    */
    @Bean
    public Binding insertQueueBinding() {
    return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
    return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HOTEL_DELETE_KEY);
    }


    }
  • 创建监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package cn.itcast.hotel.mq;

    /**
    * 创建消息队列的监听器
    */
    @Component
    public class HotelListener {

    @Autowired
    private IHotelService service;

    @RabbitListener(queues = HOTEL_INSERT_QUEUE)
    public void listenerHotelInsertOrUpdate(Long id) {
    service.insertById(id);
    }

    @RabbitListener(queues = HOTEL_DELETE_QUEUE)
    public void listenerHotelDelete(Long id) {
    service.deleteById(id);
    }

    }

生产者端

  • 导入依赖、填写配置

  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package cn.itcast.hotel.web;

    @RestController
    @RequestMapping("hotel")
    public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
    return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
    @RequestParam(value = "page", defaultValue = "1") Integer page,
    @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
    Page<Hotel> result = hotelService.page(new Page<>(page, size));

    return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
    hotelService.save(hotel);

    //将修改的数据id发到队列
    rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
    if (hotel.getId() == null) {
    throw new InvalidParameterException("id不能为空");
    }
    hotelService.updateById(hotel);

    //将修改的数据id发到队列
    rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);

    //将删除的数据id发到队列
    rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_DELETE_KEY, id);
    }
    }

es集群

概念:

  • 集群(cluster):一组拥有共同的 cluster name 的 节点。

  • 节点(node) :集群中的一个 Elasticearch 实例

  • 分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中

    解决问题:数据量太大,单点存储量有限的问题。

集群职责划分

默认情况下,集群中的任何一个节点都同时具备上述四种角色。但是真实的集群一定要将集群职责分离

脑裂

因为节点失联导致出现多个主节点

解决脑裂的方案是,要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

集群分布式存储/查询

存储

elasticsearch会通过hash算法来计算文档应该存储到哪个分片

shard = hash(_routing) % number_of_shards

  • _routing: 默认是文档id
  • number_of_shards:分片数,此索引库一旦创建,分片数量不能修改

查询

查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片

  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。


分布式搜索(2)
http://xwww12.github.io/2022/09/18/微服务/分布式搜索(2)/
作者
xw
发布于
2022年9月18日
许可协议