❶ Netty内存管理
ByteBuf底层是一个字节数组,内部维护了两个索引:readerIndex与writerIndex。其中0 --> readerIndex部分为可丢弃字节,表示已被读取过,readerIndex --> writerIndex部分为可读字节,writerIndex --> capacity部分为可写字节。ByteBuf支持动态扩容,在实例化时会传入maxCapacity,当writerIndex达到capacity且capacity小于maxCapacity时会进行自动扩容。
ByteBuf子类可以按照以下三个纬度进行分类:
在进入内存分配核心逻辑前,我们先对Netty内存分配相关概念做下了解。Netty内存管理借鉴jemalloc思想,为了提高内存利用率,根据不同内存规格使用不同的分配策略,并且使用缓存提高内存分配效率。
Netty有四种内存规格,tiny表示16B ~ 512B之间的内存块,samll表示512B ~ 8K之间的内存块,normal表示8K ~ 16M的内存块,Huge表示大于16M的内存块。
Chunk是Netty向操作系统申请内存的单位,默认一次向操作系统申请16M内存,Netty内部将Chunk按照Page大小划分为2048块。我们申请内存时如果大于16M,则Netty会直接向操作系统申请对应大小内存,如果申请内存在8k到16M之间则会分配对应个数Page进行使用。如果申请内存远小于8K,那么直接使用一个Page会造成内存浪费,SubPage就是对Page进行再次分配,减少内存浪费。
如果申请内存小于8K,会对Page进行再次划分为SubPage,SubPage大小为Page大小/申请内存大小。SubPage又划分为tiny与small两种。
负责管理从操作系统中申请到的内存块,Netty为了减少多线程竞争arena,采用多arena设计,arena数量默认为2倍CPU核心数。线程与arena关系如下:
线程本地缓存,负责创建线程缓存PoolThreadCache。PoolThreadCache中会初始化三种类型MemoryRegionCache数组,用以缓存线程中不同规格的内存块,分别为:tiny、small、normal。tiny类型数组缓存的内存块大小为16B ~ 512B之间,samll类型数组缓存的内存块大小为512B ~ 8K之间的内存块,normal类型数组缓存的内存块大小受DEFAULT_MAX_CACHED_BUFFER_CAPACITY配置影响,默认只缓存8K、16K、32K三种类型内存块。
内存块缓存容器,负责缓存tiny、small、normal三种内存块。其内部维护一个队列,用于缓存同种内存大小的内存块。
负责管理从操作系统申请的内存,内部采用伙伴算法以Page为单位进行内存的分配与管理。
负责管理Chunk列表,根据内存使用率,分为:qInit、q000、q025、q050、q075、q100六种。每个PoolChunkList中存储内存使用率相同的Chunk,Chunk以双向链表进行关联,同时不同使用率的PoolChunkList也以双向列表进行关联。这样做的目的是因为随着内存的分配,Chunk使用率会发生变化,以链表形式方便Chunk在不同使用率列表进行移动。
PoolSubpage负责tiny、small类型内存的管理与分配,实现基于SLAB内存分配算法。PoolArena中有两种PoolSubpage类型数组,分别为:tinySubpagePools、smallSubpagePools。tinySubpagePools负责管理tiny类型内存,数组大小为512/16=32种。smallSubpagePools负责管理small类型内存,数组大小为4。
PoolSubpage数组中存储不同内存大小的PoolSubpage节点,相同大小节点以链表进行关联。PoolSubpage内部使用位图数组记录内存分配情况。
Netty通过ByteBufAllocator进行内存分配,ByteBufAllocator有两个实现类:PooledByteBufAllocator与UnpooledByteBufAllocator,其中,是否在堆内存或者直接内存分配与是否使用unsafe进行读写操作都封装在其实现类中。
我们先看下ByteBufAllocator类图:
PooledByteBufAllocator与UnpooledByteBufAllocator内存分配类似,可以通过newHeapBuffer与newDirectBuffer进行分配内存,我们以PooledByteBufAllocator为例分析下内存分配流程:
以PooledByteBufAllocator为例来分析下内存分配器实例化过程。首先调用PooledByteBufAllocator#DEFAULT方法实例化PooledByteBufAllocator
PooledByteBufAllocator实例化时会初始化几个比较重要的属性:
最终会调用PooledByteBufAllocator如下构造方法:
PooledByteBufAllocator构造方法主要做了两件事情,一是:初始化PoolThreadLocalCache属性,二是:初始化堆内存与直接内存类型PoolArena数组,我们进入PoolArena.DirectArena构造方法,来分析下PoolArena初始化时主要做了哪些事情:
DirectArena构造方法会调用其父类PoolArena构造方法,在PoolArena构造方法中会初始化tiny类型与small类型PoolSubpage数组,并初始化六种不同内存使用率的PoolChunkList,每个PoolChunkList以双向链表进行关联。
以分配直接内存为例,分析内存分配的主要流程:
PooledByteBufAllocator#directBuffer方法最终会调用如下构造方法,其中maxCapacity为Integer.MAX_VALUE:
该方法主要分三步,第一步:获取线程缓存,第二步:分配内存,第三步:将ByteBuf转为具有内存泄漏检测功能的ByteBuf,我们来分析下每一步具体做了哪些事情:
1.获取线程缓存,从PoolThreadLocalCache中获取PoolThreadCache,首次调用会先进行进行初始化,并将结果缓存下来:
初始化方法在PoolThreadLocalCache中,首先会循环找到使用最少的PoolArena,然后调用PoolThreadCache构造方法创建PoolThreadCache:
PoolThreadCache构造方法中会初始化tinySubPageDirectCaches、smallSubPageDirectCaches、normalDirectCaches这三种MemoryRegionCache数组:
createSubPageCaches方法中会创建并初始化MemoryRegionCache数组,其中tiny类型数组大小为32,small类型数组大小为4,normal类型数组大小为3:
最终会调用MemoryRegionCache构造方法进行创建,我们看下MemoryRegionCache结构:
2.分配内存,首先会获取PooledByteBuf,然后进行内存分配:
newByteBuf方法会尝试从对象池里面获取pooledByteBuf,如果没有则进行创建。allocate方法为内存分配核心逻辑,主要分为两种分配方式:page级别内存分配(8k 16M)、subPage级别内存分配(0 8K)、huge级别内存分配(>16M)。page与subPage级别内存分配首先会尝试从缓存上进行内存分配,如果分配失败则重新申请内存。huge级别内存分配不会通过缓存进行分配。我们看下allocate方法主要流程:
首先尝试从缓存中进行分配:
cacheForTiney方法先根据分配内存大小定位到对应的tinySubPageDirectCaches数组中MemoryRegionCache,如果没有定位到则不能在缓存中进行分配。如果有则从MemoryRegionCache对应的队列中弹出一个PooledByteBuf对象进行初始化,同时为了复用PooledByteBuf对象,会将其缓存下来。
如果从缓存中分配不成功,则会从对应的PoolSubpage数组上进行分配,如果PoolSubpage数组对应的内存大小下标中有可分配空间则进行分配,并对PooledByteBuf进行初始化。
如果在PoolSubpage数组上分配不成功,则表示没有可以用来分配的SubPage,则会尝试从Page上进行分配。先尝试从不同内存使用率的ChunkList进行分配,如果仍分配不成功,则表示没有可以用来分配的Chunk,此时会创建新的Chunk进行内存分配。
进入PoolChunk#allocate方法看下分配流程:
allocateRun方法用来分配大于等于8K的内存,allocateSubpage用来分配小于8K的内存,进入allocateSubpage方法:
内存分配成功后会调用initBuf方法初始化PoolByteBuf:
Page级别内存分配和SubPage级别类似,同样是先从缓存中进行分配,分配不成功则尝试从不同内存使用率的ChunkList进行分配,如果仍分配不成功,则表示没有可以用来分配的Chunk,此时会创建新的Chunk进行内存分配,不同点在allocate方法中:
因为大于16M的内存分配Netty不会进行缓存,所以Huge级别内存分配会直接申请内存并进行初始化:
调用ByteBuf#release方法会进行内存释放,方法中会判断当前byteBuf 是否被引用,如果没有被引用, 则调用deallocate方法进行释放:
进入deallocate方法看下内存释放流程:
free方法会把释放的内存加入到缓存,如果加入缓存不成功则会标记这段内存为未使用:
recycle方法会将PoolByteBuf对象放入到对象池中:
❷ Netty零拷贝
netty的零拷贝技术主要基于以下几点:
1. 堆外内存,也叫直接内存
2. Composite Buffers
3. 文件传输基于linux的sendfile机制
Linux的设计的初衷:给不同的操作给与不同的“权限”。Linux操作系统就将权限等级分为了2个等级,分别就是 内核态和用户态。
内核态是属于cpu的特权工作模式,可以操作计算机设备中的任何元件,包括网卡、硬盘、内存等等。
用户态是应用程序的工作模式,只能操作已申请的内存空间,无法操作外围设备。当应用程序需要与网卡、硬盘等外围设备进行交互时,需要通过系统提供的接口,来调用外围设备。
堆内存中的数据如果需要发送到外围设备,需要调用系统的接口,将数据拷贝到堆外内存中,发送到外围设备中。
而Netty的ByteBuffer不经过堆内存,直接在堆外内存中进行读写,省去一步拷贝操作。
需要注意的是,堆外内存只能通过主动调用回收或者Full GC回收,如果使用不当,容易造成内存溢出。
Composite Buffers
Netty提供了Composite Buffers来组合多个buffer。传统的buffer如果要合并的话,需要新建一个buffer,将原来的buffer拷贝到新的buffer中进行合并。而Composite Buffers相当于buffer的集合,保存了每个buffer对象,使物理的buffer合并变为逻辑上的buffer的合并。
文件传输
Netty的文件传输是依赖于操作系统的零拷贝技术。
一般我们读取文件都是调用操作系统接口,操作系统在应用程序读取文件时,会首先判断文件是否在内核缓冲区中,如果不在,则需要将文件从磁盘或socket读取到内核缓冲区中。
在写入文件时,操作系统会将文件先写入内核缓冲区,再写入到socket中。
我们传统做文件拷贝或传输时,会先在应用程序内存中构建一个缓冲区,通过这个缓冲区与操作系统做数据交换。这样无疑会增加了文件的多次拷贝。
传统的文件传输过程如下:
1. 构建byte[]数组来缓冲文件
2. 切换到内核态,将文件先在内核缓冲区中缓存
3. 将内核缓冲区的数据拷贝到应用程序缓冲区的byte[]数组中
4. 切换回用户态
5. 执行写入操作,切换回内核态
6. 将数据再拷贝一份到内核中的socket缓冲区
7. 切换回用户态
8. 操作系统将数据异步刷新到网卡
传统的文件传输过程,会造成操作系统在用户态和内核态多次切换,非常影响性能。
而linux在内核2.1中引入了sendfile操作,过程如下:
1. 读取数据时,sendfile系统调用导致文件内容通过DMA模块被复制到内核缓冲区中
2. 写入数据时,数据直接复制到socket关联的缓冲区(linux内核2.4已删除这一步,取而代之的是,只有记录数据位置和长度的描述符被加入到socket缓冲区中。DMA模块将数据直接从内核缓冲区传递给协议引擎)
3. 最后将socket buffer中的数据到网卡设备中(protocol buffer)发送
netty的FileRegion 包下的FileChannel.tranferTo即是基于sendfile机制来实现文件传输的
❸ Netty writeAndFlush解析
Netty事件分为入站事件与出站事件,可以通过ChannelPipline或者ChannelHandlerContext进行事件传播。通过ChannelPipline传播入站事件,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端,出站事件则从尾端开始传递到头部。通过ChannelHandlerContext传播入站事件,它将被从下一个ChannelHandler开始直至传递到尾端,出站事件则从下一个ChannelHandler直至传递到头部。
Netty为了提高传输数据的效率,在写出数据时,会先将数据(ByteBuf)缓存到ChannelOutboundBuffer中,等到调用flush方法时才会将ChannelOutboundBuffer中的数据写入到socket缓冲区。
ChannelOutboundBuffer中有三个重要属性:
从其属性可以看出,ChannelOutboundBuffer内部是一个链表结构,里面有三个指针:
ChannelOutboundBuffer中有两个比较重要的方法,addMessage:将数据以链表形式缓存下来,addFlush:移动链表指针,将缓存的数据标记为已刷新,注意此时并没有将数据写入到socket缓冲区。接下来我们看下两个方法的实现:
我们进入其addMessage方法分析下它是怎么缓存数据的:
当第一次添加数据数,会将数据封装为Entry,此时tailEntry、unflushedEntry指针指向这个Entry,flushedEntry指针此时为null。每次添加数据都会生成新的Entry,并将tailEntry指针指向该Entry,而unflushedEntry指针则一直指向最初添加的Entry,我们通过画图展示下:
第一次添加:
第N次添加:
为了防止缓存数据过大,Netty对缓存数据的大小做了限制:
addMessage方法最后会调用incrementPendingOutboundBytes方法记录已缓存的数据大小(totalPendingSize),如果该大小超过了写缓冲区高水位阈值(默认64K),则更新不可写标志(unwritable),并传播Channel可写状态发生变化事件:fireChannelWritabilityChanged:
移动链表指针,将缓存的数据标记为已刷新,并设置每个数据节点状态为不可取消:
执行完addFlush方法后,链表图示如下:
通过ChannelHandlerContext#writeAndFlush方法来分析下Netty是如何将数据通过网络进行传输的:
ChannelHandlerContext#writeAndFlush方法最终会调用其子类AbstractChannelHandlerContext的writeAndFlush方法:
主要逻辑在write方法中:
write方法主要做了两件事,一是:找到下一个ChannelHandlerContext。二是:调用下一个ChannelHandlerContext的w riteAndFlush方法传播事件。writeAndFlush方法是一个出站事件,前面我们也讲过对于出站事件,通过ChannelHandlerContext进行事件传播,事件是从pipline链中找到当前ChannelHandlerContext的下一个ChannelHandlerContext进行传播直至头部(HeadContext),在这期间我们需要自定义编码器对传输的Java对象进行编码,转换为ByteBuf对象,最终事件会传递到HeadContext进行处理。
invokeWriteAndFlush方法主要做了两件事,一是:调用invokeWrite0方法将数据放入Netty缓冲区中(ChannelOutboundBuffer),二是:调用invokeFlush0方法将缓冲区数据通过NioSocketChannel写入到socket缓冲区。
invokeWrite0方法内部会调用ChannelOutboundHandler#write方法:
前面说过,出站事件最终会传播到HeadContext,在传播到HeadContext之前我们需要自定义编码器对Java对象进行编码,将Java对象编码为ByteBuf,关于编码器本章节暂不进行解析。我们进入HeadContext的write方法:
HeadContext#write方法中会调用AbstractChannelUnsafe#write方法:
该方法主要做了三件事情,一:对数据进行过滤转换,二:估测数据大小,三:缓存数据。我们先看下filterOutboundMessage方法:
一、对数据进行过滤转换:
filterOutboundMessage方法首先会对数据进行过滤,如果数据不是ByteBuf或者FileRegion类型,则直接抛出异常。如果数据是ByteBuf类型,则判断数据是否为直接直接内存,如果不是则转换为直接内存以提升性能。
二、估测数据大小:
三、缓存数据:
最后会调用ChannelOutboundBuffer#addMessage方法将数据缓存到链表中,关于addMessage方法可以回顾下文章中的Netty缓冲区部分。
回到AbstractChannelHandlerContext#invokeWriteAndFlush方法,方法内部在调用完invokeWrite0方法将数据放入到缓存后,会调用invokeFlush0方法,将缓存中的数据写入到socket缓冲区。invokeFlush0方法内部会调用ChannelOutboundHandler#flush方法:
flush方法最终会将事件传播到HeadContext的flush方法:
HeadContext#flush方法中会调用AbstractChannelUnsafe#flush方法:
方法主要做了两件事,一:调用ChannelOutboundBuffer#addFlush方法,移动链表指针,将缓存的数据标记为已刷新。二:调用flush0方法将缓存中数据写入到socket缓冲区。关于addFlush方法可以看文中Netty缓冲区部分,我们直接进入flush0方法:
flush0方法主要做了两件事,一:判断是否有挂起的刷新。二:调用父类flush0方法。
一、判断是否有挂起的刷新
文中提到写入数据时,当socket缓冲区没有可用空间时会设置不可写状态,并注册OP_WRITE事件,等待socket缓冲区有空闲空间时会触发forceFlush,我们进入到isFlushPending方法看下方法是如何判断的:
二、调用父类flush0方法
写入socket缓冲区的具体逻辑在AbstractNioChannel#AbstractNioUnsafe父类AbstractChannel#AbstractUnsafe中:
核心逻辑在doWrite方法中,我们进入到AbstractChannel子类NioSocketChannel的doWrite方法看下具体实现:
NioSocketChannel#doWrite方法根据nioBufferCnt大小执行不同的写逻辑,如果为0则调用AbstractNioByteChannel#doWrite方法。如果nioBufferCnt为1或者大于1,则调用NioSocketChannel不同的重载方法进行处理。注意,写数据时自旋次数默认为16,也就是说如果执行16次write后仍有数据未写完,则调用incompleteWrite方法将flush操作封装为一个任务,放入到队列中,目的是不阻塞其他任务。另外,如果调用NioSocketChannel#write方法后,返回的localWrittenBytes为0,则表示socket缓冲区空间不足,则注册OP_WRITE事件,等待有可用空间时会触发该事件,然后调用forceFlush方法继续写入数据。
❹ netty内存池
1:PooledByteBufAllocator 内存池入口,应用通过该类从内存池中申请内存
PoolThreadCache:线程缓存池
Recycler:上文的Recycler是个对象池,储存的是相应类型的堆中对象的集合。
2:PooledByteBufAllocator获取bytebuf步骤:
1)如果是pool类型,先在线程对象池中获取一个相应类型的poolBuffer对象,这个对象是在堆中的,也是返回的对象,然后会使用PoolThreadCache这个线程内存池,获取一块合适大小的内存。接下来根据这块内存的信息对这个poolBuffer对象进行init,比如设置内存的address,length等,用户就可以通过这个buffer对象对相应的内存进行操作。
2)注意区别线程对象池和线程内存池。线程对象池指同一类型的对象集合,用户也只能通过对象来进行操作,比如读写数据。而线程内存池是许多内存块的集合,用户通过对象读写数据还是要定位到实际的内存地址(虚拟内存地址)。内存池就通过把对象和一块内存绑定,对象的读写操作都会反应到这块内存上。
3)内存是有限的,为了最大化内存的利用率以及提高内存分配回收的效率,netty实现了类似jemalloc内存分配的方式给对象分配内存。
4)获取一个buffer对象-->给buffer对象属性赋值(赋的值就是内存的地址、大小)
5) 比如现在需要分配一个UnpooledHeapByteBuf类型的ByteBuf对象,其初始大小为20,最大容量为100,因为是unpooled,意味着这各类型的ByteBuf是没有对象池的,需要的时候直接new一个即可
(1)new UnpooledHeapByteBuf(this,initialCapacity,maxCapacity);
单纯的一个没有经过初始化(成员变量没有赋值)的ByteBuf是不能进行读写的,因为其读写方法都需要确切的读写内存地址的。对于UnpooledHeapByteBuf类型的ByteBuf,其是一个HeapByteBuf,Heap意味着这个ByteBuf的读写操作是在JVM堆上进行的,其读写内存地址需要在jvm对上进行分配,所以在初始化时要根据需要的大小创建一个Byte类型的数组Byte[]对UnpooledDirectByteBuf进行初始化,接下来对这个Buf的读写都会反应在这个字节数组中。
6)对于UnpooledDirectByteBuf类型的ByteBuf,Direct意味着这个ByteBuf的读写实现方法是以直接内存为基础进行实现的,其读写区域是在直接内存上,在初始化时构造一个DirectByteBuffer对象(nio中的ByteBuffer)赋值给UnpooledDirectByteBuf的buf属性,接下来对这个UnpooledDirectByteBuf类型对象的读写都会反应到这个ByteBuffer上。
7)对于pool类型的,
❺ netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理
netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
Java代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if ("".equals(key)) {
(() value);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable() {
return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
}
} else {//超过高水位counter<=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
}
return interestOps;
}
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
❻ Netty里的pipeline
pipeline是netty里最重要的几个组件之一,我们从四个部分讲诉pipeline是怎么运作的
1、pipeline增加最开始先加了synchronized防止并发
2、然后检查这个handler是否已经被添加了,如果没有被添加,就添加
如果这个handler实例不是sharable的且已经被添加,就会报错,handler被加上Sharable,就是共享的handler,handler的实例就可以服用了
isSharable方法会使用ThreadLocal缓存已经判断是否共享的handler
添加节点到双向链表
3、创建一个新的context,如果没有传入名称,会生成一个
这里会创建一个DefaultChannelHandlerContext,创建的时候涉及到这个上下文的命名,如果创建的时候没有传入名称,会自动生成一个
这里会先从缓存里面获取是否这个handler已经缓存这个handler的名称,如果没有,就会用这个handler的类名称生成一个,放到缓存里
如果这个名称已经在这个pipeline里的某一个节点使用了,那就会把最后一位的数字加1,继续判断有没有,直到生成的名称还没有被使用
4、执行节点被添加的handlerAdded,这里执行有三种方式:1)如果节点还没有被注册到eventLoop,则创建一个任务后面触发,2)如果当前线程不是executor线程,则在executor线程触发handlerAdded,3)如果当前线程是executor线程,则立即触发handlerAdded
第一种方式,会在handlerRegistered时候触发创建的任务
第二种方式,在executor里执行
第三种方式,直接执行
我们看pipeline的构造函数,会发现创建pipeline的时候会创建一个头部节点一个尾部节点,一个新建的pipeline都是会有这两个节点的,那这两个节点是用来干嘛的呢,让我们分析一下
channel读到数据之后,会调用fireChannelRead让各个节点处理,这里第一个节点是headContext,netty的pipeline执行读取数据的流程是按添加的顺序。比如先后分别添加了ABC三个节点,那么读取数据流转的流程就是headContext->A->B->C->tailContext,如果channelRead的实现是ctx.fireChannelRead,那么就会按照这样的流程流转
写数据的时候,ctx.channel().writeAndFlush和ctx.writeAndFlush会有不同的实现
ctx.channel().writeAndFlush调用的是io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object),会调用到pipeline的writeAndFlush,然后调用到tail的writeAndFlush,此时调用的顺序是tailContext->C->B->A->headContext
ctx.writeAndFlush会调用io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)
也就是当前上下文开始执行,如果当前执行到B节点,那么write执行的流程就是B->A->headContext
❼ Netty怎样增大缓冲区
有现成的方法,你说的那种限制还根本没涉及到粘包解码的问题,而是netty底层限制了接收字节缓存区大小为1024,下面这样就行了
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65536))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(10, 10, 0));
ch.pipeline().addLast(new HeartBeat());
ch.pipeline().addLast(new ByteStreamDecoder());
ch.pipeline().addLast(new ByteStreamEncoder());
ch.pipeline().addLast(new ServerStreamHandler());
}
});