Netty源码系列 之 ChannelPipeline IO处理回顾 源码

目录

ChannelPipeline【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

ChannelPipeline概念回顾

ChannelPipeline的创建

Inbound(输入Handler)所对应的事件传播

Outbound(输出Handler)所对应的事件传播【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】


ChannelPipeline【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

在细致剖析ChannelPipeline之前,我们拿pipeline管道add的一个最常见的Handler:帧解码器类LineBasedFrameDecoder,来进行debug源码展示其流程,进而为后续清晰描绘ChannelPipeline做铺垫

所有Handler,无论是是自定义的Handler还是netty原生的Handler,都是通过ChannelPipeline.addLast进行添加的,LineBasedFrameDecoder也不例外。

pipeline.addLast(new LineBasedFrameDecoder(1024));

以如下案例进行debug源码演示其流程

  • 以下测试案例以及源码过程演示的是消息数据出现粘包现象时,帧解码器LineBasedFrameDecoder的处理
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- serverchannel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");
//        channel.writeAndFlush("sunshuaixiaohei666\n");}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关
//        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
  • debug源码流程 【之前总结过的源码流程一笔带过】

1.先debug启动服务端,完成服务端初始化操作后,会在NioEventLoop中Selector阻塞等待客户端连接,IO事件的触发

2.以运行方式进行启动客户端

3.服务端停止阻塞,开始处理Accept事件

虽然本次调用的也是read方法,但是触发的是Accept连接事件,客户端连接上来,服务端先处理连接,然后开启一个新NioEventLoop线程去完成客户端的注册和后续该SocketChannel所对应的IO事件的处理。

4.

5.客户端注册register的逻辑和服务端注册的逻辑是一致的

6.初始化工作

最终会回调到initChannel方法,完成自定义Handler的add

7.Accept连接事件处理完后,服务端接下来进行处理当前NioSocketChannel所对应的IO事件

服务端触发read事件,读取客户端发送过来的数据

前面的逻辑之前分析过,这里重点关注fireChannelRead中产生的逻辑:帧解码器Handler类的作用操作

回调帧解码器LineBasedFrameDecoder这一Handler的channelRead方法

ByteToMessageDecoder类:

callDecode方法:

一定注意一点:每一次通过帧解码器类进行解码消息,无论消息多长多短,一次只能解码出一条完整的消息,啥叫完整的消息?每遇见一个分隔符被称之为一条完整的消息数据

解码出一条完整的消息后:

直接看最后一条消息数据的处理

  • 修改一下测试案例,测试一下半包情况下,帧解码器的处理情况
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");channel.writeAndFlush("sunshuaixiaohei666\n");}
}

改动之处:

服务端:

客户端:

  • debug测试

以同样的方式启动服务端与客户端

相同的逻辑不再记录:

return null:

由于在第一轮decode后没有找到分隔符,所以out中没有数据

为什么?因为我们设置ByteBuf的最大值 初始值 最小值都为16,当read读取的数据长度一次最多为16,并且在读取到的ByteBuf中,数据没有发现分隔符,所以out输出集合中就没有add。

由于客户端写过来的消息数据还没有read完,所以会再一次让NioEventLoop线程进行处理read这一IO

这一次才找到分隔符,这样才可以返回一条完整的消息数据,并且在out集合中也会add这一条完整的消息数据

ChannelPipeline概念回顾

回顾ChannelPipeline相关的概念:

1.每一个客户端(SocketChannel | 线程)都独立享有一套pipeline管道,这样以空间换安全的方式,避免了多线程共享临界区资源导致并发安全问题。

2.Pipeline管道维护的是一组addLast进来的Handler,Pipeline的结构为双向链表。

3.Pipeline所维护的Handler的类型有哪些?ChannelInbound(输入[读]类型),ChannelOutbound(输出[写]类型) 。注释:head,tail这两个Handler是Pipeline自带的Handler。

4.channel.writeAndFlush() :从tail尾部这一Handler往前找,一直找到第一个ChannelOutBoundHandler 进行写输出处理

