Netty底层数据交互源码分析

文章目录

    • 1. 前题回顾
    • 2. 主线流程源码分析
    • 3. Netty底层的零拷贝
    • 4. ByteBuf内存池设计

书接上文

1. 前题回顾

上一篇博客我们分析了Netty服务端启动的底层原理,主要就是将EventLoop里面的线程注册到了Select中,然后调用select方法监听客户端连接,我们这里从这个EventLoop里面线程的run方法开始分析。

2. 主线流程源码分析

进入EventLoop的run方法:

@Overrideprotected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and//    'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and//    'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

而run方法这里会执行一个select(wakenUp.getAndSet(false));方法,我们进入该方法:

 private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();if (nextWakeupTime != normalizedDeadlineNanos) {nextWakeupTime = normalizedDeadlineNanos;}for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}}

上面代码核心的一句就是int selectedKeys = selector.select(timeoutMillis);,这里就调用了Nio中的Selector的select方法。假如现在客户端有连接事件来了,这个方法就会结束阻塞,然后回到run方法,执行下面代码:

if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}}

处理连接事件的核心函数就是processSelectedKeys();,我们进入该方法:

private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}

如果select轮询到的事件selectedKeys不为空,就执行processSelectedKeysOptimized方法,我们进入该方法:

private void processSelectedKeysOptimized() {//遍历selectedKeysfor (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}

上面代码就是便利所有的selectionKey然后调用 processSelectedKey(k, (AbstractNioChannel) a);函数进行处理。我们进入该方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

上面方法就是处理事件的核心代码(注意我们现在是分析的客户端发来连接事件的场景)。上面方法是非常重要的,我们详细分析一下:

  1. 首先执行int readyOps = k.readyOps();函数这里就是拿到当前的事件类型
  2. 然后Netty就会判断事件的类型,判断当前的事件类型是读事件、写事件还是连接事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}
  1. 由于我们这里分析的是客户端连接事件,即SelectionKey.OP_ACCEPT事件,所以这里我们会执行unsafe.read();方法我们进入该方法
//实现是NioMessageUnsafeprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}

首先上面调用的第一个核心函数是doReadMessages(readBuf);,readBuf就是上面声明的一个List集合private final List<Object> readBuf = new ArrayList<Object>();按照这个方法的字面意思我们可以理解为来读客户端发过来的消息的。我们进入该方法:

//由于现在是连接事件,所以用NioServerSocektChannel这个实现类@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {//这里就是获取了一个Nio里面的SocketChannel(这里就是NIO的代码,accept方法返回一个SocketChannel)SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {//下面代码首先将NIO原生的SocketChannel封装为了netty中的NioSocketChannel,然后添加到了前面的list集合中了buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

上面代码的逻辑是,首先对于连接事件它这里首先获得了一个SocketChannel对象,然后将这个原生的SocketChannel对象封装成了一个NioSocketChannel对象,然后添加到了前面代码声明的List集合中了。这里我们看一下将SocketChannel对象封装为NioSocketChannel对象底层到底做了些什么事:

 public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());}
//进入super方法protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {//parent就是当前的NIOServerSocketChannel,SelectionKey标记为读事件super(parent, ch, SelectionKey.OP_READ);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//记录当前的channelthis.ch = ch;//记录这个channel所感兴趣的事件this.readInterestOp = readInterestOp;try {//然后将channel设置为非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}}//继续进入super方法protected AbstractChannel(Channel parent) {//指定父管道this.parent = parent;id = newId();unsafe = newUnsafe();//创建一个管道pipeline = newChannelPipeline();}

上面代码的流程就是将我们创建的SocketChannel封装为NioSocketChannel的过程,在这个过程中它记录了我们创建的Channel以及记录了Channel感兴趣的事件,以及创建了channel对应的管道(一种组合设计模式),但是并没有将Channel注册到Selector中。doReadMessages在这就执行完毕了,回到read方法:

