當前位置:首頁 » 硬碟大全 » netty通道如何緩存起來
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

netty通道如何緩存起來

發布時間: 2022-12-28 05:18:35

❶ Netty內存管理

ByteBuf底層是一個位元組數組,內部維護了兩個索引:readerIndex與writerIndex。其中0 --> readerIndex部分為可丟棄位元組,表示已被讀取過,readerIndex --> writerIndex部分為可讀位元組,writerIndex --> capacity部分為可寫位元組。ByteBuf支持動態擴容,在實例化時會傳入maxCapacity,當writerIndex達到capacity且capacity小於maxCapacity時會進行自動擴容。

ByteBuf子類可以按照以下三個緯度進行分類:

在進入內存分配核心邏輯前,我們先對Netty內存分配相關概念做下了解。Netty內存管理借鑒jemalloc思想,為了提高內存利用率,根據不同內存規格使用不同的分配策略,並且使用緩存提高內存分配效率。

Netty有四種內存規格,tiny表示16B ~ 512B之間的內存塊,samll表示512B ~ 8K之間的內存塊,normal表示8K ~ 16M的內存塊,Huge表示大於16M的內存塊。

Chunk是Netty向操作系統申請內存的單位,默認一次向操作系統申請16M內存,Netty內部將Chunk按照Page大小劃分為2048塊。我們申請內存時如果大於16M,則Netty會直接向操作系統申請對應大小內存,如果申請內存在8k到16M之間則會分配對應個數Page進行使用。如果申請內存遠小於8K,那麼直接使用一個Page會造成內存浪費,SubPage就是對Page進行再次分配,減少內存浪費。

如果申請內存小於8K,會對Page進行再次劃分為SubPage,SubPage大小為Page大小/申請內存大小。SubPage又劃分為tiny與small兩種。

負責管理從操作系統中申請到的內存塊,Netty為了減少多線程競爭arena,採用多arena設計,arena數量默認為2倍CPU核心數。線程與arena關系如下:

線程本地緩存,負責創建線程緩存PoolThreadCache。PoolThreadCache中會初始化三種類型MemoryRegionCache數組,用以緩存線程中不同規格的內存塊,分別為:tiny、small、normal。tiny類型數組緩存的內存塊大小為16B ~ 512B之間,samll類型數組緩存的內存塊大小為512B ~ 8K之間的內存塊,normal類型數組緩存的內存塊大小受DEFAULT_MAX_CACHED_BUFFER_CAPACITY配置影響,默認只緩存8K、16K、32K三種類型內存塊。

內存塊緩存容器,負責緩存tiny、small、normal三種內存塊。其內部維護一個隊列,用於緩存同種內存大小的內存塊。

負責管理從操作系統申請的內存,內部採用夥伴演算法以Page為單位進行內存的分配與管理。

負責管理Chunk列表,根據內存使用率,分為:qInit、q000、q025、q050、q075、q100六種。每個PoolChunkList中存儲內存使用率相同的Chunk,Chunk以雙向鏈表進行關聯,同時不同使用率的PoolChunkList也以雙向列表進行關聯。這樣做的目的是因為隨著內存的分配,Chunk使用率會發生變化,以鏈表形式方便Chunk在不同使用率列表進行移動。

PoolSubpage負責tiny、small類型內存的管理與分配,實現基於SLAB內存分配演算法。PoolArena中有兩種PoolSubpage類型數組,分別為:tinySubpagePools、smallSubpagePools。tinySubpagePools負責管理tiny類型內存,數組大小為512/16=32種。smallSubpagePools負責管理small類型內存,數組大小為4。

PoolSubpage數組中存儲不同內存大小的PoolSubpage節點,相同大小節點以鏈表進行關聯。PoolSubpage內部使用點陣圖數組記錄內存分配情況。

Netty通過ByteBufAllocator進行內存分配,ByteBufAllocator有兩個實現類:PooledByteBufAllocator與UnpooledByteBufAllocator,其中,是否在堆內存或者直接內存分配與是否使用unsafe進行讀寫操作都封裝在其實現類中。

