Netty

Netty

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端

NIO基础

NIO(non-blocking io)非阻塞IO

三大组件

Channel

读写数据的双向通道

常见Channel有:

  • FileChannel:传输文件
  • DatagramChannel:UDP
  • SocketChannel:TCP
  • ServerSocketChannel:TPC,专用于服务器

Buffer

缓冲读写数据

常见Buffer有:

  • ByteBuffer

Selector

在服务器和用户连接上的几种策略:

  1. 多线程:一个线程对应一个用户(socket)。缺点:内存占用高,上下文切换成本高,只适合连接少的场景
  2. 线程池:一个线程池对应多个用户(socket)。缺点:阻塞模式下,线程仅能处理一个socket,仅适合短连接场景
  3. selector:一个selector和一个线程管理多个channel,当channel有任务要线程处理时,由selector通知线程去处理,线程是非阻塞的。适合连接多,流量低的场景。

多线程版

线程池版

selector版

ByteBuffer

基本使用

功能:在项目目录下创建一个data.txt文件,读取这个文件的内容

使用步骤:

  1. 像buffer写入数据,如channel.read(buffer)
  2. 调用filp()切换至读模式
  3. 从buffer中读取数据,如buffer.get()
  4. 调用clear()或compact()切换至写模式
  5. 重复以上步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public static void main(String[] args) {
// 1. 准备输入输出流
try (FileChannel channel = new FileInputStream("data.txt").getChannel()){
// 2.准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10); // 划分10个字节作为缓冲区
// 3.写入缓冲区
while (channel.read(buffer) != -1) {
// 4.从buffer中读出数据
buffer.flip(); // 切换到读模式
while (buffer.hasRemaining()) {
System.out.println((char) buffer.get());
}
buffer.clear(); // 切换到写模式
}
} catch (IOException e) {
e.printStackTrace();
}
}

结构

属性:

  1. capacity 容量
  2. position 读写指针
  3. limit 读写的尾部指针

初始时

写入内容,p指针后移

调用flip读内容,l指针移到p指针位置,重置p指针

调用clean,重置为初始状态,从头开始写

调用compact,会把未读的内容保留,在后面追加写

常见方法

分配空间

分配一块固定的容量(字节)

1
2
3
4
5
// 分配堆内存,效率较低
ByteBuffer.allocate(10);
// 分配直接内存(系统内存),效率较高(零拷贝),不受GC影响,
// 但分配效率低,使用不当可能造成内存泄漏
ByteBuffer.allocateDirect(16);
写入数据
1
2
3
4
// 从channel中读到buffer
channel.read(buffer);
// 直接向buffer中写
buffer.put();
读取数据
1
2
3
4
// 把buffer中的数据写到channel
channel.write(buffer);
// 直接从buffer中读
buffer.get();
  • rewind() 将position移到头部
  • get(int idx) 读取指定下标的数据,不会改变position指针
做标记
1
2
3
4
5
6
7
8
9
10
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a', 'b', 'c', 'd'});
buffer.flip();

System.out.println(buffer.get()); // 97
buffer.mark(); // 在下标为1的位置做个标记
System.out.println(buffer.get()); // 98
System.out.println(buffer.get()); // 99
buffer.reset(); // 返回做标记的位置
System.out.println(buffer.get()); // 98
String和ByteBuffer转换
1
2
3
4
// 直接转为buffer
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
// 转为String
System.out.println(StandardCharsets.UTF_8.decode(buffer1).toString());

粘包、半包

粘包

  1. 传输换行的数据时,多行数据在一行传输
  2. 是在效率上考虑,多条一起发效率高

半包

  1. 一行数据被拆成多行显示
  2. 因为缓冲区大小导致数据被拆
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
// 模拟网络传输数据
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}

private static void split(ByteBuffer source) {
// 将source中的内容按'\n'分割来读取
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int length = i - source.position() + 1;
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
target.put(source.get());
}
target.flip();
System.out.println(StandardCharsets.US_ASCII.decode(target));
}
}
// 改为写模式,保留未读取的数据
source.compact();
}

文件编程

FileChannel

FileChannel只能工作在阻塞模式下,不能使用selector

获取方式
  1. FileInputStream:获取的channel只能读
  2. FileOutputStream:获取的channel只能写
  3. RandomAccessFile:根据构造RandomAccessFile时的读写模式决定
读取
1
int readBytes = channel.read(buffer);
写入
1
2
3
4
5
6
7
ByteBuffer buffer = ...;
buffer.put(...);
buffer.flip(); // 切换读模式