public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}

然后执行下面的重要代码:

int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}

首先这里就是readBuf也就是前面的list的大小(前面我们知道这里面存储的实际是NioSocketChannel,这个list大小就是客户端连接的数量),然后遍历所有的NioSocketChannel,然后调用pipeline.fireChannelRead方法。首先我们看看这个pipeline是什么:

final ChannelPipeline pipeline = pipeline();

思考这个pipeline是什么?

这个pipeline其实就是NioServerSocketChannelpipeline,所以这里调用的是ServerSocketChannel对应的fireChannelRead,也就是执行ServerSocketChannel对应的pipieline里面的Handler逻辑(调用channelread方法)。

那在这里调用做了什么是,这里需要回到上一篇博客,在服务端创建过程中,向其pipiline中加入了一个ServerBootStrapAcceptor的handler,所里这里执行的是这个handler

ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});

然后调用它的channelRead方法:

        @Override@SuppressWarnings("unchecked")//msg在这里就是前面创建的NioSocketChannelpublic void channelRead(ChannelHandlerContext ctx, Object msg) {//然后就获得了该ServerSocketChannel创建的子NioSocketChannelfinal Channel child = (Channel) msg;//然后获得子NioSocketChannel对应的pipeline(此时就是socketChannel对应的pipeline)//childHandler就是我们在写netty服务端程序时加入的一个通道初始化对象】/**.childHandler((ChannelInitializer)(ch)->{//对workerGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());});**///这里的代码就是向socketchannel的pipeline中加入了通道初始化对象的handlerchild.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {//childGroup就是初始化netty服务端事创建的workEventLoopGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

childGroup.register(child)就将socketChannel注册到了WorkEventLoopGroup中,这里的逻辑和BooEventLoopGroup注册ServerSocketChannel中的逻辑是一样的(底层主要是将channel注册到selector中)。我们进入该方法:

//MultithreadEventLoopGroup的方法@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

next就是获得workerEventLoopGroup的下一个EventLoopGroup,然后调用EventLoopGroupregister方法。我们进入该方法:

 @Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}

然后这里的逻辑和ServerSocketChennel的注册逻辑是几乎一样的,这里我们知道在NioServerSocketChannel会被注册到对应的EventLoop的Selector上。这里同样是需要把NioSocketChannel注册到某个EventLoop的Selector上,这个逻辑就是在该方法中实现的。

在这里插入图片描述
到此SocketChannel的创建和注册就分析完了。现在Netty客户端就可以向服务端发送数据了,现在我们开始分析Netty服务端是如何处理这个流程的。这里我们同样需要回到上面的processSelectedKey方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}

然后这里读事件是OP_READ事件,所以这里还是要调用unsafe.read();,继续进入read方法(实现类现在是NioByteUnsafe类):

     @Overridepublic final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {//首先分配一个byteBufbyteBuf = allocHandle.allocate(allocator);//将收到的数据读入ByteBufallocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}

上面首先 byteBuf = allocHandle.allocate(allocator);这就生成了一个ByteBuf(我们知道Netty底层数据交互都是给予ByteBuf实现的,这句代码底层涉及了0拷贝,我们这里后面在分析)。 allocHandle.lastBytesRead(doReadBytes(byteBuf));这句代码就将收到的客户端数据读取到了byteBuf中,首先我们先看doReadBytes(byteBuf)函数:

  @Overrideprotected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.attemptedBytesRead(byteBuf.writableBytes());return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}

byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());这里就是将channel中的数据,写到ByteBuf中。回到read()方法。下面就是调用pipeline.fireChannelRead(byteBuf);这里就是将ByteBuf中的数据经过pipeline中所有的Handler进行处理。到此,结合上一部分的源码分析,到此Netty所有的主线源码就分析完了。

3. Netty底层的零拷贝