我們先看下ByteBufAllocator類圖:

PooledByteBufAllocator與UnpooledByteBufAllocator內存分配類似,可以通過newHeapBuffer與newDirectBuffer進行分配內存,我們以PooledByteBufAllocator為例分析下內存分配流程:

以PooledByteBufAllocator為例來分析下內存分配器實例化過程。首先調用PooledByteBufAllocator#DEFAULT方法實例化PooledByteBufAllocator

PooledByteBufAllocator實例化時會初始化幾個比較重要的屬性:

最終會調用PooledByteBufAllocator如下構造方法:

PooledByteBufAllocator構造方法主要做了兩件事情,一是:初始化PoolThreadLocalCache屬性,二是:初始化堆內存與直接內存類型PoolArena數組,我們進入PoolArena.DirectArena構造方法,來分析下PoolArena初始化時主要做了哪些事情:

DirectArena構造方法會調用其父類PoolArena構造方法,在PoolArena構造方法中會初始化tiny類型與small類型PoolSubpage數組,並初始化六種不同內存使用率的PoolChunkList,每個PoolChunkList以雙向鏈表進行關聯。

以分配直接內存為例,分析內存分配的主要流程:

PooledByteBufAllocator#directBuffer方法最終會調用如下構造方法,其中maxCapacity為Integer.MAX_VALUE:

該方法主要分三步,第一步:獲取線程緩存,第二步:分配內存,第三步:將ByteBuf轉為具有內存泄漏檢測功能的ByteBuf,我們來分析下每一步具體做了哪些事情:

1.獲取線程緩存,從PoolThreadLocalCache中獲取PoolThreadCache,首次調用會先進行進行初始化,並將結果緩存下來:

初始化方法在PoolThreadLocalCache中,首先會循環找到使用最少的PoolArena,然後調用PoolThreadCache構造方法創建PoolThreadCache:

PoolThreadCache構造方法中會初始化tinySubPageDirectCaches、smallSubPageDirectCaches、normalDirectCaches這三種MemoryRegionCache數組:

createSubPageCaches方法中會創建並初始化MemoryRegionCache數組,其中tiny類型數組大小為32,small類型數組大小為4,normal類型數組大小為3:

最終會調用MemoryRegionCache構造方法進行創建,我們看下MemoryRegionCache結構:

2.分配內存,首先會獲取PooledByteBuf,然後進行內存分配:

newByteBuf方法會嘗試從對象池裡面獲取pooledByteBuf,如果沒有則進行創建。allocate方法為內存分配核心邏輯,主要分為兩種分配方式:page級別內存分配(8k 16M)、subPage級別內存分配(0 8K)、huge級別內存分配(>16M)。page與subPage級別內存分配首先會嘗試從緩存上進行內存分配,如果分配失敗則重新申請內存。huge級別內存分配不會通過緩存進行分配。我們看下allocate方法主要流程:

首先嘗試從緩存中進行分配:

cacheForTiney方法先根據分配內存大小定位到對應的tinySubPageDirectCaches數組中MemoryRegionCache,如果沒有定位到則不能在緩存中進行分配。如果有則從MemoryRegionCache對應的隊列中彈出一個PooledByteBuf對象進行初始化,同時為了復用PooledByteBuf對象,會將其緩存下來。

如果從緩存中分配不成功,則會從對應的PoolSubpage數組上進行分配,如果PoolSubpage數組對應的內存大小下標中有可分配空間則進行分配,並對PooledByteBuf進行初始化。

如果在PoolSubpage數組上分配不成功,則表示沒有可以用來分配的SubPage,則會嘗試從Page上進行分配。先嘗試從不同內存使用率的ChunkList進行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時會創建新的Chunk進行內存分配。

進入PoolChunk#allocate方法看下分配流程:

allocateRun方法用來分配大於等於8K的內存,allocateSubpage用來分配小於8K的內存,進入allocateSubpage方法:

內存分配成功後會調用initBuf方法初始化PoolByteBuf:

Page級別內存分配和SubPage級別類似,同樣是先從緩存中進行分配,分配不成功則嘗試從不同內存使用率的ChunkList進行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時會創建新的Chunk進行內存分配,不同點在allocate方法中:

因為大於16M的內存分配Netty不會進行緩存,所以Huge級別內存分配會直接申請內存並進行初始化:

調用ByteBuf#release方法會進行內存釋放,方法中會判斷當前byteBuf 是否被引用,如果沒有被引用, 則調用deallocate方法進行釋放:

進入deallocate方法看下內存釋放流程:

free方法會把釋放的內存加入到緩存,如果加入緩存不成功則會標記這段內存為未使用:

recycle方法會將PoolByteBuf對象放入到對象池中:

❷ Netty零拷貝

netty的零拷貝技術主要基於以下幾點:

1. 堆外內存,也叫直接內存

2. Composite Buffers

3. 文件傳輸基於linux的sendfile機制

Linux的設計的初衷:給不同的操作給與不同的「許可權」。Linux操作系統就將許可權等級分為了2個等級,分別就是 內核態和用戶態。

內核態是屬於cpu的特權工作模式,可以操作計算機設備中的任何元件,包括網卡、硬碟、內存等等。

用戶態是應用程序的工作模式,只能操作已申請的內存空間,無法操作外圍設備。當應用程序需要與網卡、硬碟等外圍設備進行交互時,需要通過系統提供的介面,來調用外圍設備。

堆內存中的數據如果需要發送到外圍設備,需要調用系統的介面,將數據拷貝到堆外內存中,發送到外圍設備中。

而Netty的ByteBuffer不經過堆內存,直接在堆外內存中進行讀寫,省去一步拷貝操作。

需要注意的是,堆外內存只能通過主動調用回收或者Full GC回收,如果使用不當,容易造成內存溢出。

Composite Buffers

Netty提供了Composite Buffers來組合多個buffer。傳統的buffer如果要合並的話,需要新建一個buffer,將原來的buffer拷貝到新的buffer中進行合並。而Composite Buffers相當於buffer的集合,保存了每個buffer對象,使物理的buffer合並變為邏輯上的buffer的合並。

文件傳輸

Netty的文件傳輸是依賴於操作系統的零拷貝技術。

一般我們讀取文件都是調用操作系統介面,操作系統在應用程序讀取文件時,會首先判斷文件是否在內核緩沖區中,如果不在,則需要將文件從磁碟或socket讀取到內核緩沖區中。

在寫入文件時,操作系統會將文件先寫入內核緩沖區,再寫入到socket中。

我們傳統做文件拷貝或傳輸時,會先在應用程序內存中構建一個緩沖區,通過這個緩沖區與操作系統做數據交換。這樣無疑會增加了文件的多次拷貝。

傳統的文件傳輸過程如下:

1. 構建byte[]數組來緩沖文件

2. 切換到內核態,將文件先在內核緩沖區中緩存

3. 將內核緩沖區的數據拷貝到應用程序緩沖區的byte[]數組中

4. 切換回用戶態

5. 執行寫入操作,切換回內核態

6. 將數據再拷貝一份到內核中的socket緩沖區

7. 切換回用戶態

8. 操作系統將數據非同步刷新到網卡

傳統的文件傳輸過程,會造成操作系統在用戶態和內核態多次切換,非常影響性能。

而linux在內核2.1中引入了sendfile操作,過程如下:

1. 讀取數據時,sendfile系統調用導致文件內容通過DMA模塊被復制到內核緩沖區中

2. 寫入數據時,數據直接復制到socket關聯的緩沖區(linux內核2.4已刪除這一步,取而代之的是,只有記錄數據位置和長度的描述符被加入到socket緩沖區中。DMA模塊將數據直接從內核緩沖區傳遞給協議引擎)

