Netty-自己

Netty

架构

img

img

BUG

(4条消息) 一文整理常见Java后端面试题系列——Netty篇(2022最新版)_java面试netty_程序猿周周的博客-CSDN博客

如何解决 JDK epoll 空轮询问题?
这个 BUG 是指 Java 的 NIO 在 Linux 下进行 selector.select() 时,本来如果轮询的结果为空并且不调用 wakeup 方法的话,这个 selector.select() 应该是一直阻塞的,但是 Java 却会打破阻塞,继续执行,导致程序无限空转,造成 CPU 使用率 100%。(这个问题只存在 Linux 是因为 Linux 的 NIO 是基于 epoll 实现的,而 Java 实现的 epoll 存在 BUG,windows 下 NIO 基于 poll 就不存在此问题)

Netty 的解决方案:

为 Selector 的 select 操作设置超时时间,同时定义可以跳出阻塞的四种情况

有事件发生
wakeup
超时
空轮询 BUG
而前两种返回值不为 0,可以跳出循环,超时有时间戳记录,所以每次空轮询,有专门的计数器进行 +1,如果空轮询的次数超过了 512 次(默认),就认为其触发了空轮询 BUG。

当触发 BUG 后,Netty 直接重建一个 Selector,将原来的 Channel 重新注册到新的 Selector 上,并将旧的 Selector 关掉。

流程一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

Selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class Selector implements Closeable {

protected Selector() { }

public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

public abstract boolean isOpen();

public abstract SelectorProvider provider();

public abstract Set<SelectionKey> keys();

public abstract Set<SelectionKey> selectedKeys();

public abstract int selectNow() throws IOException;

public abstract int select(long timeout)
throws IOException;

public abstract int select() throws IOException;

public abstract Selector wakeup();

public abstract void close() throws IOException;

}

SelectImpl

方法的流程具体实现

WindowsSelectImpl

调用系统的本地方法实现,select - poll - epoll的功能

SelectSelectionKeySetSelector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package io.netty.channel.nio;

// 位于一个netty自己的实现类(selector)
final class SelectedSelectionKeySetSelector extends Selector {
维护selectKeys集合
private final SelectedSelectionKeySet selectionKeys;
private final Selector delegate;
}
package io.netty.channel.nio;
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys = new SelectionKey[1024];
int size;
}

public abstract class SelectionKey {

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
/**
* Returns the selector for which this key was created. This method will
* continue to return the selector even after the key is cancelled.
*
* @return This key's selector
*/
public abstract Selector selector();
}

image-20230412170507846

Channel

1
2
3
4
5
6
public interface Channel extends Closeable {

public boolean isOpen();
public void close() throws IOException;

}

image-20230412171619112

image-20230412171955676

SocketChannle

image-20230412171744654

ServerSocketChannel

image-20230412171815095

流程二:

(10条消息) netty全过程图解(最详细清晰版)_netty工作流程_”PANDA的博客-CSDN博客

img

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Server {
public static void main(String[] args) throws Exception {
//创建两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new ServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("java技术爱好者的服务端已经准备就绪...");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Client {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端通道的处理器
ch.pipeline().addLast(new ClientHandler());
}
});
System.out.println("客户端准备就绪,随时可以起飞~");
//连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}

EventLoopGroup

image-20230329211714507

image-20230329211634741

NioEventLoop

== select + TaskQueue

1
2
3
4
5
6
7
8
9
10
11
private Selector selector;
private Selector unwrappedSelector;
// SelectKey
private SelectedSelectionKeySet selectedKeys;
// ServerSocketChannel##open() 调用执行 SelectorProvider##openServerSocketChannel()
// Selector##open() 调用执行 SelectorProvider##openSelector() -》WindowsSelectorProvider##openSelector()
private final SelectorProvider provider;
private static final long AWAKE = -1L;
private static final long NONE = 9223372036854775807L;
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
private final SelectStrategy selectStrategy;

image-20230330224533777

启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();

//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//4 启动 nio boss 线程执行接下来的操作

//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);

Selector

TaskQueue

Channel

NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连接。

NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连接。

常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel: 同步阻塞的客户端 TCP Socket 连接。

OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连接。

稍微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,几乎没什么人会选择使用Oio,所以也很难找到例子。我稍微琢磨了一下,经过几次报错之后,总算调通了。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
//server端代码,跟上面几乎一样,只需改三个地方
//这个地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只需要设置一个线程组boosGroup
.channel(OioServerSocketChannel.class)//设置服务端通道实现类型

//client端代码,只需改两个地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
.channel(OioSocketChannel.class)//设置客户端的通道实现类型

NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协议)连接。

