Netty应用(三) 之 NIO开发使用 网络编程 多路复用

目录

重要:logback日志的引入以及整合步骤

5.NIO的开发使用

5.1 文件操作

5.1.1 读取文件内容

5.1.2 写入文件内容

5.1.3 文件的复制

5.2 网络编程

5.2.1 accept,read阻塞的NIO编程

5.2.2 把accept,read设置成非阻塞的NIO编程

5.2.3 引入Selector监管者 【IO多路复用】

5.2.4 补充几个仍然存在的问题

5.2.5 引入服务器端的写操作

5.2.6 Selector多路复用的核心含义


重要:logback日志的引入以及整合步骤

sl4j是一个日志门面,它有两种具体的实现:

1.log4j2(年代久远,时间长,用的少)

2.logback(用的多,支持好)

而且最重要的一点就是:logback可以输出线程名,在多线程场景下这一点很重要。

  • 引入依赖
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.12</version>
</dependency>
  • 引入logback.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration><!-- 控制台输出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder></appender><root level="DEBUG"><appender-ref ref="STDOUT"/></root></configuration>
  • 按照idea插件:log support

安装插件后,进行相关的配置:

  • 测试相关的快捷键

引入idea插件后,就可以使用快捷键并且插件会帮你自动生成相关的类!

运行:

  • 总结

  • 补充

为了让控制台对不同日志级别进行不同颜色的甄别输出,再安装一个插件:Grep Console!

重启idea后,再一次进行测试展示具体效果:哈哈哈哈,颜色不一样了哦!

5.NIO的开发使用

5.1 文件操作

5.1.1 读取文件内容

第一个程序,就是读取文件内容的程序

public class TestNIO1 {public static void main(String[] args) throws Exception{//1.创建ChannelFileChannel channel = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt").getChannel();//2.创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(10) ;//因为可能缓冲区一次无法读取完文件的所有字节,所以要while循环读取,可能多次反复读取到申请的缓冲区中while (true) {//在创建完缓冲区后,默认是写模式//3.把通道内获取的文件数据,写入缓冲区int read = channel.read(buffer) ;//此时说明文件已读取完,循环应该退出if(read == -1) {break;}//4.程序读取buffer的内容数据,后续的操作,设置buffer为读模式buffer.flip();//5.循环读取缓冲区的数据while(buffer.hasRemaining()) {byte b = buffer.get();//把字节转换成可视化的字符//UTF-8编码中:一个字符占用1~4的字节大小。汉字,日文,韩文占用3个字节,ASCII表的字符占用1个字节,xxx字符占用2个字节,一些少数语言字符占用4个字节System.out.println("(char)b = " + (char) b);}//6.设置buffer为写模式,因为循环上去后需要写入缓冲区,所以需要重新设置为写模式buffer.clear();}}}
5.1.2 写入文件内容
public class TestNIO11 {public static void main(String[] args) throws IOException {//1.获取Channel ,方法:1.FileOutputStream.getChannel() 2.RandomAccessFileFileChannel channel = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data1").getChannel();//2.获取ByteBufferByteBuffer buffer = Charset.forName("UTF-8").encode("梅西");//3.write : 把ByteBuffer写入到data1这个文件channel.write(buffer);}}
5.1.3 文件的复制

对于文件的复制来说:Channel管道对象的transferTo方法拷贝的性能要高于IO流的拷贝性能以及IOUtils.copy的性能。因为transferTo方法底层采用的是零拷贝技术,后面会详细总结。

public class TestNIO12 {public static void main(String[] args) throws Exception{//把data的数据拷贝到data2//方法1:使用IO流方式实现拷贝 效率较低
//        FileInputStream inputStream = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt");
//        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt");
//        byte[] byteBuffer = new byte[1024];
//        while(true) {
//            //把data文件的数据流读取到byteBuffer这个缓冲区
//            int read = inputStream.read(byteBuffer);
//            //读取完毕,退出
//            if(read == -1) {
//                break;
//            }
//            //把data文件的数据拷贝到fileOutputStream流对应的data1文件
//            fileOutputStream.write(byteBuffer,0,read);
//        }//方法2:使用commons-io这一工具类
//        FileInputStream inputStream = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt");
//        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt");
//        IOUtils.copy(inputStream,fileOutputStream) ;//方法3:使用NIO方式拷贝,底层是零拷贝,效率较高。但是一次最多拷贝2GB大小的数据,一旦超过2GB,则需要分段拷贝FileChannel from = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt").getChannel();FileChannel to = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt").getChannel();long left = from.size();while (left > 0) {//一旦拷贝数据超过2GB,则需要分段拷贝!left = left - from.transferTo(from.size()-left,left,to) ;}}}

transferTo方法:

5.2 网络编程

在日常开发中,通常都是多个客户端去连接并请求一个服务器端的,所以我们要把更多的关注点放在服务端如何处理这么多个客户端的请求压力(即:连接操作+读写操作的处理)