3. 最後將socket buffer中的數據到網卡設備中(protocol buffer)發送

netty的FileRegion 包下的FileChannel.tranferTo即是基於sendfile機制來實現文件傳輸的

❸ Netty writeAndFlush解析

Netty事件分為入站事件與出站事件,可以通過ChannelPipline或者ChannelHandlerContext進行事件傳播。通過ChannelPipline傳播入站事件,它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端,出站事件則從尾端開始傳遞到頭部。通過ChannelHandlerContext傳播入站事件,它將被從下一個ChannelHandler開始直至傳遞到尾端,出站事件則從下一個ChannelHandler直至傳遞到頭部。

Netty為了提高傳輸數據的效率,在寫出數據時,會先將數據(ByteBuf)緩存到ChannelOutboundBuffer中,等到調用flush方法時才會將ChannelOutboundBuffer中的數據寫入到socket緩沖區。

ChannelOutboundBuffer中有三個重要屬性:

從其屬性可以看出,ChannelOutboundBuffer內部是一個鏈表結構,裡面有三個指針:

ChannelOutboundBuffer中有兩個比較重要的方法,addMessage:將數據以鏈表形式緩存下來,addFlush:移動鏈表指針,將緩存的數據標記為已刷新,注意此時並沒有將數據寫入到socket緩沖區。接下來我們看下兩個方法的實現:

我們進入其addMessage方法分析下它是怎麼緩存數據的:

當第一次添加數據數,會將數據封裝為Entry,此時tailEntry、unflushedEntry指針指向這個Entry,flushedEntry指針此時為null。每次添加數據都會生成新的Entry,並將tailEntry指針指向該Entry,而unflushedEntry指針則一直指向最初添加的Entry,我們通過畫圖展示下:

第一次添加:

第N次添加:

為了防止緩存數據過大,Netty對緩存數據的大小做了限制:

addMessage方法最後會調用incrementPendingOutboundBytes方法記錄已緩存的數據大小(totalPendingSize),如果該大小超過了寫緩沖區高水位閾值(默認64K),則更新不可寫標志(unwritable),並傳播Channel可寫狀態發生變化事件:fireChannelWritabilityChanged:

移動鏈表指針,將緩存的數據標記為已刷新,並設置每個數據節點狀態為不可取消:

執行完addFlush方法後,鏈表圖示如下:

通過ChannelHandlerContext#writeAndFlush方法來分析下Netty是如何將數據通過網路進行傳輸的:

ChannelHandlerContext#writeAndFlush方法最終會調用其子類AbstractChannelHandlerContext的writeAndFlush方法:

主要邏輯在write方法中:

write方法主要做了兩件事,一是:找到下一個ChannelHandlerContext。二是:調用下一個ChannelHandlerContext的w riteAndFlush方法傳播事件。writeAndFlush方法是一個出站事件,前面我們也講過對於出站事件,通過ChannelHandlerContext進行事件傳播,事件是從pipline鏈中找到當前ChannelHandlerContext的下一個ChannelHandlerContext進行傳播直至頭部(HeadContext),在這期間我們需要自定義編碼器對傳輸的Java對象進行編碼,轉換為ByteBuf對象,最終事件會傳遞到HeadContext進行處理。

invokeWriteAndFlush方法主要做了兩件事,一是:調用invokeWrite0方法將數據放入Netty緩沖區中(ChannelOutboundBuffer),二是:調用invokeFlush0方法將緩沖區數據通過NioSocketChannel寫入到socket緩沖區。

invokeWrite0方法內部會調用ChannelOutboundHandler#write方法:

前面說過,出站事件最終會傳播到HeadContext,在傳播到HeadContext之前我們需要自定義編碼器對Java對象進行編碼,將Java對象編碼為ByteBuf,關於編碼器本章節暫不進行解析。我們進入HeadContext的write方法:

HeadContext#write方法中會調用AbstractChannelUnsafe#write方法:

該方法主要做了三件事情,一:對數據進行過濾轉換,二:估測數據大小,三:緩存數據。我們先看下filterOutboundMessage方法:

一、對數據進行過濾轉換:

filterOutboundMessage方法首先會對數據進行過濾,如果數據不是ByteBuf或者FileRegion類型,則直接拋出異常。如果數據是ByteBuf類型,則判斷數據是否為直接直接內存,如果不是則轉換為直接內存以提升性能。

二、估測數據大小:

三、緩存數據:

最後會調用ChannelOutboundBuffer#addMessage方法將數據緩存到鏈表中,關於addMessage方法可以回顧下文章中的Netty緩沖區部分。

回到AbstractChannelHandlerContext#invokeWriteAndFlush方法,方法內部在調用完invokeWrite0方法將數據放入到緩存後,會調用invokeFlush0方法,將緩存中的數據寫入到socket緩沖區。invokeFlush0方法內部會調用ChannelOutboundHandler#flush方法:

flush方法最終會將事件傳播到HeadContext的flush方法:

HeadContext#flush方法中會調用AbstractChannelUnsafe#flush方法:

方法主要做了兩件事,一:調用ChannelOutboundBuffer#addFlush方法,移動鏈表指針,將緩存的數據標記為已刷新。二:調用flush0方法將緩存中數據寫入到socket緩沖區。關於addFlush方法可以看文中Netty緩沖區部分,我們直接進入flush0方法:

flush0方法主要做了兩件事,一:判斷是否有掛起的刷新。二:調用父類flush0方法。

一、判斷是否有掛起的刷新

文中提到寫入數據時,當socket緩沖區沒有可用空間時會設置不可寫狀態,並注冊OP_WRITE事件,等待socket緩沖區有空閑空間時會觸發forceFlush,我們進入到isFlushPending方法看下方法是如何判斷的:

二、調用父類flush0方法

寫入socket緩沖區的具體邏輯在AbstractNioChannel#AbstractNioUnsafe父類AbstractChannel#AbstractUnsafe中:

核心邏輯在doWrite方法中,我們進入到AbstractChannel子類NioSocketChannel的doWrite方法看下具體實現:

NioSocketChannel#doWrite方法根據nioBufferCnt大小執行不同的寫邏輯,如果為0則調用AbstractNioByteChannel#doWrite方法。如果nioBufferCnt為1或者大於1,則調用NioSocketChannel不同的重載方法進行處理。注意,寫數據時自旋次數默認為16,也就是說如果執行16次write後仍有數據未寫完,則調用incompleteWrite方法將flush操作封裝為一個任務,放入到隊列中,目的是不阻塞其他任務。另外,如果調用NioSocketChannel#write方法後,返回的localWrittenBytes為0,則表示socket緩沖區空間不足,則注冊OP_WRITE事件,等待有可用空間時會觸發該事件,然後調用forceFlush方法繼續寫入數據。

❹ netty內存池

1:PooledByteBufAllocator    內存池入口,應用通過該類從內存池中申請內存

PoolThreadCache:線程緩存池

Recycler:上文的Recycler是個對象池,儲存的是相應類型的堆中對象的集合。

2:PooledByteBufAllocator獲取bytebuf步驟:

      1)如果是pool類型,先在線程對象池中獲取一個相應類型的poolBuffer對象,這個對象是在堆中的,也是返回的對象,然後會使用PoolThreadCache這個線程內存池,獲取一塊合適大小的內存。接下來根據這塊內存的信息對這個poolBuffer對象進行init,比如設置內存的address,length等,用戶就可以通過這個buffer對象對相應的內存進行操作。

      2)注意區別線程對象池和線程內存池。線程對象池指同一類型的對象集合,用戶也只能通過對象來進行操作,比如讀寫數據。而線程內存池是許多內存塊的集合,用戶通過對象讀寫數據還是要定位到實際的內存地址(虛擬內存地址)。內存池就通過把對象和一塊內存綁定,對象的讀寫操作都會反應到這塊內存上。