while (buffer.hasRemaining()) {
channel.write(buffer);
}
关闭
1
channel.close();
强制写入

​ 为了减少io操作,写的数据不会立即写到磁盘中。可以调动force(true)立即写入磁盘

传输数据到另一个文件
1
2
3
4
5
6
7
8
9
10
11
12
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 把文件data.txt的内容写到to.txt中,每次最高传输2G数据
// 采用的是零拷贝,效率高
long size = from.size();
for (long left = size; left > 0;)
left -= from.transferTo(size - left, left, to);
} catch (IOException e) {
e.printStackTrace();
}

Path & Files

  • Path用于表示文件路径

  • Paths是工具类,用于获取Path实例

1
Path source = Paths.get("1.txt");
  • Files是操作文件的工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 判断文件是否存在
Path path = Path.get("data.txt");
System.out.println(Files.exists(path));

// 创建一级目录,已存在则抛异常
Path path = Path.get("hello/d1");
Files.createDirectory(path);

// 创建多级目录
Path path = Path.get("hello/d1/d2");
Files.createDirectorys(path);

// 拷贝文件
Path source = Path.get("helloword/data.txt");
Path target = Path.get("helloword/target.txt");
Files.copy(source, target);

// 遍历目录
Files.walkFileTree(Paths.get("F:\\aaa"), new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println(dir);
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
return super.visitFile(file, attrs);
}
});

。。。

:star:网络编程

阻塞&非阻塞

单线程模式下

阻塞模式

单线程不能很好的处理多个连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
阻塞模式
*/
public class Server {
public static void main(String[] args) throws IOException {

// 1.创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2.绑定监听端口
ssc.bind(new InetSocketAddress((8080)));
// 3.建立与客户端的连接
List<SocketChannel> socketChannels = new ArrayList<>();
ByteBuffer buffer = ByteBuffer.allocate(16);
while (true) {
SocketChannel sc = ssc.accept(); // 会阻塞等待客户端连接
socketChannels.add(sc);
// 4.接收数据
for (SocketChannel channel : socketChannels) {
channel.read(buffer); // 会阻塞等待客户端发消息
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
System.out.println();
buffer.clear();
}
}
}
}
1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(StandardCharsets.UTF_8.encode("asd"));
}
}

非阻塞模式

1
2
3
4
5
6
7
8
9
10
// 设置服务器端为非阻塞模式
ssc.configureBlocking(false);
// 影响的是accept,阻塞模式下会停下来等待客户端的连接
// 非阻塞模式下没有连接会返回null
SocketChannel sc = ssc.accept();

// 设置客户端连接为非阻塞
sc.configureBlocking(false);
// 影响的是read,不会停下来等客户端发送消息
channel.read(buffer);
Selector

多路复用:单线程配合Selector完成对多个Channel可读写事件的监控

事件的种类
  1. accept:服务端在有连接请求时触发
  2. connect:客户端连接建立后触发
  3. read:客户端发送来消息、客户端正常/异常关闭时触发,发送数据超过缓冲区会触发多次
  4. write:可写事件
处理连接和读事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Server {
public static void main(String[] args) throws IOException {
// selector管理channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 注册到selector上
// 通过SelectionKey可以知道是哪个channel发生了哪个事件
// 关注accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT, null);

ssc.bind(new InetSocketAddress((8080)));
while (true) {
// 没有发生事件就阻塞
selector.select();

// 处理事件,selector.selectedKeys()获取所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 处理连接事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();// 不处理(accept或cancel)key会重新加到keys中

// 把客户端Channel注册到selector上
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ, null);
} else if (key.isReadable()) {
// 处理读事件
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer); // 正常断开返回-1
if (read == -1)
key.cancel();
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
} catch (IOException e) {
System.err.println("客户端异常断开");
key.cancel(); // 没有消费掉key(read),需要手动取消
}
}

// selector检测到事件后把key加到keys中不会主动删除
// 需要手动删除
iterator.remove();
}
}
}
}
处理写事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT, null);
ssc.bind(new InetSocketAddress(8080));

while (true) {
selector.select();

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);

// 发送数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
sc.write(buffer);

if (buffer.hasRemaining()) {
// 关注写事件
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
scKey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer =(ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();

int write = sc.write(buffer);
System.out.println(write);
// 释放读完的buffer,不再关注写事件
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));

int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1026 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}
消息边界问题

utf-8编码下汉字占三个字节,如果没有读全就会产生乱码