ctx.writeAndFlush():从当前Handler往前找,一直找到第一个ChannelOutBoundHandler 进行写输出处理

5.每一个ChannelHandler(无论输入还是输出,只要是存在于Pipeline管道中的),这些Handler都存在于ChannelContext中。ChannelContext提供了ByteBuf的内存分配器,Handler事件传播功能等。

ChannelPipeline的创建

1.创建NioServerSocketChannel或NioSocketChannel时

2.

3.初始化ChannelPipeline管道,Pipeline管道中默认带有head,tail这两个内置的Handler。

  • 除了每一个ChannelPipeline默认自带的Handler:head,tail之外,我还可以手工Pipeline.addLast(xxxHandler)添加自定义的Handler
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});ByteBuf byteBuf = ByteBufAllocator.DEFAULT.directBuffer(10);Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");channel.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,32,1024));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(defaultEventLoopGroup,"lineBased",new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}

debug追踪一下源码:【debug方式启动Server服务端,完成bind后再以运行的方式启动Client客户端】

1.pipeline.addLast添加自定义的Handler处理器类

(1)defaultEventLoopGroup:该参数传递的是额外创建的线程组。如果不传递该线程组呢?如果不传递该线程组,那么该Handler是由处理IO事件的线程去处理。但是假设说该自定义的Handler处理的逻辑过长,导致线程资源占用时间过久,那么是不是处理IO的线程资源就紧张了?对吧。所以我们额外补充了一组线程:defaultEventLoopGroup来用作该自定义Handler的逻辑处理。但是一定要显示配置呀。

defaultEventLoopGroup线程组和普通的Thread线程是一样的,只不过使用DefaultEventLoopGroup更加完美的和Netty体系相融合,共用同一份代码。

(2)lineBased:显示指定的Handler的名字。如果不指定,默认情况下Handler的名字为"类名#0"。比如这个Handler的name为:LineBasedFrameDecoder#0

(3)new LineBasedFrameDecoder(xxx):该参数传递的就是要添加到ChannelPipeline的Handler类啦

2.在客户端连接上服务端后,服务端会分配给客户端一个对应的NioSocketChannel,在NioSocketChannel初始化阶段会完成ChannelPipeline的创建和对应Handler的addLast添加【细致过程见之前的总结】

3.

4.checkMultiplicity方法:

当客户端连接数变多后,如果同一个Handler是非@Sharable 且 之前其他线程addLast添加过,那么直接抛出异常。避免了线程并发安全问题。

5.

Handler内部其实封装的有Context,Context真正的去封装一系列的参数:

filterName方法:

generateName方法:

构造名字name的时,啥时候容易会出现名字重复呢?

匿名内部类的情况很容易造成名字重复,因为匿名内部类都是一样的,为ChannelInbound HandlerAdapter,如果添加多个,会造成生成的名字重复,如果造成名字重复怎么办?

下面是解决方案:

如果说检测到name名字重复,假设重复的名字为ABCServer#0,那么重新生成的名字为ABCServer#1,但是会循环判断重新生成的名字是否还是重复的,直到最终context0(newName)==null退出循环为止。

context0方法:

6.addLast0方法:

7.callHandlerAddedInEventLoop(newCtx, executor)方法:

由于该Handler的线程组执行器使用的是自定义创建的DefaultEventLoopGroup,所以可能会之前IO事件处理线程组NioEventLoopGroup处理逻辑代码不太一样。其实DefaultEventLoopGroup更像是一种简化,因为Handler的逻辑处理视作是一种普通任务task的run执行,所以只把这部分的逻辑从NioEventLoop中抽离出来即可。

关键是这一个task的处理:

这不就和之前的逻辑重合到一起啦:

Inbound(输入Handler)所对应的事件传播

Inbound事件都对应有哪些?