  • 网络编程NIO的核心结构

1.Channel管道

服务端---》ServerSocketChannel (接受请求)

客户端---》SocketChannel (进行实际的通信)

2.Buffer缓冲区

无论什么类型,最终都要转换成字节类型,所以缓冲区使用的是ByteBuffer

  • 端口号的意义

端口号:

端口号就是进行区分相同ip下的相同协议的不同程序的一种标识

5.2.1 accept,read阻塞的NIO编程
public class MyClient {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000)) ;System.out.println("-------------------------------");}}
public class MyServer {public static void main(String[] args) throws Exception {//1.创建ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2.设置服务端的监听端口serverSocketChannel.bind(new InetSocketAddress(8000)) ;//3.把建立连接成功的SocketChannel放入一个集合中List<SocketChannel> channelList = new ArrayList<>() ;//4.创建ByteBuffer缓冲区ByteBuffer buffer = ByteBuffer.allocate(20) ;while (true) {//5.阻塞时监听客户端的连接System.out.println("等待连接服务器...");SocketChannel socketChannel = serverSocketChannel.accept();System.out.println("服务器已经连接..."+socketChannel);//6.加入集合if(socketChannel != null) {channelList.add(socketChannel);}for(SocketChannel channel:channelList) {System.out.println("开始实际的数据通信....");//7.把客户端发送的数据写入到ByteBuffer缓冲区channel.read(buffer);//把buffer转换成读模式buffer.flip();//8.显示输出到控制台CharBuffer decode = Charset.forName("UTF-8").decode(buffer);System.out.println("decode.toString() = " + decode.toString());//把buffer转换成写模式buffer.clear();System.out.println("通信已经结束....");}}}}
5.2.2 把accept,read设置成非阻塞的NIO编程
public class MyClient {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000)) ;System.out.println("-------------------------------");}}
public class MyServer1 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));List<SocketChannel> channelList = new ArrayList<>() ;ByteBuffer buffer = ByteBuffer.allocate(20);while (true) {
//            System.out.println("等待连接服务器.....");SocketChannel socketChannel = serverSocketChannel.accept();
//            System.out.println("服务器已经连接...." + socketChannel) ;if(socketChannel != null) {socketChannel.configureBlocking(false);channelList.add(socketChannel) ;}for (SocketChannel channel : channelList) {int read = channel.read(buffer);if(read > 0) {System.out.println("开始实际的数据通信....");buffer.flip();CharBuffer decode = Charset.forName("UTF-8").decode(buffer);System.out.println("decode.toString() = " + decode.toString());buffer.clear();System.out.println("通信已经结束.......");}}}}}
5.2.3 引入Selector监管者 【IO多路复用】
  • 为什么要引入监管者Selector?

由于是非阻塞,所以会一直while循环。不断的while一直循环着十分影响性能问题,所以需要一个监管者来监管着事件状态的发生,而不要以whie死循环进行监管查看事件状态的发生

使用Selector充当监管者进行监管ServerSocketChannel和SocketChannel,但是Selector做了更加细粒度和高效的监管,Selector并不是像while循环一样无时无刻的监管着,而是Selector只在ServerSocketChannel触发ACCEPT状态事件的时候才会监管监控到,只在SocketChannel触发READ,WRITE状态事件的时候才会监管监控到。Selector就像一个警察,他不会无时无刻的监管人,只有当人触发了某些状态事件(如:抢劫,杀人等)时才会进行监控监管到人。Selector同理。这样可以极大的提高Selector监管监控的能力!也解决了while一直循环的问题。

  • Selector的核心结构

Selector有两个核心结构:一个keys,一个SelectionKeys。都是HashSet。

当serverSocketChannel.register(selector, 0, null) 或 socketChannel.register(selector, 0, null) 调用时,是把serverSocketChannel或socketChannel注册存储到keys这个HashSet中

当SSC实际触发了Accept状态事件或SC实际触发了Read或Write状态事件后,会把触发事件的Channel注册存储到SelectionKeys这个HashSet中,但一旦处理完事件,就立马把该事件对应的Channel从SelectionKeys中删除。但是keys中一直存储着register注册的Channel。