处理方式

  1. 客户端和服务器端约定固定长度的消息格式。缺点:浪费空间
  2. 在消息中加分隔符,根据分隔符之间的消息长度分配buffer。缺点:慢
  3. 带头结点,头结点里有后续内容的长度信息,根据长度分配不同长度的buffer。
    • TLV格式:Type Length Value
    • Http1.1是TLV格式
    • Http2.0是LTV格式
多线程优化

好处:

  1. 充分利用多核cpu资源
  2. 和耗时较长的任务并行执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
// 接收客户端连接的线程
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT, null);
ssc.bind(new InetSocketAddress(8080));
// 创建worker线程池
Worker[] workers = new Worker[2];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}

int index = 0;
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
workers[index].register(sc);
index = (index + 1) % workers.length;
}
}
}
}

static class Worker implements Runnable{
// 处理任务的线程
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
// 消息队列
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化线程和selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
selector = Selector.open();
thread = new Thread(this, name);
thread.start();
start = true;
}
queue.offer(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
// selector在select()上阻塞着
// 需要唤醒selector
selector.wakeup();
}

@Override
public void run() {
while (true) {
try {
selector.select();
Runnable task = queue.poll();
if (task != null) {
// 执行sc注册到selector
// 必须要在selector执行select()之后注册
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
int read = sc.read(buffer);
buffer.flip();
System.out.println(this.name + " : " + Charset.defaultCharset().decode(buffer));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
NIO和BIO对比
stream 和 channel
  • 不同:stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区
  • stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用
  • 相同:均为全双工(读写可同时进行)

零拷贝

零拷贝指的是不需要把数据拷贝到jvm中

不使用零拷贝数据中共复制了4次,切换用户态/内核态3次

使用DirectBuffer让用户缓冲区和内核缓冲区共用一块内存,减少一次拷贝

直接从内核缓冲区向客户端的soket缓冲区发数据,减少一次内核切换

零拷贝的有点有:

  • 更少的用户态与内核态的切换
  • 不利用cpu计算(DMA),减少cpu缓存伪共享
  • 零拷贝适合小文件传输

Netty

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端

入门

为何要异步

  1. 单线程没法异步提高效率,必须配合多线程、多核cpu才能发挥异步的优势
  2. 异步并没有缩短响应时间,反而有所增加,异步提高的是单位时间内的吞吐量
  3. 合理进行任务拆分,也是利用异步的关键

Hello World

  1. 导包

    1
    2
    3
    4
    5
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.59.Final</version>
    </dependency>
  2. 编写服务器端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class HelloServer {
    public static void main(String[] args) {
    // 启动器:组装netty组件
    new ServerBootstrap()
    // 组里有负责可连接事件的loop,和负责处理可读事件的loop(selector, thread)
    .group(new NioEventLoopGroup())

    // 选择服务器端channel的实现
    .channel(NioServerSocketChannel.class)

    // 决定了worker(child)能执行哪些操作
    .childHandler(new ChannelInitializer<NioSocketChannel>() {

    // 给客户端channel添加handler
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
    // 解码handler ByteBuf --> 字符串
    ch.pipeline().addLast(new StringDecoder());
    // 自定义handler
    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println(msg);
    }
    });
    }
    })
    .bind(8080);
    }
    }
  3. 编写客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
    // 客户端启动器
    new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
    // 编码handler
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    // 异步连接服务器
    .connect(new InetSocketAddress("localhost", 8080))
    // 阻塞方法,连接建立后往下执行
    .sync()
    // 客户端和服务器端的channel
    .channel()
    // 发送数据
    .writeAndFlush("Hello World");
    }
    }

组件

EventLoop

​ EventLoop本质是一个单线程执行器(selector + thread),处理channel上发生的事件

​ EventLoopGroup是一组EventLoop,Channel会绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证io事件处理时的线程安全)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestEventLoop {
public static void main(String[] args) {
// 创建事件循环组,可以指定线程数
// NioEventLoopGroup的实现可以执行io,普通,定时任务
EventLoopGroup group = new NioEventLoopGroup(2);
// 获取EventLoop对象
System.out.println(group.next());
// 执行普通任务
group.next().submit(() -> {
System.out.println("task");
});
// 执行定时任务
group.next().scheduleAtFixedRate(() -> {
System.out.println("schedule:" + LocalTime.now());
}, 1, 1, TimeUnit.SECONDS);
}
}