3)內存是有限的,為了最大化內存的利用率以及提高內存分配回收的效率,netty實現了類似jemalloc內存分配的方式給對象分配內存。

4)獲取一個buffer對象-->給buffer對象屬性賦值(賦的值就是內存的地址、大小)

5)  比如現在需要分配一個UnpooledHeapByteBuf類型的ByteBuf對象,其初始大小為20,最大容量為100,因為是unpooled,意味著這各類型的ByteBuf是沒有對象池的,需要的時候直接new一個即可

                          (1)new            UnpooledHeapByteBuf(this,initialCapacity,maxCapacity);

單純的一個沒有經過初始化(成員變數沒有賦值)的ByteBuf是不能進行讀寫的,因為其讀寫方法都需要確切的讀寫內存地址的。對於UnpooledHeapByteBuf類型的ByteBuf,其是一個HeapByteBuf,Heap意味著這個ByteBuf的讀寫操作是在JVM堆上進行的,其讀寫內存地址需要在jvm對上進行分配,所以在初始化時要根據需要的大小創建一個Byte類型的數組Byte[]對UnpooledDirectByteBuf進行初始化,接下來對這個Buf的讀寫都會反應在這個位元組數組中。

6)對於UnpooledDirectByteBuf類型的ByteBuf,Direct意味著這個ByteBuf的讀寫實現方法是以直接內存為基礎進行實現的,其讀寫區域是在直接內存上,在初始化時構造一個DirectByteBuffer對象(nio中的ByteBuffer)賦值給UnpooledDirectByteBuf的buf屬性,接下來對這個UnpooledDirectByteBuf類型對象的讀寫都會反應到這個ByteBuffer上。

7)對於pool類型的,

❺ netty3.6.2中寫數據的過程,以及寫數據寫不出去後怎麼處理

netty寫數據的時候,會先放到一個緩存隊列AbstractNioChannel.writeBufferQueue中,這個隊列是WriteRequestQueue
Java代碼
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//寫到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
WriteRequestQueue的offer方法中會根據緩存消息的總大小(位元組數)判斷是否超過了高水位線highWaterMark,如果第一次超過了超過高水位線,就會fireChannelInterestChanged;後邊如果仍然一直往隊列放數據,緩存的消息的大小持續超過高水位線的時候,不會再fireChannelInterestChanged。
Java代碼
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;

int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();

if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
fireChannelInterestChanged這個會調到SimpleChannelUpstreamHandler.handleUpstream,觸發SimpleChannelUpstreamHandler.channelInterestChanged,可以通過繼承這個方法來自定義做些事情。高水位的值可以通過Bootstrap設置,最終會調到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默認值為64K
Java代碼
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if ("".equals(key)) {
(() value);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
然後在write0的時候會從隊列拉數據,拉數據的時候,如果發現本次拉的數據會導致緩存的數據大小(位元組)從低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨過了低水位,會再次觸發fireChannelInterestChanged事件。writeBufferLowWaterMark默認值為32K
Java代碼
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();


if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的緩存數據大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
超過高水位和低於低水位都會觸發fireChannelInterestChanged,怎麼區分呢?通過AbstractChannel. isWritable(),如果channel的interestOps里邊有注冊過OP_WRITE,則是不可寫的,否則是可寫的
Java代碼
public boolean isWritable() {
return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}

int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {//還記得這個值,放數據到發送隊列的時候值+=1,從隊列拉數據出來的時候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {//緩存隊列數據量,超過高水位,也超過了低水位,意味著高水位>低水位,此時等於注冊寫操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;//緩存隊列數據量,超過高水位但是低於低水位,意味著低水位>高水位,此時等於沒有注冊寫操作
}
} else {//超過高水位counter<=0,意味著當前數據量小於高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {//這里,緩存數據量仍然高於高水位.....並發?按道理說channel的處理是單線程處理的,此時等於注冊寫操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;//寫隊列沒數據,沒有注冊寫操作
}