  • 为什么设置成非阻塞后,需要一直while循环?为什么引入Selector监管者后,性能就优化了?

深度分析一下吧。

首先明白一个点:调用read或write或accept这些方法,实际上需要从用户态陷入内核态,在内核态中去调用系统函数才能真正的执行对应方法,然后再调相关的硬件驱动,最终才调用到具体的硬件。

可以简单的这样理解:用户态就是我们编写的java代码,内核态就是操作系统内核的代码。我们在用户态做while循环,就是不断的轮询陷入内核去询问内核的缓冲区 "你有read或write或accept事件触发了吗?",如果没有,那么也不阻塞。如果有,那么会拷贝内核缓冲区数据到用户态进行处理。

这就是同步非组塞,我们需要时刻轮询陷入内核询问是否处理完结果了,我们还需要去管理这个过程。

不断的while循环,实际上是不断的陷入内核,开销是极大的。

未引入Selector前,我们的性能消耗为:n次从用户态陷入内核态的切换消耗性能 + n次内核轮询询问是否有accept或read或write事件触发的性能消耗

所以要引入Selector监管者:就是selector.select()方法的调用监管

把所有的Channel,无论是ServerSocketChannel,还是SocketChannel都加入到Selector中。当调用selector.select()时,表示一次就把所有注册的SC或SSC都陷入内核,在内核中会不断的轮询监听SC或SSC,在内核中会不断的监听”注册到SSC或SC是否有accept或read或write事件的触发呢?“

如果没有事件发生,selector.select()会一直阻塞!但是selector.select()只是在用户态阻塞等待着的,而它在内核中还是会不断的去做轮询,不断的去询问内核缓存区 "注册在Selector上的SC或SSC是否有accept等事件的触发呢?",如果有,那么会把内核缓冲区的数据拷贝到用户态,然后进行用户态进行处理该拷贝的数据(插一嘴,如果一次拷贝不完,那么会再一次调用selector.select()去拷贝内核缓冲区的数据)。如果没有,那么继续轮询。

引入Selector前,我们的性能消耗为:1次从用户态陷入内核态的切换消耗性能 + n次内核轮询询问是否有accept或read或write事件触发的性能消耗

总结:

引入Selector这个监管者后,我们降低了(n-1)次从用户态陷入内核态的切换消耗性能

  • 引入Selector后,Server服务器端代码的具体步骤如下:

具体步骤如下:

第一步:当SSC,SC注册到Selector对象后,SSC,SC对象都会被保存到keys中。

第二步:当selector.select()监控到SSC实际触发了Accept状态事件或SC实际触发了Read或Write状态事件后,会把这些实际触发了Accept状态事件的SSC对象 或 实际触发了Read或Write状态事件的SC对象保存到SelectionKeys中。为什么可以这样?因为SC或SSC事先已经在keys这个HashSet中完成了register存储!!selector.select()监控到事件发生后,只是把SSC或SC对象引用做了一个迁移存储。

第三步:后续我们迭代器遍历的就是SelectionKeys这个触发事件Channel集合。一旦事件处理完毕后,就应该把该事件对应的SSC或SC对象从SelectionKeys中删除掉

第四步:当再一次发生相同或不同类型事件时,selector.select()方法会再一次监控到事件的发生,再重新把SC或SSC的对象引用从keys这个HashSet迁移存储到SelectionKeys这个HashSet中!这样才是良性的循环。

补充说明:

1.等于说,无论是keys还是SelectionKeys,存储的都是SSC对象或SC对象。

2.还有一点需要注意:在keys中第一次存储SSC,SC对象时,是真正new创建出SSC,SC对象。但在SelectionKeys再进行存储时,其实就是把SSC,SC对象的引用复制到SelectionKeys中!

3.理论上:keys中存储的SSC或SC对象的数量是要大于等于selectionKeys中存储的SSC或SC的数量。因为有的SSC,SC没有触发事件状态,那么只会在keys中存储着,而不会移动迁移到selectionKeys中。

4.虽然客户端越来越多,注册的SocketChannel就会越来越多,但是注册的ServerSocketChannel一直只有一个