​ 分工:

  • group(EventLoopGroup, EventLoopGroup):可以指定两个group来分工,前一个为监听accpet事件,后一个为处理读写事件
  • addLast(EventExecutorGroup, String, ChannelHandler):指定EventLoopGroup执行这个处理器;指定处理器名;处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class EventLoopServer {
public static void main(String[] args) {
DefaultEventLoopGroup group = new DefaultEventLoopGroup();

new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
// 传给下一个handler
ctx.fireChannelRead(msg);
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
// 前后两个handler执行的线程不同时,由上一个线程发一个Runnable给下一个线程来执行
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
Channel

读写数据的双向通道

主要方法:

  • close()
  • closeFuture():关闭channel后调用的方法
  • pipeline():添加处理器
  • write():写入数据到缓冲区
  • writeAndFlush():写入数据并刷出

处理连接结果

connect()是非阻塞方法,可以通过sync()当前线程阻塞等待连接建立,或者addListener()添加listener让其他线程等待连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException, IOException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));

channelFuture.addListener(new ChannelFutureListener() {
// 完成连接后会调用operationComplete
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.channel().writeAndFlush("www");
}
});
}
}

处理关闭

调用channel.close()关闭channel后,可以用ChannelFuture future = channel.closeFuture();来进行善后处理

sync()同步处理,addListener()异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 前略
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print(": ");
String msg = sc.nextLine();
if ("q".equals(msg)) {
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "input").start();

ChannelFuture future = channel.closeFuture();
future.sync();
System.out.println("善后工作");
Future & Promise

继承关系:jdk的Future <– netty的Future <– Promise

  • jdk的Future只能同步等待任务结束
  • netty的Future可以同步或异步等待任务结束
  • Promise不仅有Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}
});
// 同步等待
Integer i = future.get();
System.out.println(i);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup(2);
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 2;
}
});
// 异步等待结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread() + ":" + future.get());
}
});
System.out.println(Thread.currentThread());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup(2);
DefaultPromise<Integer> promise = new DefaultPromise<>(group.next());
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
// 往promise里装结果
promise.setSuccess(1);
}).start();

System.out.println(promise.get());
}
}
Handler & Pipeline

Pipeline类似流水线,handler为流水线上的一道道工序

handler分为入站、出站两种

  • 入站处理器通常是ChannelInBoundHandlerAdapter的子类,读取客户端数据时触发,也可以向客户端写数据
  • 出站处理器通常是ChannelOutBoundHandlerAdapter的子类,向客户端写回数据时触发,出站处理器的执行顺序适合入站处理器相反的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加在 head -> ... -> h1 -> tail
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(1);
super.channelRead(ctx, msg);
}
});
// 出站处理器在往channel中写数据时触发
// 出站的处理顺序和入站时相反
pipeline.addLast("h2", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(2);
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
ByteBuf

是对字节数据的封装

优势
  • 池化
  • 读写指针分离
  • 可以自动扩容
  • 支持链式调用
  • 支持浅拷贝
自动扩容

写入后数据大小未超过512,则选择下一个16的整数倍扩容

写入后数据大小超过512,则选择下一个2^n扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf); // 起始256字节容量

StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
System.out.println(buf); // 扩容后512字节容量
}
}
选择使用的内存

使用堆内存,写快读慢

1
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.heapBuffer(10);

使用直接内存,写慢读快

1
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.directBuffer(10);
池化

通过池来重用buf,4.1后默认开启

回收

根据实现方式不同,内存回收方式也不同

  • UnpooledHeapByteBuf:使用的是JVM内存,只需GC内存回收即可
  • UnpooledDirectByteBuf:使用直接内存,需要特殊方法回收
  • PooledByteBuf:池化,不用时归还回池中,需要更复杂的规则来回收内存

每个ByteBuf都实现ReferenceCounted接口,通过引用计数方式来判断是否需要回收

如果没释放Buf,在处理器链的尾/头会被释放

切片

将一块ByteBuf切成逻辑上的两块,实际上没有发生数据的拷贝

切片是不能扩容的

原buf释放内存会影响切片,可以通过retain()方法来计数加1从而不会释放内存

1
2
3
4
5
6
7
8
9
10
11
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a', 'b', 'c', 'd'});

ByteBuf f1 = buf.slice(0, 2);
ByteBuf f2 = buf.slice(2, 4);
System.out.println(f1);
System.out.println(f2);
}
}
合并