了解Netty的零拷贝之前,我们需要对Netty的直接内存有所了解,直接内存并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也会被频繁的使用,而且也可能导致OOM问题,Java里面的DirectByteBuffer可以分配一块直接内存(堆外内存),元空间对应的内存也被称为直接内存,它们对应的是机器的物理内存。

ByteBuf类有两个方法,一个是allocate方法,它分配的内存就是直接在Java堆上面分配的,另一个方法是allocateDirect方法,它就是分配的直接内存。allocate我们可以想到它底层一定是new了一个数组,我们重点分析allocateDirect底层在干什么,我们进入该方法:

public static ByteBuffer allocateDirect(int capacity) {return new DirectByteBuffer(capacity);}

可以发现它底层就是创建了一个NIO的DirectByteBuffer对象。我们继续进入该对象:

DirectByteBuffer(int cap) {                   // package-privatesuper(-1, 0, cap, cap);boolean pa = VM.isDirectMemoryPageAligned();int ps = Bits.pageSize();long size = Math.max(1L, (long)cap + (pa ? ps : 0));Bits.reserveMemory(size, cap);long base = 0;try {base = unsafe.allocateMemory(size);} catch (OutOfMemoryError x) {Bits.unreserveMemory(size, cap);throw x;}unsafe.setMemory(base, size, (byte) 0);if (pa && (base % ps != 0)) {// Round up to page boundaryaddress = base + ps - (base & (ps - 1));} else {address = base;}cleaner = Cleaner.create(this, new Deallocator(base, size, cap));att = null;}

base = unsafe.allocateMemory(size);这里就是真正分配直接内存的逻辑,allocateMemory是一个本地方法,它底层就是用malloc分配了一块内存。又遇见创建的是堆外内存,我们创建的DirectByteBuffer是有这块堆外内存的引用的,当方法执行完毕,DirectByteBuffer会被JVM回收掉,所以对堆外内存的引用也会回收掉,而这个堆外内存也会被回收掉。

使用直接内存的优缺点如下:

  • 优点:不占用堆空间,减少了发生GC的可能,java虚拟机上,本地IO会直接操作直接内存(直接内存->系统调用->硬盘/网卡),而非直接内存则需要二次拷贝(堆内存->直接内存->系统调用->硬盘/网卡)
  • 缺点:初始化分配比较慢,没有JVM直接帮助管理内存,容易发生内存溢出。为例避免一致没有FULL GC,最终导致直接内存把物理内存耗完了。(我们可以通过-XX:MaxDirectMemorySize来指定直接内存的大小,当达到阈值是,会调用system.gc()来进行一次FULL GC,简介把那些没有被使用的直接内存回收掉)

在这里插入图片描述
对于上图,上面一部分不使用直接内存,首先客户端client发送数据过来,socket缓存区接受到数据,然后操作系统会把Socket缓存区的数据拷贝到直接内存(DMA控制器),这是第一次拷贝,然后JDK把直接内存的数据拷贝到堆内存,这是第二次拷贝。对于操作直接内存的就只需要前面的第一次拷贝即可。

Netty的接收和发送ByteBuf采用Direct BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。 我们回到Read方法,解决上面遗留的点:

byteBuf = allocHandle.allocate(allocator)

进入allocate方法:

  @Overridepublic ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess());}@Overridepublic ByteBuf ioBuffer(int initialCapacity) {if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {return directBuffer(initialCapacity);}return heapBuffer(initialCapacity);}

可以看到了 directBuffer(initialCapacity);,可以看出netty默认使用的是直接内存。

4. ByteBuf内存池设计

随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量的工作。但是对于缓存区Buffer(相当于一个内存块),情况缺稍有不同,特别是对于堆外内存直接内存的分配和回收,是一件非常耗时的操作。为了尽量避免重用缓存区,Netty提供了基于ByteBuf内存池的缓存区重用机制。需要的时候直接从池值中获取ByteBuf就行,使用完毕后会放回池子中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3029282.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Amesim基础篇-热仿真常用模型库-Air Conditioning-Pipes