其实说是事件,实际上各个事件方法也就是Handler的生命周期回调方法。当Handler执行到生命周期的某个步骤后,Handler就可以回调执行某一个事件方法。比如说:当创建完Channel管道后(完成生命周期中创建管道的阶段),那么就会回调ChannelActive这一方法。当Channel有输入流入时(完成生命周期中读入数据的阶段),就会回调ChannelRead这一方法。

图中所有的Handler:head,h1,h2,tail 都拥有输入Handler所对应的Inboud事件方法

head和tail所对应的Handler(Context)继承Inbound和Outbound,所以这俩具有输入,输出所对应的所有事件方法。

Inbound输入事件是通过什么api在多个Handler之间进行传播的?

ChannelPipeline通过一个双向链表的数据结构把多个Handler进行链接维护起来,但是msg数据,事件是如何在多个Handler之间传递的呢?是通过两类的API:1.ctx.fireChannelxxx(); 2.super.fireChannelxxx()

如果要传递channelActive事件,那么调用的方法就是:

1.ctx.fireChannelActive(); 2.super.fireChannelActive()

以channelRead为例:

channelRead事件的源头是什么?

NioByteUnsafe类的read方法--->pipeline.fireChannelRead()

debug流程如下:

1.

2.第一个Handler是HeadContext

3.

4.

5.

6.

从Head头handler开始往尾部方向去找,找到第一个发现的输入Inbound-Handler

本次找到的下一个输入handler为ServerBootstrapAcceptor#0

7.执行ServerBootstrapAcceptor#0这一输入handler,同理前面的流程即可。

8.

9.

注册完后,就会开始真正的读数据通信。过程很复杂,之前总结过,这里不再总结。参考之前的总结笔记去debug吧。

读数据通信时,首先是到HeadHandler#0,然后会把数据传递给LineBaseFrameDecoder这一封帧解码器去解码数据,并且解决半包粘包的问题。

下面简单记录一下:

首先会到HeadHandler#0:

接着会传递给LineBaseFrameDecoder所对应的Handler:

找到LineBaseFrameDecoder所对应的Handler去执行

【又回到解码器的流程啦,熟悉吧,之前总结过】

这个过程:在建立C-S连接,完成NioSocketChannel管道的注册后,开始read读通信操作,首先输入的数据会经过HeadHandler#0,其次读入的数据会传递给LineBasedFrameDecoder这一我们配置的解码Handler,无论是哪一个Handler都是回调其Handler重写的channelRead方法。在LineBasedFrameDecoder对应的Handler中,我们完成数据的封帧操作,解决半包粘包问题,并且完成解码操作。

封帧完毕后,把每一条完整的消息数据再一次通过fireChannelRead传递给下一个Handler:

会找到LoggingHandler#0完成对完整消息数据的控制台输出:

控制台输出:

onUnhandledInboundMessage方法:主要是完成ByteBuf资源的释放和循环再利用。如果不及时释放,那么内存溢出不是梦哈。

以下是释放ByteBuf内存的底层方法:

等所有Handler执行完毕后,NioEventLoop会再一次陷入阻塞等待:

Outbound(输出Handler)所对应的事件传播【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

Outbound事件都对应有哪些?

Outbound事件的传播的源头:

1.channel.writeAndFlush():

从tail这一Handler从后往前去找,直到找到第一个出现的Outbound-Handler

2.ctx.writeAndFlush():

从当前触发该操作的Handler往前寻找,直到找到第一个出现的Outbound-Handler

  • 测试案例
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,32,1024));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(defaultEventLoopGroup,"lineBased",new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if(ctx.channel().isWritable()) {ctx.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}}});}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");
//        channel.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}
}
  • 测试

1.

2.省略很多步骤

3.从write写的源头开始debug追踪

4.

5.

findContextOutbound方法:

从当前触发该操作的Handler往前寻找,直到找到第一个出现的Outbound-Handler。当前找到的handler为StringEncoder,该handler是对写出的数据进行编码操作,编码转换成ByteBuf格式。实际上编码这个过程就是把消息数据写到应用层缓冲区ByteBuf(实际上封装的是ByteBuffer,最终底层还是写到ByteBuffer中)