  • 具体代码如下:
public class MyClient {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000)) ;System.out.println("-------------------------------");}}
public class MyServer2 {public static void main(String[] args) throws Exception{//获取ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//监听绑定端口8000serverSocketChannel.bind(new InetSocketAddress(8000));//设置为非阻塞serverSocketChannel.configureBlocking(false);//获取Selector监管者Selector selector = Selector.open();//把ServerSocketChannel注册到Selector监管者上。实际上register注册存储到的是Selector的keys这个HashSet中SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);//添加感兴趣的事件:ACCEPTselectionKey.interestOps(SelectionKey.OP_ACCEPT) ;System.out.println("MyServer2.main");while (true) {//应用层阻塞等待监听Selector注册的所有Channel对应的所有事件状态,但是在内核层会不断的轮询询问是否事件触发开启//当内核层监听到有事件触发,那么应用层会停止等待,并且会把该触发事件对应的Channel存储到Selector对应的SelectionKeys这个HashSet中//如何存储到SelectionKeys中的?是从keys中拷贝对象引用到SelectionKeys中selector.select();System.out.println("-----------------MyServer2---------------------");//获取的是触发事件的所有Channel对象,即SelectionKeys这个HashSet中的所有Channel对象Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();//使用迭代器进行遍历,为了使用完毕后就把Channel从SelectionKeys中删除while (iterator.hasNext()) {SelectionKey key = iterator.next();//把使用完毕的触发事件Channel从SelectionKeys中删除,避免空指针异常//举个例子:如果不删除,第二次进入channel.accept()返回的是null,那么自然就会空指针异常!所以处理完事件后要把Channel从SelectionKeys中及时删除,但keys中一直保留着register注册的Channeliterator.remove();if(key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();//获取到连接的客户端ChannelSocketChannel sc = channel.accept();//设为非阻塞sc.configureBlocking(false);//实际上register注册存储到的是Selector的keys这个HashSet中SelectionKey scKey = sc.register(selector, 0, null);//添加感兴趣的事件scKey.interestOps(SelectionKey.OP_READ);} else if(key.isReadable()) {try {//此时为SocketChannel的读事件SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(5);//把Channel管道中的数据读取到Buffer缓冲区int read = sc.read(buffer);if(read == -1) {//当客户端调用socketChannel.close()请求与服务器断开时,会触发一个READ事件给服务器端的。但是这个READ事件比较特殊,服务器端处理后sc.read(buffer)==-1,此时必须cancel()一下//cancel()方法的调用表示服务器已经完成对客户端socketChannel.close()断开的处理。//如果不调用cancel,那么服务端会一直认为没有处理完该特殊的READ事件,会一直反复调用selector.select()方法key.cancel();} else {//转换成读模式buffer.flip();//解码buffer缓冲区中的数据成字符类型CharBuffer decode = Charset.forName("UTF-8").decode(buffer);//打印输出System.out.println("decode.toString() = " + decode.toString());}} catch (IOException e) {e.printStackTrace();//这里是做健壮性的处理//当出现异常时,客户端与服务器也是断开,此时服务器会接受到一个断开的READ事件。所以需要调用cancel方法处理该READ事件key.cancel();}}}}}}
  • debug测试

1.debug启动Server端

2.debug启动Client端

3.测试客户端连接服务端,服务端selector.select()监听到

客户端连接上服务器后,selector.select()方法就会监听到该连接事件,后续就会把该连接事件对应的Channel对象存储到SelectionKeys

4.测试客户端write写,服务端selector.select()监听到READ事件的产生,后续会把该READ事件对应的Channel存储到SelectionKeys中,使用channel.read(buffer),一次处理不完的话会多次调用selector.select()

Server端:

5.测试客户端socketChannel.close(),服务端selector.select()监听到该特殊的READ事件的产生,后续会把该特殊READ事件对应的Channel存储到SelectionKeys中,使用key.cancel()处理。如果不使用key.cancel()进行处理的话,会一直调用selector.select()

Server端:

  • 细节分析1:

客户端一次发送数据"leomessi",如果缓冲区ByteBuffer大小为5,那么一次读取不完客户端发送的数据,会分批处理客户端发送的数据,直到处理完毕,那么会while true循环2次selector.select(),服务端第一次调用selector.select()处理的是leome,第二次调用select执行的是ssi。

结论:

1.当client客户端连接服务器发起操作后,服务器必须全部处理完成后整个交互才算结束,如果没有全部处理完成,select方法就会被一直调用

上述的案例中,由于leomessi这个数据一次没有处理完,所以才会被调用多次了

2.在某些特殊的操作下,服务器端无法处理,那么select方法就会被频繁的调用 (如:socketChannel.close()的调用,服务端不可以通过channel.read(buffer)简单的进行处理,需要调用cancle()处理该断开的READ事件)

  • 细节分析2:针对于细节分析1中结论第二点,会需要cancle方法的引入

为什么引入cancle方法?

客户端与服务器端建立连接后,过一段时间后,SocketChannel调用close方法与服务端断开。此时相当于客户端发送了一个READ事件给到服务端,只不过这个READ事件十分特殊,它是一个告知服务端我要断开的READ事件,服务端需要进行特殊处理,而不是简单的调用read(buffer)!针对于这种特殊的READ事件,在处理时需要调用cancle方法。如果不这样处理,会给服务端一种错觉,总感觉还有数据没有处理完成,会导致selector.select()一直轮询访问内核,性能消耗极大。

