黑马头条笔记
App端
登录
需求
用户可以选择登录和不登录
- 登录后有查看和操作的权限
- 不登录只有查看的权限
知识点
用户密码加密
注册时
- 用户输入密码,生成随机的盐值,把密码和盐值拼接后,经过MD5生成秘钥并保存
- 盐值也需要一同保存到数据库
登录时
- 把用户输入的密码和数据库中保存的盐值进行MD5加密,生成的秘钥和数据库中的秘钥进行比对
MD5生成的秘钥是不可逆的
接口文档
swagger
导包
1 |
|
添加配置类
1 |
|
访问
localhost:51801/swagger-ui.html
knife4j
swagger对应的包也要导
1 |
|
配置类
1 |
|
访问
localhost:51801/doc.html
网关
授权、限流、登录、日志、跨域
导包
1 |
|
添加本地配置
1 |
|
nacos添加配置
1 |
|
网关的过滤器
实现GlobalFilter和Ordered接口
GlobalFilter的filter方法为具体的过滤逻辑
Ordered为设置过滤器的优先级,数值越小优先级越高
1 |
|
nginx
反向代理,负载均衡,部署前端项目
- 配置nginx.conf文件
- 通过
nginx
命令启动nginx
在conf
文件夹下创建新的文件夹来放自己编写的conf
文件
1 |
|
在conf
文件夹下的nginx.conf
里导入自己编写的conf
文件
1 |
|
重新加载配置文件nginx -s reload
并重启
查看文章
需求
刷新
上拉刷新:加载最新文章
下拉刷新:加载早先的文章
展示文章详情
知识点
拆分表
ap_article:文章概述
ap_article_config:文章配置(是否删除等)
ap_article_content:文章详情
垂直分表:将一个表的字段分散到多个表中
好处:
- 减少锁表的几率,查看文章概述和详情互不影响
- 查询文章详情数据效率低,不会影响到查询文章概述
拆分规则:
- 不常用字段单独放一张表
- 把大字段拆分出来单独一张表
- 经常组合查询的字段单独一张表
FreeMarker模板引擎
导包
1 |
|
添加配置
1 |
|
添加模板
在resources/templates/下
1 |
|
使用
1 |
|
MinIO
分布式对象存储服务
概念
- bucket:类似目录
- Object:类似文件
- Keys:类似文件名
docker安装
1 |
|
访问控制台192.168.200.130:9000
使用java上传文件
导包
1 |
|
1 |
|
使用:
通过FreeMarker将文章数据和模板绑定后生成静态文件,把静态的html文件保存到MinIO中,之后访问对应的文章页面就直接从MinIO中返回,不用再查询数据库
保存文章
自媒体端发布文章,审核通过后,App端需要保存文章信息
通过Feign远程让自媒体端远程调用
有带文章id:
- 修改文章
- 更新文章、文章内容
没有带文章id:
- 新增的文章
- 要保存文章、文章配置、文章内容
文章搜索
知识点
elasticsearch
安装
拉取镜像
1
docker pull elasticsearch:7.4.0
创建容器
1
docker run -id --name elasticsearch -d --restart=always -p 9200:9200 -p 9300:9300 -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins -e "discovery.type=single-node" elasticsearch:7.4.0
设置ik分词器
1
2把ik分词器上传到挂载的目录/usr/share/elasticsearch/plugins中解压
重启es测试
添加映射
添加语法:put请求发送
ip:端口/映射名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39192.168.200.130:9200/app_info_article
{
"mappings":{
"properties":{
"id":{
"type":"long"
},
"publishTime":{
"type":"date"
},
"layout":{
"type":"integer"
},
"images":{
"type":"keyword",
"index": false
},
"staticUrl":{
"type":"keyword",
"index": false
},
"authorId": {
"type": "long"
},
"authorName": {
"type": "text"
},
"title":{
"type":"text",
"analyzer":"ik_smart"
},
"content":{
"type":"text",
"analyzer":"ik_smart"
}
}
}
}其他操作
1
2
3
4
5GET请求查询映射:http://192.168.200.130:9200/app_info_article
DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
初始化索引库
导包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35<!-- 引入依赖模块 -->
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-common</artifactId>
</dependency>
<!-- Spring boot starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--elasticsearch-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.13.1</version>
</dependency>es配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
private String host;
private int port;
@Bean
public RestHighLevelClient client(){
return new RestHighLevelClient(RestClient.builder(
new HttpHost(
host,
port,
"http"
)
));
}
}配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23server:
port: 9999
spring:
application:
name: es-article
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/leadnews_article?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
username: root
password: 123456
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.article.pojos
#自定义elasticsearch连接配置
elasticsearch:
host: 192.168.200.130
port: 9200创建实体、mapper
导入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {
@Resource
private ApArticleMapper apArticleMapper;
@Resource
private RestHighLevelClient restHighLevelClient;
/**
* 初始化es中的数据
*/
@Test
public void init() throws IOException {
// 查询文章
List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();
// 批量导入es
BulkRequest bulkRequest = new BulkRequest("app_info_article");
for (SearchArticleVo searchArticleVo : searchArticleVos) {
IndexRequest indexRequest = new IndexRequest()
.id(searchArticleVo.getId().toString())
.source(JSON.toJSONString(searchArticleVo), XContentType.JSON);
bulkRequest.add(indexRequest);
}
restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println("文章信息导入es完成");
}
}
需求
搜索栏
创建文章时添加文档到索引
搜索记录
每个用户都需要存储搜搜记录,数据量较大,使用MongoDB来存
知识点
MongoDB
docker安装
1 |
|
spring集成mongodb
导入依赖
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>添加配置
1
2
3
4
5
6
7
8server:
port: 9998
spring:
data:
mongodb:
host: 192.168.200.130
port: 27017
database: leadnews-history映射类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@Data
@Document("ap_associate_words")
public class ApAssociateWords implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
/**
* 联想词
*/
private String associateWords;
/**
* 创建时间
*/
private Date createdTime;
}测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35@SpringBootTest(classes = MongoApplication.class)
@RunWith(SpringRunner.class)
public class MongoTest {
@Resource
private MongoTemplate mongoTemplate;
@Test
public void saveTest() {
ApAssociateWords apAssociateWords = new ApAssociateWords();
apAssociateWords.setAssociateWords("test");
apAssociateWords.setCreatedTime(new Date());
mongoTemplate.save(apAssociateWords);
}
@Test
public void findByIdTest(){
ApAssociateWords apAssociateWords = mongoTemplate.findById("64b65971d91a9f14ddf8da6a", ApAssociateWords.class);
System.out.println(apAssociateWords);
}
@Test
public void findByWordsTest() {
Query query = Query.query(Criteria.where("associateWords").is("test"))
.with(Sort.by(Sort.Direction.DESC, "createdTime"));
List<ApAssociateWords> apAssociateWordsList = mongoTemplate.find(query, ApAssociateWords.class);
System.out.println(apAssociateWordsList);
}
@Test
public void testDel() {
Query query = Query.query(Criteria.where("associateWords").is("test"));
mongoTemplate.remove(query, ApAssociateWords.class);
}
}
需求
保存用户的最新的10条记录在mongodb中
如果记录在历史中,更新时间
如果不在历史中,删掉最旧的一条记录再添加
用户行为
需求
所有用户行为信息都要存到Redis中
点赞/取消点赞
用户点赞后记录到redis中,取消点赞时从redis中删除
阅读数
将文章阅读数以hash结构存到redis
key:Article:viewCont;key:articleId;value:阅读数
展示用户行为数据
通过用户id和文章id查找redis的set中是否有对应的value
关注/取关
CRUD,同时改关注表和粉丝表
收藏
CRUD
热门文章
知识点
xxl-job
分布式任务调度框架
spring传统的定时任务@Scheduled,但是这样存在这一些问题 :
- 做集群任务的重复执行问题
- cron表达式定义在代码之中,修改不方便
- 定时任务失败了,无法重试也没有统计
- 如果任务量过大,不能有效的分片执行
部署
windows
- 下载源码:https://gitee.com/xuxueli0323/xxl-job
- 执行sql
- 修改
xxl-job-admin
下的数据库配置 - 启动
XxlJobAdminApplication
- 访问
http://localhost:8080/xxl-job-admin/
,账号admin,密码123456
docker
创建mysql容器,执行xxl-job的sql脚本
1
2
3
4
5
6docker run -p 3306:3306 --name mysql57 \
-v /opt/mysql/conf:/etc/mysql \
-v /opt/mysql/logs:/var/log/mysql \
-v /opt/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7拉取xxl-job镜像
1
docker pull xuxueli/xxl-job-admin:2.3.0
创建xxl-job容器
1
2
3
4
5docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.200.130:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=root" \
-p 8888:8080 -v /tmp:/data/applogs \
--name xxl-job-admin --restart=always -d xuxueli/xxl-job-admin:2.3.0访问测试
使用
导包
1
2
3
4
5
6
7
8
9
10<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>添加配置类,可以从官方的源码里找到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26package com.heima.xxlJob.config;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.port}")
private int port;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setPort(port);
return xxlJobSpringExecutor;
}
}添加配置
1
2
3
4
5
6
7
8
9
10
11server:
port: 8881
xxl:
job:
admin:
addresses: http://192.168.200.130:8888/xxl-job-admin
executor:
appname: sample
port: 9999 # 随意编写要执行的方法
1
2
3
4
5
6
7
8
9package com.heima.xxlJob.jpb;
@Component
public class HelloJob {
@XxlJob("demoJobHandler")
public void hello() {
System.out.println("job test");
}
}在控制台添加执行器管理和任务管理
在任务管理里运行定时任务
路由策略
- 轮询:服务器轮流执行任务
- 分片广播:将任务序号取模来分配到服务器上
KafkaStream
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
概念
Source Processor:源处理器,从Topic中生成输入流,转发给其他处理器进行处理
Sink Processor:将处理器处理完的数据转发给指定的Topic
入门案例
1 |
|
往itcast_topic_input
发送消息,itcast_topic_out
接收消息
springboot整合kafkaStream
添加配置
1
2
3kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}添加配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.heima.kafka.config;
@Getter
@Setter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}添加监听器,处理流数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.heima.kafka.listener;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
}
需求
定时计算:
为每个频道缓存热度最高的30条文章(5天之内的):
根据文章的阅读,点赞,评论,收藏加权(1,3,5,8)来判断文章的热度
feign调用wemedia,获取所有频道
根据频道来分组文章并排序,缓存热度最高的30条文章
实时计算:
behavior微服务中,在记录用户行为后,发送消息到kafka的一个topic中
article微服务中,创建handle获取kafka的消息,记录、转换后发送到另一个topic中
之后article里面创建一个listener来监听这个topic来更新数据库和缓存
自媒体端
登录
同app端的登录
上传素材
获取登录用户id
1.1 在wemedia网关中将token中的载荷userId存到header中
1.2 wemedia服务中通过拦截器,将header中的id存到ThreadLocal中
保存素材到minIO
2.1 校验参数,UUID随机生成素材名称
2.2 保存到minIO后,将访问地址保存到数据库中
2.3 返回素材数据供前端回显
展示素材列表
分页查询,查询条件为是全部还是已收藏的素材
根据ThreadLocal中的userId查找用户的素材
发布文章
- 如果文章状态为草稿
- 只保存文章信息,文章和图片的对应关系暂不保存
- 不为草稿
- 保存文章信息和文章与图片的对应关系
- 判断文章封面图片
- 选择无图、单图、三图:选择文章中的图片
- 选择自动:根据文章中图片的数量来决定
展示文章
同展示素材列表
文章审核
接入阿里云的内容安全服务,通过调用第三方接口对文章内容进行审查
需要开通阿里云内容安全服务,accessKeyId,secret
审核完图片和文章后,调用App端微服务的保存文章Feign接口
服务降级:
- 实现feign远程接口,编写降级逻辑
- 在
@FeignClient
注解中指定降级逻辑 - 在自媒体端中添加配置类来扫描服务降级类所在的包
- 在自媒体端的远程配置中开启服务降级
异步调用
- 自媒体端保存文章时异步调用文章审核
- 开启异步调用@EnableAsync
- 在审核文章方法上加@Async
敏感词过滤
文本过滤方案
方案 | 说明 |
---|---|
数据库模糊查询 | 效率太低 |
String.indexOf(“”)查找 | 数据库量大的话也是比较慢 |
全文检索 | 分词再匹配 |
DFA算法 | 确定有穷自动机(一种数据结构) |
DFA算法类似字典树来匹配敏感词
图片过滤方案
OCR (Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程
方案 | 说明 |
---|---|
百度OCR | 收费 |
Tesseract-OCR | Google维护的开源OCR引擎,支持Java,Python等语言调用 |
Tess4J | 封装了Tesseract-OCR ,支持Java调用 |
Tess4J使用
导包
1
2
3
4
5<dependency>
<groupId>net.sourceforge.tess4j</groupId>
<artifactId>tess4j</artifactId>
<version>4.1.1</version>
</dependency>使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Main {
public static void main(String[] args) {
try {
// 本地图片文件
File file = new File("C:\\Users\\ASUS\\Desktop\\aaa.jpg");
// 创建Tesseract对象
Tesseract tesseract = new Tesseract();
// 设置字体库路径
tesseract.setDatapath("D:\\download\\");
// 设置具体字体库
tesseract.setLanguage("chi_sim");
// 识别图片文字
String result = tesseract.doOCR(file);
result = result.replace("\\r|\\n", "-");
System.out.println("识别结果:" + result);
} catch (TesseractException e) {
e.printStackTrace();
}
}
}
文章定时发布
对比
DelayQueue:
JDK自带的延迟队列,队列元素实现Delayed接口(获取剩余时间,排序),之后将元素添加到DelayerQueue
中,只有过期的元素才能poll()出来
缺点:
- 服务关闭就丢失队列中的任务
RabbitMQ:
通过死信交换机实现延迟队列,当消息过期后会被发送到死信交换机中
Redis:
先将任务添加到数据库中实现持久化,
然后把未来5分钟之内要执行的任务添加到zSet,zSet再把当前要执行的任务放到list中
Redis实现定时发布
拉取、启动redis
1
2
3docker pull redis
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"导包
1
2
3
4
5
6
7
8
9<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>添加配置,测试
生产任务
先将任务及日志添加到数据库中实现持久化,
然后把未来5分钟之内要执行的任务添加到zSet,zSet再把当前要执行的任务放到list中
取消任务
将任务从数据库中删除,更改对应日志的状态,
然后删除redis中的任务
消费任务
删除redis、db中任务,更新日志,返回任务
从zSet中刷新任务
将zSet中的到期任务刷到对应list中
使用scan
来获取一定数量的任务,keys *
占用cpu太多不使用
通过定时任务实现定时刷新
通过redis的setnx
实现分布式锁,让多个定时任务同时只能有一个执行刷新任务
从DB中同步任务
先清空redis中的任务,保证任务不会重复存在于redis中
从DB中将5分钟之内的数据刷新到redis中
heima-leadnews-schedule
通过Feign向外提供服务
文章上架下架
知识点
Kafka
mq比较
- RabbitMQ:性能较好,数据量没有那么大
- RocketMQ:可靠性好
- Kafka:吞吐量高
概念
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic),一个topic可以在集群上的不同机器上有分区
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
安装
安装zookeeper
1
2
3docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14安装kafka
1
2
3
4
5
6
7
8
9docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1测试
导包
1
2
3
4<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.heima.kafka.sample;
public class ProducerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);
//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic001","key001","hello kafka");
// 发送消息
producer.send(record);
System.out.println("=======消息发送成功");
// 关闭消息通道,消息才能发送出去
producer.close();
}
}消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32package com.heima.kafka.sample;
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.添加kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
//消费者组,同一个组里的同时只能有一个消费者接收到消息
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//2.消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3.订阅主题
consumer.subscribe(Collections.singletonList("topic001"));
// 接收消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
}
}
}
}
kafka高可用
集群
集群中的一台服务器称为Broker,即一台服务器宕机了,任然可以对外提供服务
备份
主节点:Leader Replica;从节点:Follower Replica
从节点又分为ISR(in-sync replica)和普通节点。
ISR需要同步备份,从节点异步备份。
所以如果主节点宕机了,如果要数据可靠,则从ISR节点中选择一个作为主节点;如果要高可用,则从任意一个活着的节点中选一个作为主节点
生产者
同步发送消息
1
2RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("偏移量:" + recordMetadata.offset());异步发送消息
1
2
3
4
5
6
7
8producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null)
e.printStackTrace();
System.out.println("偏移量:" + recordMetadata.offset());
}
});部分配置
1
2
3
4
5
6// 发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);
// 应答方式 0: 不需要应答 1: 主节点收到后应答(默认) 2: 同步给所有从节点后应答
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 压缩方式 snappy,lz4,gzip
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
消费者
消费者组
一个消息在消费者组中只能被一个消费者消费
一个消息可以被不同组的消费者消费
消息有序性
一个Topic的分区里的消息是按顺序消费的,但不同分区不能保证有序性。
要保证有序性那就只能提供一个分区
场景:聊天,多次转账的越变更通知
偏移量
消费者每处理完一批消息,就会往’_consumer_offset’这个Topic里提交偏移量,偏移量用于确定消息队列中哪些消息被消费了
自动提交偏移量可能带来的问题
重复消费
如果消费者处理一批消息到一半宕机了,但还没有提交偏移量,其他消费者就会来接手分区继续处理消息,此时就会重复消费消息
消息丢失
如果正在处理消息时就提交偏移量,此时宕机了,那么正在处理的消息就会丢失
手动提交可以解决这些问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 取消自动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try {
while (true) {
// 消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
}
consumer.commitAsync(); // 异步提交偏移量,如果出现异常不会重试
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("错误信息:" + e);
} finally {
consumer.commitSync(); // 同步提交,如果之前异步提交失败了,这里会不断重试知道发送成功
}
Spring集成Kafka
导包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--spring集成kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--Kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
# Kafka地址
bootstrap-servers: 192.168.200.130:9092
# 生产者配置
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer生产消息,要发送对象可以转Json
1
2
3
4
5
6
7
8
9
10
11
12
13@RestController
public class HelloController {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/hello")
public String hello() {
// 发送消息到'test-topic'
kafkaTemplate.send("test-topic", "message");
return "ok";
}
}监听Topic
1
2
3
4
5
6
7
8
9@Component
public class HelloListener {
// 从'test-topic'中接收消息
@KafkaListener(topics = {"test-topic"})
public void onMessage(String message) {
System.out.println(message);
}
}
需求
自媒体端设置文章上下架,通过kafka将文章配置同步到app端
平台管理端
登录
登录接口:根据传来的用户名密码,和数据库中的进行加盐比对来判断是否登录成功
网关:解析token,将解析出来的userId放到请求头中
nginx:将请求反向代理到网关,网关再路由到登录接口
频道管理
CRUD
敏感词管理
CRUD
用户认证审核
CRUD
文章人工审核
CRUD
部署
持续集成
持续集成( Continuous integration , 简称 CI )指的是,频繁地(一天多次)将代码集成到主干
一个自动构建过程, 从检出代码、 编译构建、 运行测试、 结果记录、 测试统计等都是自动完成的, 无需人工干预。
知识点
Jenkins
Jenkins 是一款流行的开源持续集成(Continuous Integration)工具,广泛用于项目开发,具有自动化构建、测试和部署等功能
流程:提交代码到git -> jenkins拉取代码,使用jdk、maven等对代码进行编译、打包等操作 -> 最后把生成的jar或war分发到服务器上
安装
centos7
yum安装
添加安装源
1
2
3sudo wget -O /etc/yum.repos.d/jenkins.repo https://pkg.jenkins.io/redhat-stable/jenkins.repo --no-check-certificate
sudo rpm --import https://pkg.jenkins.io/redhat-stable/jenkins.io.key命令安装
1
yum -y install jenkins
修改配置
1
2
3
4
5
6vim /etc/sysconfig/jenkins
# 修改为对应的目标用户, 这里使用的是root
$JENKINS_USER="root"
# 服务监听端口
JENKINS_PORT="16060"1
2
3
4
5# 修改目录权限
chown -R root:root /var/lib/jenkins
chown -R root:root /var/cache/jenkins
chown -R root:root /var/log/jenkins重启
1
2
3
4
5
6systemctl restart jenkins
# 如果报错
# Starting Jenkins bash: /usr/bin/java: No such file or directory
# 需要创建java环境的软链接
# ln -s /usr/local/jdk/bin/java /usr/bin/java访问控制台
获取管理密码
1
cat /var/lib/jenkins/secrets/initialAdminPassword
安装插件
jenkins的功能需要安装插件来添加
安装方式:
进入【系统管理】-【插件管理】
点击标签页的【可选插件】
安装环境
在linux系统中安装jdk、git、maven等应用
然后到控制台的Manager Jenkins的Global Tools Configuration中指定所在位置
多环境开发
- Development :开发环境
- Production :生产环境
- Test :测试环境
添加配置指定使用哪个环境的配置文件
1 |
|
在nacos中添加${spring.application.name}-环境.yml
来设置不同环境的配置文件
部署
- 代码提交到远程仓库
- Jenkins拉取代码,用maven打包,dockerfile创建镜像
- 服务器上运行容器
(示例)
添加maven插件,自动生成镜像
1 |
|
添加Dockerfile
1 |
|
将工程上传到远程仓库
Jenkins工作台 -> 新建Item -> Freestyle project