return interestOps;
}
即,如果超過高水位isWritable()==false,低於低水位isWritable()==true,低水位優先順序高於高水位,即如果 當前水位>低水位 則不可寫,否則可寫

如果在通過netty向某機器寫數據,但是寫很緩慢,則會導致數據都緩存到netty的發送隊列中,如果不做控制,可能會導致full gc/cms gc頻繁,甚至最終OOM。所以可以考慮用高水位和低水位的值來控制netty的緩存隊列,即用AbstractChannel.isWritable來控制是否繼續寫,如果AbstractChannel.isWritable==false,則丟棄數據,或者記錄發送數據的狀態,待後續緩存數據隊列水位下降到安全水位後再發送。

❻ Netty里的pipeline

pipeline是netty里最重要的幾個組件之一,我們從四個部分講訴pipeline是怎麼運作的

1、pipeline增加最開始先加了synchronized防止並發

2、然後檢查這個handler是否已經被添加了,如果沒有被添加,就添加

如果這個handler實例不是sharable的且已經被添加,就會報錯,handler被加上Sharable,就是共享的handler,handler的實例就可以服用了

isSharable方法會使用ThreadLocal緩存已經判斷是否共享的handler

添加節點到雙向鏈表

3、創建一個新的context,如果沒有傳入名稱,會生成一個

這里會創建一個DefaultChannelHandlerContext,創建的時候涉及到這個上下文的命名,如果創建的時候沒有傳入名稱,會自動生成一個

這里會先從緩存裡面獲取是否這個handler已經緩存這個handler的名稱,如果沒有,就會用這個handler的類名稱生成一個,放到緩存里

如果這個名稱已經在這個pipeline里的某一個節點使用了,那就會把最後一位的數字加1,繼續判斷有沒有,直到生成的名稱還沒有被使用

4、執行節點被添加的handlerAdded,這里執行有三種方式:1)如果節點還沒有被注冊到eventLoop,則創建一個任務後面觸發,2)如果當前線程不是executor線程,則在executor線程觸發handlerAdded,3)如果當前線程是executor線程,則立即觸發handlerAdded

第一種方式,會在handlerRegistered時候觸發創建的任務

第二種方式,在executor里執行

第三種方式,直接執行

我們看pipeline的構造函數,會發現創建pipeline的時候會創建一個頭部節點一個尾部節點,一個新建的pipeline都是會有這兩個節點的,那這兩個節點是用來幹嘛的呢,讓我們分析一下

channel讀到數據之後,會調用fireChannelRead讓各個節點處理,這里第一個節點是headContext,netty的pipeline執行讀取數據的流程是按添加的順序。比如先後分別添加了ABC三個節點,那麼讀取數據流轉的流程就是headContext->A->B->C->tailContext,如果channelRead的實現是ctx.fireChannelRead,那麼就會按照這樣的流程流轉

寫數據的時候,ctx.channel().writeAndFlush和ctx.writeAndFlush會有不同的實現

ctx.channel().writeAndFlush調用的是io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object),會調用到pipeline的writeAndFlush,然後調用到tail的writeAndFlush,此時調用的順序是tailContext->C->B->A->headContext

ctx.writeAndFlush會調用io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)

也就是當前上下文開始執行,如果當前執行到B節點,那麼write執行的流程就是B->A->headContext

❼ Netty怎樣增大緩沖區

有現成的方法,你說的那種限制還根本沒涉及到粘包解碼的問題,而是netty底層限制了接收位元組緩存區大小為1024,下面這樣就行了
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65536))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(10, 10, 0));
ch.pipeline().addLast(new HeartBeat());
ch.pipeline().addLast(new ByteStreamDecoder());
ch.pipeline().addLast(new ByteStreamEncoder());
ch.pipeline().addLast(new ServerStreamHandler());
}
});