  • 解决半包粘包 并且 引入attatchment附件机制

先说半包粘包问题:

当发送数据"leomessi",如果对方ByteBuffer缓冲区大小只有5,那么第一次接收到的为 leome,第二次接收到的为ssi。其实这就是半包粘包的现象,原本服务端想以完整leomessi进行接收的

为解决该问题,引入相关方法,但是又出现了一个新问题:

如果发送的数据为"leo\nmessi"在解决半包粘包过程中,由于ByteBuffer缓冲区为7,那么第一次读取到的是leo\nmes,通过解决半包粘包方法可知,compact会让mes迁移到最前面等待下一次,但是由于每一次我们都创建的是新ByteBuffer缓冲区,所以mes丢失!所以最终输出的是leo 和 si。

为解决该问题,引入attatchment机制,把Channel与ByteBuffer绑定!具体操作就是在Channel.register的时候,把ByteBuffer作为附件注册给该Channel,之后直接通过Channel获取附件即可拿到对应唯一的ByteBuffer缓冲区,只要缓冲区保持唯一,那么就不会出现compact后数据丢失的情况

首先演示半包粘包的问题:

public class MyServer3 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8000));serverSocketChannel.configureBlocking(false);Selector selector = Selector.open();SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {selector.select();System.out.println("MyServer3.main");Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if(key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);SelectionKey scKey = socketChannel.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);} else if (key.isReadable()) {try {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(5);int read = channel.read(buffer);if (read == -1) {key.cancel();} else {buffer.flip();System.out.println("Charset.forName(\"UTF-8\").decode(buffer).toString() = " + Charset.forName("UTF-8").decode(buffer).toString());}} catch (IOException e) {e.printStackTrace();key.cancel();}}}}}}

服务端:启动

客户端:

向服务端发送数据leomessi,由于服务端ByteBuffer过小,导致半包粘包问题

解决方法:

再测试:解决半包粘包问题

但是出现新问题!通过上面解决问题步骤可知,我们必须要把ByteBuffer缓冲区设为"一次可以读取完客户端发送的数据的大小",如果不这样设置,会出现问题!演示如下:

为什么会出现这种问题?

解决方法,把Channel与Buffer进行绑定,使用附件机制进行绑定!!!

测试:

解决半包粘包过程中,由于ByteBuffer不断重新创建导致数据丢失的问题:

public class MyServer3 {private static void doLineSplit(ByteBuffer buffer) {buffer.flip();for (int i = 0; i < buffer.limit(); i++) {if (buffer.get(i) == '\n') {//hellosuint length = i + 1 - buffer.position();ByteBuffer target = ByteBuffer.allocate(length);for (int j = 0; j < length; j++) {target.put(buffer.get());}//截取工作完成target.flip();System.out.println("StandardCharsets.UTF_8.decode(target).toString() = " + StandardCharsets.UTF_8.decode(target).toString());}}buffer.compact();}public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8000));serverSocketChannel.configureBlocking(false);Selector selector = Selector.open();SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {selector.select();System.out.println("MyServer3.main");Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if(key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);//使用附件机制,把当前SocketChannel与ByteBuffer缓冲区进行绑定ByteBuffer buffer = ByteBuffer.allocate(10);SelectionKey scKey = socketChannel.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);} else if (key.isReadable()) {try {SocketChannel channel = (SocketChannel) key.channel();
//                        ByteBuffer buffer = ByteBuffer.allocate(10);ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer);if (read == -1) {key.cancel();} else {
//                            buffer.flip();
//                            System.out.println("Charset.forName(\"UTF-8\").decode(buffer).toString() = " + Charset.forName("UTF-8").decode(buffer).toString());//解决半包粘包问题doLineSplit(buffer);}} catch (IOException e) {e.printStackTrace();key.cancel();}}}}}}

补充:

假如说:我们不仅想要把ByteBuffer缓冲区作为附件,还想要其他的数据作为附件,该咋怎么办???

方法1:封装多个attatchment元素成一个集合然后register。优点:不存在僵化,后续如果有新的attatchment元素增加,只需要add到集合中即可。缺点:不具有业务含义

方法2:封装多个attatchment元素成一个类然后register。优点:具有业务含义。缺点:存在僵化,如果后续有新的attatchment元素增加,那就无法再增加。当然:类里面可以添加一个集合属性!这样就完美解决了类的缺点

为什么封装成一个类后才能具有业务含义?一个属性name="小明",如果单看name这个属性,你知道name代表的是一个狗的名字还是人的名字,只有把name封装到一个类中,比如 public class User {private String name="小明"}后才可以体现该name代表的是一个人名