前言 基于上文对空调库各个元件的介绍&#xff0c;本文进一步将其中的管路展开。 管路介绍 1 摩擦阻力管&#xff08;R&#xff09;&#xff1a; 具有阻力特性的管路&#xff0c;通过管长以及管截面计算阻力。 2 可调节阻力管&#xff08;R&#xff09;&#xff1a; 只具有…

STM32CubeMX软件使用(超详细)

1、Cube启动页介绍 2、芯片选择页面介绍 3、输入自己的芯片型号&#xff0c;这里以STM32U575RIT6举例 4、芯片配置页码介绍 5、芯片外设配置栏详细说明 6、点击ClockConfiguration进行时钟树的配置&#xff0c;选择时钟树后可以选择自己想使用的时钟源&#xff0c;也可以直接输…

[c++]多态的分析

多态详细解读 多态的概念多态的构成条件 接口继承和实现继承: 多态的原理:动态绑定和静态绑定 多继承中的虚函数表 多态的概念 -通俗的来说&#xff1a;当不同的对象去完成某同一行为时&#xff0c;会产生不同的状态。 多态的构成条件 必须通过基类的指针或者引用调用虚函数1虚…

3---Linux编译器gcc/g++

一、程序的翻译过程&#xff1a;ESc->iso 1.1预处理&#xff1a;c->c 主要功能&#xff1a;宏替换、头文件的展开、条件编译、去注释&#xff1b;目的是让代码变得纯粹。条件编译&#xff0c;可以实现对代码的裁剪。比如对于不同用户&#xff0c;设置不同的宏常量&…

聚观早报 | 苹果新款iPad Pro发布;国产特斯拉4月交付量

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 5月9日消息 苹果新款iPad Pro发布 国产特斯拉4月交付量 iOS 18新功能爆料 真我GT Neo6续航细节 三星Galaxy Z F…

【计算机毕业设计】springboot海产品加工销售一体化管理系统

时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;海产品加工销售一体化 管理系统当然不能排除在外。微信小程序海产品加工销售一体化管理系统是在实际应用和软件工程的开发原理之上&#xff0c;运…

驱动开发-字符设备驱动的注册与注销

1.注册字符设备驱动 #include<fs.h> int register_chrdev(unsigned int major,const char *name,const struct file_operations *fops) 函数功能&#xff1a;注册字符设备驱动 参数&#xff1a;major&#xff1a;主设备号 major>0:静态指定主设备号&#xff0c;不…

IOS 苹果IAP(内购)之创建沙盒账号

IOS 苹果IAP&#xff08;内购&#xff09;之创建沙盒账号 沙盒账号是什么&#xff1f;沙盒账号创建的前提条件沙盒账号创建沙盒账号使用流程沙盒账号注意事项 沙盒账号是什么&#xff1f; 如果IOS应用里面用到了苹果应用内付费&#xff08;IAP&#xff09;功能&#xff0c;那么…

Python管理PVE(Proxmox VE)云平台--节点资源统计

一、前言 写本脚本的初衷是因手动查看统计已分配的PVE资源过于耗时&#xff0c;因此写一个脚本一劳永逸&#xff0c;具体实现方法&#xff1a;利用Python的paramiko模块进行远程命令查看、统计PVE平台各节点已分配的cpu、内存、磁盘空间。 二、步骤 1.构建shell脚本 1.1 统计…

基于SpringBoot的全国风景区WebGIS按省展示实践

目录 前言 一、全国风景区信息介绍 1、全国范围内数据分布 2、全国风景区分布 3、PostGIS空间关联查询 二、后台查询的设计与实现 1、Model和Mapper层 2、业务层和控制层设计 三、WebGIS可视化 1、省份范围可视化 2、省级风景区可视化展示 3、成果展示 总结 前…

EMAP的简单开发(单表)

