分布式搜索(2)
分布式搜索(2)
数据聚合
对数据进行分类
参加聚合的字段必须是keyword、日期、数值、布尔类型
聚合种类
常见聚合:
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:其它聚合的结果为基础做聚合
DSL聚合语法
桶聚合(Bucket)
TermAggregation实例
1
2
3
4
5
6
7
8
9
10
11
12GET /hotel/_search
{
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}排序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" # 按照数量升序排序
},
"size": 10
}
}
}
}指定聚合范围,默认对所有文档聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23# bucket
GET /hotel/_search
{
"query": { # 只对查询到的结果进行聚合
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc"
},
"size": 10
}
}
}
}
度量(Metric)
对聚合后的桶里的文档进行运算
Stats实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等
"field": "score" // 聚合的字段,这里是对score进行运算
}
}
}
}
}
}
RestAPI聚合语法
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27@Test
public void testBucket() throws IOException {
//1.创建request请求对象
SearchRequest searchRequest = new SearchRequest("hotel");
//2.组织DSL参数
//2.1 query
searchRequest.source().query(QueryBuilders.rangeQuery("price").lte(200));
//2.2 size
searchRequest.source().size(0);
//2.3 aggs
searchRequest.source().aggregation(AggregationBuilders
.terms("brandAgg") //聚合类型和聚合名
.field("brand") //聚合的字段
.order(BucketOrder.count(true)) //排序
.size(10)); //希望获取的聚合结果数量
//3.发送查询请求,获取结果
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
//解析结果
Aggregations aggregations = response.getAggregations(); //获得聚合集
Terms brandAgg = aggregations.get("brandAgg"); //获取指定的聚合
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); //获取桶
for (Terms.Bucket bucket : buckets) {
String keyAsString = bucket.getKeyAsString();
System.out.println(keyAsString);
}
}
自动补全
安装拼音分词器插件
下载Release v8.4.1 · medcl/elasticsearch-analysis-pinyin (github.com)
查看插件挂载的数据卷
1
docker volume inspect es-plugins
将下载的插件解压到目录下
重启
测试
1
2
3
4
5GET /_analyze
{
"text": ["拼音分词器"],
"analyzer": "pinyin"
}
自定义拼音分词器
elasticsearch中分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
"tokenizer": "ik_max_word",
"filter": "py" //过滤器名
}
},
"filter": { // 自定义tokenizer filter
"py": { // 过滤器名称
"type": "pinyin", // 过滤器类型,这里是pinyin
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer", //创建时使用的分词器
"search_analyzer": "ik_smart" //查询时使用的分词器,防止搜汉字时搜到同音字
}
}
}
}
自动补全查询
参与补全查询的字段必须是completion类型。
字段的内容一般是用来补全的多个词条形成的数组。
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 创建索引库
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
// 示例数据
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
// 自动补全查询
GET /test/_search
{
"suggest": {
"title_suggest": {
"text": "s", // 关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
RestAPI补全语法
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@Test
public void testSuggest() throws IOException {
// 创建request请求对象
SearchRequest searchRequest = new SearchRequest("hotel");
searchRequest.source().suggest(new SuggestBuilder()
.addSuggestion("suggestions", SuggestBuilders
.completionSuggestion("suggestion")
.prefix("h")
.skipDuplicates(true)
.size(10))
);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
//解析结果
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
System.out.println(option.getText().string());
}
}
数据同步
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:直接调接口,业务耦合度高
方式二:异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog
- 优点:完全解除服务间耦合
- 缺点:开启binlog(类似主从复制)增加数据库负担、实现复杂度高
通过mq来实现数据同步
步骤:
- 导入依赖,写配置,在消费者端创建交换机、消息队列
- 生产者端导入依赖,写配置,通过RabbitTemplate发送消息
- 消费者端监听消息队列,通过获取的消息更新索引表
- 测试
实例
消费者端
导入依赖
1
2
3
4
5<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置文件
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.229.128
port: 5672
virtual-host: /
username: root
password: 123456创建交换机、队列和key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47package cn.itcast.hotel.config;
@Configuration
public class MqConfiguration {
/**
* 交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(HOTEL_EXCHANGE, true, false);
}
/**
* 修改消息队列
* @return
*/
@Bean
public Queue insertQueue() {
return new Queue(HOTEL_INSERT_QUEUE, true);
}
/**
* 删除消息队列
* @return
*/
@Bean
public Queue deleteQueue() {
return new Queue(HOTEL_DELETE_QUEUE, true);
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding insertQueueBinding() {
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding() {
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HOTEL_DELETE_KEY);
}
}创建监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package cn.itcast.hotel.mq;
/**
* 创建消息队列的监听器
*/
@Component
public class HotelListener {
@Autowired
private IHotelService service;
@RabbitListener(queues = HOTEL_INSERT_QUEUE)
public void listenerHotelInsertOrUpdate(Long id) {
service.insertById(id);
}
@RabbitListener(queues = HOTEL_DELETE_QUEUE)
public void listenerHotelDelete(Long id) {
service.deleteById(id);
}
}
生产者端
导入依赖、填写配置
发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54package cn.itcast.hotel.web;
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
//将修改的数据id发到队列
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
//将修改的数据id发到队列
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_INSERT_KEY, hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
//将删除的数据id发到队列
rabbitTemplate.convertAndSend(HOTEL_EXCHANGE, HOTEL_DELETE_KEY, id);
}
}
es集群
概念:
集群(cluster):一组拥有共同的 cluster name 的 节点。
节点(node) :集群中的一个 Elasticearch 实例
分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中
解决问题:数据量太大,单点存储量有限的问题。
集群职责划分
默认情况下,集群中的任何一个节点都同时具备上述四种角色。但是真实的集群一定要将集群职责分离
脑裂
因为节点失联导致出现多个主节点
解决脑裂的方案是,要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
集群分布式存储/查询
存储
elasticsearch会通过hash算法来计算文档应该存储到哪个分片
shard = hash(_routing) % number_of_shards
- _routing: 默认是文档id
- number_of_shards:分片数,此索引库一旦创建,分片数量不能修改
查询
查询分成两个阶段:
scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。