  • 解决缓冲区大小不够导致频繁循环。需要扩容解决该问题

客户端发送数据'leomessi\nleo'给服务器,但是Buffer缓冲区大小只有5字节,所以第一次只能接收到leome,由于在接收的数据中查询不到\n,所以不会读取任何字节,所以buffer.compact()调用时,会把未读取的leome迁移到最前面,如下:

演示问题如下:

分析:

解决方法:

扩容,每次按照2倍扩容。

演示:

5.2.4 补充几个仍然存在的问题

1.

除了扩容需要我们清楚,我们依然要考虑缩容的问题:

为什么要缩容?

在第一段时间操作时,由于ByteBuffer缓冲区太小(也可能是客户端发送的数据过大导致后续不断扩容),我们不断的进行扩容操作。但是过了一段时间后,客户端发送的消息数据相对当前缓冲区太小,那么我们应该不断的缩小缓冲区大小,也就是缩容。如果不考虑缩容的话,那么在多线程并发的场景下,每过来一个客户端,我们都需要建立一个SocketChannel,都需要与SocketChannel对应建立一个ByteBuffer缓冲区。假设有10万个客户端与服务端连接,需要建立10万个ByteBuffer。如果ByteBuffer不在客户端发送相对小的消息数据进行合理缩容,由于ByteBuffer是占用内存的!!所以会造成极大的内存浪费!

无论是是扩容还是缩容,后续netty帮我们做的很好了。。。

2.

我们无论是进行扩容还是缩容时,需要把老ByteBuffer缓冲区的数据拷贝到新ByteBuffer缓冲区。ByteBuffer底层就是数组,需要一个个的去拷贝,效率特别低,为了优化,需要引入零拷贝机制。

3.

在网络通信过程中,如何处理解决半包粘包的问题?其实就两种方式,如下:

方法1.使用 \n 区分不同的信息数据,保证了消息的完整性。但是这种挨个字节进行检索\n的方式,效率较低。(使用\n+compact方式进行解决半包粘包问题)

方法2.使用头体分离的方式。在'头'中记录元数据,所谓元数据就是真实数据的各种信息,但是不包括真实数据本身。如:记录真实数据的大小,真实数据的类型。在'体'中记录的才是真实数据本身。

使用头体分离的话,我们就可以通过'头'中元数据中记录的真实数据的大小,然后按这个大小去读取'体'中真实数据。这样也解决了半包粘包的问题。这种方式在当前是流行的。效率较高。

如:HTTP协议,在该协议中响应头中有一个Content-Length,根据这个大小我们再去响应体中去读取Content-Length大小的真实数据!

  • 补充网络知识:分包与流量控制

分包:

要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生分包。

待发送数据大于MSS(最大报文长度),TCP在传输前将进行分包。

流量控制:

5.2.5 引入服务器端的写操作
  • 第一版代码
public class MyServer5 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();SelectionKey selectionKey = serverSocketChannel.register(selector,0,null);selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {selector.select();System.out.println("MyServer5.main");Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);StringBuffer stringBuffer = new StringBuffer();for (int i = 0; i < 200000000; i++) {stringBuffer.append("l");}ByteBuffer buffer = Charset.forName("UTF-8").encode(stringBuffer.toString());while (buffer.hasRemaining()) {int write = socketChannel.write(buffer);System.out.println("每次写给客户端的长度为: " + write);}}}}}
}
public class MyClient1 {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));int read = 0;while (true) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);read += socketChannel.read(buffer);System.out.println("本次读取的大小为: " + read);buffer.clear();}}}

测试:

Server:

Client:

  • 分析第一版存在的问题

  • 解决第一版存在的问题,引出第二版代码

代码思路如图所示:

public class MyServer6 {public static void main(String[] args) throws Exception{//ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//服务端监听端口8000serverSocketChannel.bind(new InetSocketAddress(8000));//创建监管者Selector selector = Selector.open();//注册ServerSocketChannel到Selector监管者上。具体表现为:把该Channel存储到keys这一HashSet中serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {//监管者调用select()在操作系统内核轮询询问文件描述符是否就绪(是否触发事件?)【应用层等待阻塞,但是在内核层一直轮询询问是否有事件触发!】// 一旦触发某一事件,则把该触发事件的Channel的引用从keys中复制copy到SelectionKeys这一HashSet中selector.select();System.out.println("MyServer6.main");//获取注册在Selector中的SelectionKeys集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();//遍历while (iterator.hasNext()) {SelectionKey key = iterator.next();//一旦处理完该事件,那么就要把该事件对应的Channel从SelectionKeys中删除 避免重复处理该事件iterator.remove();//连接事件if (key.isAcceptable()) {//拿到ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();//accept监听获取到对应连接上的SocketChannelSocketChannel socketChannel = channel.accept();//设为非阻塞socketChannel.configureBlocking(false);//注册READ事件SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ);//拼接StringBuffer sb = new StringBuffer();for (int i = 0; i < 200000000; i++) {sb.append("s");}//创建缓冲区,默认为写模式ByteBuffer buffer = Charset.forName("UTF-8").encode(sb.toString());//把该Buffer缓冲区的数据通过SocketChannel写给client客户端int write = socketChannel.write(buffer);System.out.println("第一次写出的数据大小:" + write);//如果Buffer一次没有写完,那么需要增加WRITE事件,并且把Buffer缓冲区传递下去(共享但要保证线程安全,所以采取附件的方式)//为什么不能把Buffer设为全局变量进行共享?因为在多线程情况下,共享全局变量,产生并发安全问题。局部变量一般都是线程安全的,因为每一个线程都会在自己的栈帧中保持自己独立的局部变量//为什么要增加WRITE事件?触发WRITE事件的条件:客户端此时处理的及,允许你服务端发数据。//啥时候不能触发WRITE事件?客户端处理不过来,压力大,不允许服务端再发。如果不使用WRITE事件方式,则服务端在不能发数据的情况下,也会发空数据包给客户端。这样是不是导致浪费了服务端有限的线程资源?所以需要WRITE事件!if (buffer.hasRemaining()) {scKey.interestOps(scKey.interestOps()+SelectionKey.OP_WRITE);scKey.attach(buffer);}} else if (key.isWritable()) {//写事件//获取发送数据的ChannelSocketChannel channel = (SocketChannel) key.channel();//获取附件,目的是接着第一次写之后继续写,保证Buffer唯一传递ByteBuffer attachment = (ByteBuffer) key.attachment();//写!但是不循环写,本次写完后,如果客户端还让继续写,那么Selector.select()会再一次监听到WRITABLE事件,那么可以再一次到这里写!int write = channel.write(attachment);System.out.println("本次写出的数据大小为: " + write);//如果Buffer写完了,那么需要把附件置空,并且清空WRITE事件if (!attachment.hasRemaining()) {key.attach(null);key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);}}}}}}
public class MyClient1 {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));int read = 0;while (true) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);read += socketChannel.read(buffer);System.out.println("本次读取的大小为: " + read);buffer.clear();}}}