NioSctpServerChannel: 异步的 Sctp 服务器端连接。

本地没启动成功,网上看了一些网友的评论,说是只能在linux环境下才可以启动。从报错信息看:SCTP not supported on this platform,不支持这个平台。因为我电脑是window系统,所以网友说的有点道理。

ChannelOption

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
下面简单的总结一下ChannelOption的含义已及使用的场景
  1、ChannelOption.SO_BACKLOG
  
    ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,
    服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
    
  2、ChannelOption.SO_REUSEADDR
  
    ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
    比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,
    比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。
    
  3、ChannelOption.SO_KEEPALIVE

    Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

  4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

    ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

  5、ChannelOption.SO_LINGER

    ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送

  6、ChannelOption.TCP_NODELAY

    ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送
数据,适用于文件传输。

ChannelPipeline

image-20230330230021915

1
2
3
4
5
6
7
8
9
10
11
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile Handle estimatorHandle;
private boolean firstRegistration = true;
private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
private boolean registered;

context

img

DefaultChannelHandlerContext

image-20230330230741880

HeadContextTailContext

image-20230330230611517

image-20230330230906964

image-20230330230722582

Handler

img

处理器Handler主要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层java NIO Channel到Netty的Channel。

出站指的是通过Netty的Channel来操作底层的java NIO Channel。

ChannelInboundHandlerAdapter处理器常用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连接建立事件 fireChannelActive。
  3. 读事件和读完成事件 fireChannelRead、fireChannelReadComplete。
  4. 异常通知事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变化事件 fireChannelWritabilityChanged。
  7. 连接关闭事件 fireChannelInactive。

ChannelOutboundHandler处理器常用的事件有

  1. 端口绑定 bind。
  2. 连接服务端 connect。
  3. 写事件 write。
  4. 刷新时间 flush。
  5. 读事件 read。
  6. 主动断开连接 disconnect。
  7. 关闭 channel 事件 close。

还有一个类似的handler(),主要用于装配parent通道,也就是bossGroup线程。一般情况下,都用不上这个方法。

ChannelOutboundHandlerAdpater

image-20230330231418573

image-20230330231521492

ChannelInboundHandlerAdpater

image-20230330231434637

image-20230330231539310

Read()

Write()

5种IO模型

大白话详解5种网络IO模型 (qq.com)

1
2
3
4
5
6
7
8
9
10
关键指令:
等待数据 - 数据内核到用户进程, 内核 - 用户进程

阻塞IO:Recvfrom
非阻塞IO:Ewouldb lock,Recvfrom
IO多路复用:select()、poll(链表)、epoll(红黑树) -- op_connect, op_accept, op_read, op_write,Recvfrom
信号量:SIG IO,Recvfrom
AIO:aio_read

本质上:Recvfrom都是阻塞

阻塞IO

图片

非阻塞IO

图片

IO多路复用

图片

信号量机制

图片

AIO异步非阻塞

图片

Reactor和Proactor

(5 封私信) 如何深刻理解Reactor和Proactor? - 知乎 (zhihu.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
NIO模型
同步非阻塞
NIO有同步阻塞和同步非阻塞两种模式,一般讲的是同步非阻塞,服务器实现模式为一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
AIO模型
异步非阻塞
服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,
注:AIO又称为NIO2.0,在JDK7才开始支持。

为什么Netty使用NIO而不是AIO?

Netty不看重Windows上的使用,在Linux系统上,AIO的底层实现仍使用EPOLL,没有很好实现AIO,因此在性能上没有明显的优势,而且被JDK封装了一层不容易深度优化
Netty整体架构是reactor模型, 而AIO是proactor模型, 混合在一起会非常混乱,把AIO也改造成reactor模型看起来是把epoll绕个弯又绕回来
AIO还有个缺点是接收数据需要预先分配缓存, 而不是NIO那种需要接收时才需要分配缓存, 所以对连接数量非常大但流量小的情况, 内存浪费很多
Linux上AIO不够成熟,处理回调结果速度跟不到处理需求,比如外卖员太少,顾客太多,供不应求,造成处理速度有瓶颈(待验证)

阻塞IO:数据处理、数据拷贝都会阻塞

非阻塞IO :数据处理轮询,数据拷贝阻塞

异步IO:数据处理和数据拷贝 都不阻塞

EventLoopGroup

我们先看一下EventLoopGroup的类图:

img

其中包括了常用的实现类NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用过。

从Netty的架构图中,可以知道服务器是需要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。

每个EventLoopGroup里包括一个或多个EventLoop,每个EventLoop中维护一个Selector实例。


Netty-自己
http://example.com/2023/06/01/分布式组件+常见组件/Netty/
作者
where
发布于
2023年6月1日
许可协议