6.

7.

这一过程称之为编码Encoder:

8.继续向后依次唤醒pipeline双向链表的每一个Handler

9.唤醒LoggingHandler#0

10.

这一次向前找到HeadContext(Handler)

11.

12.当HeadContext执行的write操作,才是真正的unsafe.write,直接把ByteBuf中存储封装的消息数据写出到OutboundBuffer,OutboundBuffer是由一个双向链表结构,链表的每一个元素为Entry类型,msg消息数据封装到该Entry中,Entry还会封装一些其他的元数据信息。此时数据状态为unflush

filterOutboundMessage方法:

Entry.newInstance方法:

Entry是ChannelOutboundBuffer的一个内部类,为什么设置成一个内部类?因为Entry只在该类中使用,所以创建成一个内部类。Entry中封装的有ByteBuf类型的msg消息数据。还有一系列关于消息数据的元数据信息

incrementPendingOutboundBytes方法:

如果累计写出的消息数据大小超过了高水位线,那么设置为Unwritable不可写状态。

12.flush操作:

HeadContext的flush操作才是真正的把应用层缓冲区ChannelOutboundBuffer的数据写出到socket-send缓冲区。

以下过程同理write,都是会一个个迭代遍历所有Handler

直到最终找到Head-Handler,HeadHandler(Context)会把应用层缓冲区的数据真正的写到socket-send缓冲区

LoggingHandler处理完flush操作后,会继续往前传递寻找下一个写出-Handler执行flush操作:

找到HeadContext#0:

真正的AbstractUnsafe.flush:

flush0方法:


doWrite方法:

((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite():

获取到socket-send缓冲区的大小

in.nioBuffers(1024, maxBytesPerGatheringWrite):

通过ChannelOutboundBuffer(in),把封装在该buffer中的所有Entry转换一个个对应的ByteBuffer。但是最大不超过1024个。为什么要把一个个转换成一个个对应的ByteBuffer?因为为了便于操作,如果把所有的Entry都放在一个ByteBuffer中,也可以,但是需要操作读写指针的难度将会大幅度提升。

nioBuffers方法中使用到的internalNioBuffer方法:

这个方法就是提取出ByteBuf中的ByteBuffer对象

incompleteWrite(true):

当localWrittenBytes <= 0时,说明没有写出数据到socket-send缓冲区,此时需要一个兜底操作:让SelectionKey监控WRITE写事件,使得下一次写操作时可以及时监控到。

adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite)

实时的调整socket-send内核缓冲区大小

in.removeBytes(localWrittenBytes):

把已经发出去的数据从ChannelOutboundBuffer中更新删除

int writeSpinCount = config().getWriteSpinCount():

无论是读read还是write写,都会有一个次数限制,意思就是,占用线程资源的read或write操作最多连续执行16次。如果16次还没有完成读或写的任务时,线程就不会再被IO所占用,而是会切换到执行非IO的task,这是为了防止非IO-task被阻塞时间过长,可能非IO-task相对很快就执行完毕了,所以netty做了一个这样的设计。但是话说回来,16次循环IO,如果缓冲区设置的够大,可以实现16GB的IO转换,一般都是可以IO传输完毕的。

final int localWrittenBytes = ch.write(buffer):

拿到转换好的一个个的ByteBuffer数据,通过SocketChannel管道写出到socket-send内核缓冲区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2775928.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

一款VMP内存DUMP及IAT修复工具

前言 加壳是恶意软件常用的技巧之一&#xff0c;随着黑客组织技术的不断成熟&#xff0c;越来越多的恶意软件家族都开始使用更高级的加壳方式&#xff0c;以逃避各种安全软件的检测&#xff0c;还有些恶意软件在代码中会使用各种多态变形、加密混淆、反调试、反反分析等技巧&a…

Vue3.0(五):Vue-Router 4.x详解

Vue-Router详解 vue-router教程 认识前端路由 路由实际上是网络工程中的一个术语 在架构一个网络的时候&#xff0c;常用到两个很重要的设备—路由器和交换机路由器实际上就是分配ip地址&#xff0c;并且维护着ip地址与电脑mac地址的映射关系通过映射关系&#xff0c;路由器…

Window环境下使用go编译grpc最新教程

网上的grpc教程都或多或少有些老或者有些问题&#xff0c;导致最后执行生成文件时会报很多错。这里给出个人实践出可执行的编译命令与碰到的报错与解决方法。&#xff08;ps:本文代码按照煎鱼的教程编写&#xff1a;4.2 gRPC Client and Server - 跟煎鱼学 Go (gitbook.io)&…

【MySQL】_JDBC编程

目录 1. JDBC原理 2. 导入JDBC驱动包 3. 编写JDBC代码实现Insert 3.1 创建并初始化一个数据源 3.2 和数据库服务器建立连接 3.3 构造SQL语句 3.4 执行SQL语句 3.5 释放必要的资源 4. JDBC代码的优化 4.1 从控制台输入 4.2 避免SQL注入的SQL语句 5. 编写JDBC代码实现…

《Git 简易速速上手小册》第2章:理解版本控制(2024 最新版)

文章目录 2.1 本地仓库与版本历史2.1.1 基础知识讲解2.1.2 重点案例&#xff1a;回滚错误提交2.1.3 拓展案例 1&#xff1a;利用 git bisect 查找引入 bug 的提交2.1.4 拓展案例 2&#xff1a;合并提交历史 2.2 远程仓库的使用2.2.1 基础知识讲解2.2.2 重点案例&#xff1a;在 …

midnightsun-2018-flitbip:任意地址写

题目下载 启动脚本 启动脚本如下&#xff0c;没开启任何保护 #!/bin/bash qemu-system-x86_64 \-m 128M \-kernel ./bzImage \-initrd ./initrd \-nographic \-monitor /dev/null \-append "nokaslr root/dev/ram rw consolettyS0 oopspanic paneic1 quiet" 2>…

预测模型:MATLAB线性回归

1. 线性回归模型的基本原理 线性回归是统计学中用来预测连续变量之间关系的一种方法。它假设变量之间存在线性关系&#xff0c;可以通过一个或多个自变量&#xff08;预测变量&#xff09;来预测因变量&#xff08;响应变量&#xff09;的值。基本的线性回归模型可以表示为&…

备战蓝桥杯---动态规划(基础2)

本专题主要是介绍几个比较经典的题目&#xff1a; 假设我们令f[i]为前i个的最长不下降子序列&#xff0c;我们会发现难以转移方程很难写&#xff08;因为我们不知道最后一个数&#xff09;。 于是&#xff0c;我们令f[i]为以i结尾的最长不下降子序列&#xff0c;这样子我们就可…

香港倾斜模型3DTiles数据漫游

谷歌地球全香港地区倾斜摄影数据&#xff0c;通过工具转换成3DTiles格式&#xff0c;将这份数据完美加载到三维数字地球Cesium上进行完美呈现&#xff0c;打造香港地区三维倾斜数据覆盖&#xff0c;完美呈现香港城市壮美以及维多利亚港繁荣景象。再由12.5米高分辨率地形数据&am…

SpringCloud-Ribbon:负载均衡(基于客户端)

6. Ribbon&#xff1a;负载均衡(基于客户端) 6.1 负载均衡以及Ribbon Ribbon是什么&#xff1f; Spring Cloud Ribbon 是基于Netflix Ribbon 实现的一套客户端负载均衡的工具。简单的说&#xff0c;Ribbon 是 Netflix 发布的开源项目&#xff0c;主要功能是提供客户端的软件负…

【Java EE】----SpringBoot的日志文件

1.SpringBoot使用日志 先得到日志对象通过日志对象提供的方法进行打印 2.打印日志的信息 3.日志级别 作用&#xff1a; 可以筛选出重要的信息不同环境实现不同日志级别的需求 ⽇志的级别分为&#xff1a;&#xff08;1-6级别从低到高&#xff09; trace&#xff1a;微量&#…

SCI 1区论文:Segment anything in medical images(MedSAM)[文献阅读]

基本信息 标题&#xff1a;Segment anything in medical images中文标题&#xff1a;分割一切医学图像发表年份: 2024年1月期刊/会议: Nature Communications分区&#xff1a; SCI 1区IF&#xff1a;16.6作者: Jun Ma; Bo Wang(一作&#xff1b;通讯)单位&#xff1a;加拿大多…

排序算法---插入排序

原创不易&#xff0c;转载请注明出处。欢迎点赞收藏~ 插入排序是一种简单直观的排序算法&#xff0c;它的基本思想是将待排序的元素分为已排序和未排序两部分&#xff0c;每次从未排序部分中选择一个元素插入到已排序部分的合适位置&#xff0c;直到所有元素都插入到已排序部分…

微软技术专家带你学 AI|Azure OpenAI 服务

点击蓝字 关注我们 编辑&#xff1a;Alan Wang 排版&#xff1a;Rani Sun 微软技术专家带你学 AI 新的一年&#xff0c;为帮助开发者们在 Azure 上掌握人工智能&#xff0c;我们特别带来「微软技术专家带你学 AI」系列&#xff0c;通过4期的课程&#xff0c;带大家从机器学习的…

ES高可用架构涉及常用功能整理

ES高可用架构涉及常用功能整理 1. es的高可用系统架构和相关组件2. es的核心参数2.1 常规配置2.2 特殊优化配置2.2.1 数据分片按ip打散2.2.2 数据分片机架感知2.2.3 强制要求数据分片机架感知2.2.4 写入线程池优化2.2.5 分片balance优化2.2.6 限流控制器优化 3. es常用命令3.1 …

在屏蔽任何FRP环境下从零开始搭建安全的FRP内网穿透服务

背景 本人目前在境外某大学读博&#xff0c;校园网屏蔽了所有内网穿透的工具的数据包和IP访问&#xff0c;为了实现在家也能远程访问服务器&#xff0c;就不得不先开个学校VPN&#xff0c;再登陆。我们实验室还需要访问另一个大学的服务器&#xff0c;每次我都要去找另一个大学…

海外云手机——平台引流的重要媒介

随着互联网的飞速发展&#xff0c;跨境电商、短视频引流以及游戏行业等领域正经历着迅猛的更新换代。在这个信息爆炸的时代&#xff0c;流量成为至关重要的资源&#xff0c;而其中引流环节更是关乎业务成功的关键。海外云手机崭露头角&#xff0c;成为这一传播过程中的重要媒介…

消息中间件:Puslar、Kafka、RabbigMQ、ActiveMQ

消息队列 消息队列&#xff1a;它主要用来暂存生产者生产的消息&#xff0c;供后续其他消费者来消费。 它的功能主要有两个&#xff1a; 暂存&#xff08;存储&#xff09;队列&#xff08;有序&#xff1a;先进先出 从目前互联网应用中使用消息队列的场景来看&#xff0c;…

【龙年大礼】| 2023中国开源年度报告!

【中国开源年度报告】由开源社从 2015 年发起&#xff0c;是国内首个结合多个开源社区、高校、媒体、风投、企业与个人&#xff0c;以纯志愿、非营利的理念和开源社区协作的模式&#xff0c;携手共创完成的开源研究报告。后来由于一些因素暂停&#xff0c;在 2018 年重启了这个…

Qt PCL学习(二):点云读取与保存

注意事项 版本一览&#xff1a;Qt 5.15.2 PCL 1.12.1 VTK 9.1.0前置内容&#xff1a;Qt PCL学习&#xff08;一&#xff09;&#xff1a;环境搭建 0. 效果演示 1. pcl_open_save.pro QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgets// 添加下行代码&#…