测试:

Server端

Client端:

5.2.6 Selector多路复用的核心含义

把多个客户端(SocketChannel)和服务端(ServerSocketChannel)都注册到一个Selector上,Selector监听内核缓冲区,当允许触发READ或WRITE的时候,才会让当前线程去执行该就绪事件的操作。这样就更加高效的利用了当前线程的资源性能。也使得单线程的资源利用率发挥的很好。


linux操作系统 默认可以允许客户端连接的socket数量不够多,我们需要在合适的事件增加修改这个socket数量。啥时候修改呢?当并发上来后,CPU和内存相比之前没什么太大变化,说明此时socket连接数量不够了,客户端再来请求连接时,服务端(linux)直接给拒绝了,所以CPU和内存不受这次并发流量大的影响。此时我们应该提升linux服务器对应的socket数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2777607.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

低代码平台与BPM:两者是否具有可比性?

传统上&#xff0c;业务流程管理 (BPM) 系统通过消除手动重复工作来帮助企业简化复杂的流程。它用于自动化、监控和分析业务流程&#xff0c;使高层管理人员的工作更轻松。这反过来又提高了所有其他相关利益相关者的生产力&#xff0c;并为业务增长铺平了道路。BPM 软件还使决策…

springboot174基于springboot的疾病防控综合系统的设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

Linux-3 进程概念(三)

1.环境变量 1.1基本概念 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数 如&#xff1a;我们在编写C/C代码的时候&#xff0c;在链接的时候&#xff0c;从来不知道我们的所链接的动态静态库在哪里&#xff0c;但是照样可以链接成功…

初识Solidworks:我的第一份作业的感想

从来没用CAD软件画过机械设计图。但我脑子里有一种概念&#xff0c;无非就是把尺规作图软件化&#xff0c;更方便画图、更方便修改、更方便打印一些。但第一份 Solidworks 作业就颠覆了我的认知&#xff0c;考虑到这个软件的上市时间&#xff0c;让我意识到自己对 CAD 软件的认…

如何让内网client通过公网地址访问内网server?

第一步&#xff0c;实现任意公网用户访问内网server。按教育网规矩&#xff0c;公网过来的流量要访问校内网的server必须从教育专线&#xff08;路由器接口G0/0/1)进入。 第二步&#xff0c;实现内网主机通过公网地址210.43.2.3能够访问内网server192.168.1.2&#xff0c;图中①…

ES实战-book笔记1

#索引一个文档,-XPUT手动创建索引, curl -XPUT localhost:9200/get-together/_doc/1?pretty -H Content-Type: application/json -d {"name": "Elasticsearch Denver","organizer": "Lee" } #返回结果 {"_index" : "g…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Toggle组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之Toggle组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Toggle组件 组件提供勾选框样式、状态按钮样式及开关样式。 子组件 仅当Toggl…

林浩然与杨凌芸的Java异常处理大冒险

林浩然与杨凌芸的Java异常处理大冒险 Lin Haoran and Yang Lingyun’s Java Exception Handling Adventure 在一个阳光明媚的午后&#xff0c;编程世界的英雄——林浩然和杨凌芸坐在Java王国的咖啡馆里&#xff0c;一边品尝着香醇的代码咖啡&#xff0c;一边探讨着他们的最新挑…