注意框架版本选择SPA_v1 一个数据源代表一个业务&#xff0c;选择一个就会产生对应的应用 index.jsp是展示的页面 conifg.js是index.jsp引用渲染的一个js文件 index.jsp中的全局变量中有一个pageMeta和contaextPath&#xff1a; pageMeta&#xff1a;是一个对象&#xff0c;…

办公技巧之合集文档 拆分_word

问题 如何将文档合集拆分为单独文档。 操作步骤 软件 word 365 原理简述&#xff1a; 在 word 大纲视图下&#xff0c;通过一级标题确定子文档范围&#xff0c;然后导出即可。 文档结构 从下图可见&#xff0c;文档结构为已建立大纲级别的文档&#xff0c;如果没有建立&a…

初探 JUC 并发编程:读写锁 ReentrantReadWriteLock 原理(8000 字源码详解)

本文中会涉及到一些前面 ReentrantLock 中学到的内容&#xff0c;先去阅读一下我关于独占锁 ReentrantLock 的源码解析阅读起来会更加清晰。 初探 JUC 并发编程&#xff1a;独占锁 ReentrantLock 底层源码解析 6.4&#xff09;读写锁 ReentrantReadWriteLock 原理 前面提到的 R…

Java入门——类和对象(上)

经读者反映与笔者考虑&#xff0c;近期以及往后内容更新将主要以java为主&#xff0c;望读者周知、见谅。 类与对象是什么&#xff1f; C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 JAVA是基于面向对…

C++常用库函数——strcmp、strchr

1、strcmp&#xff1a;比较两个字符串的值是否相等 例如 char a1[6] "AbDeG",*s1 a1;char a2[6] "AbdEg",* s2 a2;s1 2;s2 2;printf("%d \n", strcmp(s1, s2));return(0); s1指向a1&#xff0c;s2指向a2&#xff0c;strcmp表示比较s1和s…

万物生长大会 | 创邻科技再登杭州准独角兽榜单

近日&#xff0c;由民建中央、中国科协指导&#xff0c;民建浙江省委会、中国投资发展促进会联合办的第八届万物生长大会在杭州举办。 在这场创新创业领域一年一度的盛会上&#xff0c;杭州市创业投资协会联合微链共同发布《2024杭州独角兽&准独角兽企业榜单》。榜单显示&…

数据库被攻击后出现1044 - access denied for user ‘root‘@‘% ‘ to database table

MySQL数据库被攻击后&#xff0c;数据库全部被删除&#xff0c;并且加一个一个勒索的数据&#xff0c;向我索要btc&#xff0c; 出现这个问题就是我的数据库密码太简单了&#xff0c;弱密码&#xff0c;被破解了&#xff0c;并且把我权限也给修改了 导致我操作数据库时&#…

vs2019 里 C++ 20规范的 string 类的源码注释

&#xff08;1&#xff09;读源码&#xff0c;可以让我们更好的使用这个类&#xff0c;掌握这个类&#xff0c;知道咱们使用了库代码以后&#xff0c;程序大致具体是怎么执行的。而不用担心程序出不知名的意外的问题。也便于随后的代码调试。 string 类实际是 库中 basic_strin…

国产开源物联网操作系统

软件介绍 RT-Thread是一个开源、中立、社区化发展的物联网操作系统&#xff0c;采用C语言编写&#xff0c;具有易移植的特性。该项目提供完整版和Nano版以满足不同设备的资源需求。 功能特点 1.内核层 RT-Thread内核包括多线程调度、信号量、邮箱、消息队列、内存管理、定时器…

Bokeh实战高级教程:用滑块控件打造动态数据可视化

在数据可视化的世界里&#xff0c;Bokeh无疑是一颗璀璨的明星。它不仅提供了丰富的图表类型&#xff0c;还支持强大的交互功能。今天&#xff0c;我们就来深入探讨如何使用Bokeh的滑块控件&#xff0c;轻松实现数据的动态展示。 首先&#xff0c;让我们从创建ColumnDataSource开…