将多块ByteBuf合并成逻辑上的一块,实际上没有发生数据的拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
buf1.writeBytes(new byte[]{'a', 'b', 'c', 'd'});

ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
buf2.writeBytes(new byte[]{'a', 'b', 'c', 'd'});

CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer.addComponent(true, buf1);
buffer.addComponent(true, buf2);

System.out.println(buffer);
}
}

进阶

粘包 半包

原因:用TCP协议传输数据,由于滑动窗口(缓冲区)传送的时候可能把一段数据分成了几份,导致接收方收到的数据不全(半包),或几条数据一起发过来(粘包);ByteBuf大小也类似

解决方式

  1. 定长解码器:ch.pipline().addLast(new FixedLengthFrameDecode(长度))规定每条消息的长度固定
    • 缺点:可能造成带宽的浪费
  2. 分隔符:ch.pipline().addLast(new LineBaseFrameDecoder(限定最大长度)),以换行符为分隔符
    • 效率低
  3. LTC解码器:类似报头,如包含内容长度信息等

协议

实例:处理http的入站信息和返回信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();

try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// http协议编码解码处理器
ch.pipeline().addLast(new HttpServerCodec());
// 指定处理HttpRequest的信息
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
System.out.println(msg.uri());

DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>Hello</h1>".getBytes();
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080)
.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
自定义协议
  1. 要素

  1. 消息类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    @Data
    public abstract class Message implements Serializable {

    /**
    * 根据消息类型字节,获得对应的消息 class
    * @param messageType 消息类型字节
    * @return 消息 class
    */
    public static Class<? extends Message> getMessageClass(int messageType) {
    return messageClasses.get(messageType);
    }

    // 请求序列号
    private int sequenceId;

    // 消息类型
    private int messageType;

    public abstract int getMessageType();

    // 指令类型
    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;
    /**
    * 请求类型 byte 值
    */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    /**
    * 响应类型 byte 值
    */
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    // 抽象类的实现类
    static {
    messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
    messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
    messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
    messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
    messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
    messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
    messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
    messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
    messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
    messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
    messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
    messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
    messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
    messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
    messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
    messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

    }
  2. 编解码器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    public class MessageCodec extends ByteToMessageCodec<Message> {
    /**
    * 编码成自定义的协议格式
    * @param channelHandlerContext
    * @param msg
    * @param out
    * @throws Exception
    */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message msg, ByteBuf out) throws Exception {
    // 魔数 5字节
    out.writeBytes(new byte[]{1, 1, 4, 5, 1});
    // 版本号 1字节
    out.writeByte(1);
    // 序列化方式,指定 0 jdk,1 json 1字节
    out.writeByte(0);
    // 指令类型 1字节
    out.writeByte(msg.getMessageType());
    // 请求序号,提供异步能力 4字节
    out.writeInt(msg.getSequenceId());
    // 获取正文
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(msg);
    byte[] bytes = bos.toByteArray();
    // 正文长度 4字节
    out.writeInt(bytes.length);
    // 正文
    out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
    // 读取魔数
    byte[] magic = new byte[5];
    in.readBytes(magic);
    // 读取版本
    byte version = in.readByte();
    // 读取序列化方式
    byte serializerType = in.readByte();
    // 读取指令类型
    byte messageType = in.readByte();
    // 读取序列号
    int sequenceId = in.readInt();
    // 读取正文长度
    int length = in.readInt();
    // 读取正文
    byte[] bytes = new byte[length];
    in.readBytes(bytes, 0, length);

    // 反序列化成Message
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    Message message = (Message) ois.readObject();

    System.out.println(Arrays.toString(magic));
    System.out.println(version);
    System.out.println(serializerType);
    System.out.println(messageType);
    System.out.println(sequenceId);
    System.out.println(length);
    System.out.println(message);

    // 传给下一个handler
    out.add(message);
    }
    }
  3. 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class Main {

    public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
    new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), // 解决粘包半包
    new MessageCodec());

    // 模拟发送消息
    LoginRequestMessage message = new LoginRequestMessage("zs", "123456");
    channel.writeOutbound(message);

    // 模拟接收
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    new MessageCodec().encode(null, message, buf); // 编码后写到buf
    channel.writeInbound(buf); // 入站
    }
    }
    /*
    输出
    [1, 1, 4, 5, 1]
    1
    0
    0
    0
    193
    LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zs, password=123456)
    */

Netty
http://xwww12.github.io/2023/02/26/网络/Netty/
作者
xw
发布于
2023年2月26日
许可协议