干掉Xshell,这款开源的终端工具逼格真高!

GitHub 上已经有 53.7k 的 star 了&#xff0c;这说明 Tabby 非常的受欢迎&#xff1a; https://github.com/eugeny/tabby Tabby 是一个高度可定制化的 跨平台的终端工具&#xff0c;支持 Windows、macOS 和 Linux&#xff0c;自带 SFTP 功能&#xff0c;能与 Linux 服务器轻…

Blazor入门100天 : 自做一个支持长按事件的按钮组件

好长时间没继续写这个系列博客了, 不知道大家还记得我吗? 话不多说,直接开撸. 配套源码 demo https://blazor.app1.es/b19LongPressButton ####1. 新建 net8 blazor 工程 b19LongPressButton 至于用什么模式大家各取所需, 我创建的是ssr单工程, 如果大家不小心建立错了按页…

Mysql——更新数据

注&#xff1a;文章参考&#xff1a; MySQL 更新数据 不同条件(批量)更新不同值_update批量更新同一列不同值-CSDN博客文章浏览阅读2w次&#xff0c;点赞20次&#xff0c;收藏70次。一般在更新时会遇到以下场景&#xff1a;1.全部更新&#xff1b;2.根据条件更新字段中的某部分…

[office] excel求乘积的公式和方法 #媒体#笔记#经验分享

excel求乘积的公式和方法 本文首先给出两个常规的excel求乘积的链接&#xff0c;然后再例举了一个文字和数字在同一单元格里面的excel求乘积的公式写法。 excel求乘积的方法分为两种&#xff0c;第一种是直接用四则运算的*来求乘积&#xff0c;另外一种就是使用PRODUCT乘积函数…

【51单片机】自定义静态数码管显示(设计思路&代码演示)

前言 大家好吖&#xff0c;欢迎来到 YY 滴单片机系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过单片机的老铁 主要内容含&#xff1a; 本章节内容为【实现动静态数码管】项目的第三个模块完整章节&#xff1a;传送门 欢迎订阅 YY滴C专栏&#xff01;更多干货持…

谷歌发布AI新品Gemini及收费模式;宜家推出基于GPT的AI家装助手

&#x1f989; AI新闻 &#x1f680; 谷歌发布AI新品Gemini及收费模式 摘要&#xff1a;谷歌宣布将原有的AI产品Bard更名为Gemini&#xff0c;开启了谷歌的AI新篇章。同时推出了强化版的聊天机器人Gemini Advanced&#xff0c;支持更复杂的任务处理&#xff0c;提供了两个月的…

springboot175图书管理系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

Netty应用(二) 之 ByteBuffer

目录 4.ByteBuffer详解 4.1 ByteBuffer为什么做成一个抽象类&#xff1f; 4.2 ByteBuffer是抽象类&#xff0c;他的主要实现类为 4.3 ByteBuffer的获取方式 4.4 核心结构&#xff08;NIO的ByteBuffer底层是啥结构&#xff0c;以及读写模式都是根据这些核心结构进行维护的&a…

Netty应用(一) 之 NIO概念 基本编程

目录 第一章 概念引入 1.分布式概念引入 第二章 Netty基础 - NIO 1.引言 1.1 什么是Netty&#xff1f; 1.2 为什么要学习Netty&#xff1f; 2.NIO编程 2.1 传统网络通信中开发方式及问题&#xff08;BIO&#xff09; 2.1.1 多线程版网络编程 2.1.2 线程池版的网络编程…

渗透测试-信息打点与架构分析细节梳理

渗透测试-信息打点与架构分析细节梳理 为了保障信息安全&#xff0c;我在正文中会去除除靶场环境的其他任何可能的敏感信息 什么是网站架构 网站架构包括网站的方方面面&#xff0c;下面是常见的内容&#xff1a; 前端&#xff08;Front-End&#xff09;&#xff1a; 使用Reac…

【C语言】深入理解指针

目录 1.字符指针 2.指针数组 3.数组指针 4.数组传参与指针传参 一维数组传参 二维数组传参 一级指针传参 二级指针传参 5.函数指针 6.函数指针数组 7.指向函数指针数组的指针&#xff08;了解即可&#xff09; 8.回调函数 回调函数的应用&#xff1a;库函数qsort …

ANSI Escape Sequence 下落的方块

ANSI Escape Sequence 下落的方块 1. ANSI Escape 的用途 无意中发现 B站有人讲解&#xff0c; 完全基于终端实现俄罗斯方块。 基本想法是借助于 ANSI Escape Sequence 实现方方块的绘制、 下落动态效果等。对于只了解 ansi escape sequence 用于 log 的颜色打印的人来说&…