拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 Netty原始码分析之ByteBuf参考计数

Netty原始码分析之ByteBuf参考计数

白鹭 - 2022-02-10 1960 0 0

参考计数是一种常用的存储器管理机制,是指将资源的被参考次数保存起来,当被参考次数变为零时就将其释放的程序,Netty在4.x版本开始使用参考计数机制进行部分物件的管理,其实作思路并不是特别复杂,它主要涉及跟踪某个物件被参考的次数,在Netty具体代码中需要通过参考计数进行存储器管理的物件,会基于ReferenceCounted界面实作,其中参考计数大于0时则代表该物件被参考不会释放,当参考计数减少到0时,该物件就会被释放,通过参考计数机制,Netty可以很好的实作存储器管理,参考计数减少到0时要么直接释放存储器,要么放回存储器池中重复利用,

1、基本示例

下面先通过一个简单示例看下Netty中参考计数机制的使用

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        
           ByteBuf recvBuffer = (ByteBuf) msg;// 申请ByteBuf 需要主动释放
        if(recvBuffer.isDirect()){
            System.err.println(true);
        }
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(true);
        ByteBuf sendBuffer = allocator.buffer();//申请池化直接存储器
        System.err.println("sendBuffer的参考计数:"+sendBuffer.refCnt());
        sendBuffer.retain();
        System.err.println("sendBuffer的参考计数:"+sendBuffer.refCnt());
        sendBuffer.release();
        System.err.println("sendBuffer的参考计数:"+sendBuffer.refCnt());
try { byte[] bytesReady = new byte[recvBuffer.readableBytes()]; recvBuffer.readBytes(bytesReady); System.out.println("channelRead收到资料:"+ BytesUtils.toHexString(bytesReady)); byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e}; sendBuffer.writeBytes(sendBytes); ctx.writeAndFlush(sendBuffer); System.err.println("sendBuffer的参考计数:"+sendBuffer.refCnt()); }catch (Exception e) { // TODO: handle exception System.err.println(e.getMessage()); }finally { System.err.println("recvBuffer的参考计数:"+recvBuffer.refCnt()); recvBuffer.release(); //此处需要释放 System.err.println("recvBuffer的参考计数:"+recvBuffer.refCnt()); } }

输出结果如下,通过示例可以看出retain方法会增加计数参考,release方法会减少计数参考

true
sendBuffer的参考计数:1
sendBuffer的参考计数:2
sendBuffer的参考计数:1
sendBuffer的参考计数:0
recvBuffer的参考计数:1
recvBuffer的参考计数:0

AbstractReferenceCountedByteBuf实作了对ByteBuf的存储器管理,以实作存储器的回收、释放或者重复利用 ,AbstractReferenceCountedByteBuf的继承实作关系如下图所示

2、ReferenceCounted界面定义

首先是ReferenceCounted界面的定义

public interface ReferenceCounted {
    /**
     * Returns the reference count of this object.  If {@code 0}, it means this object has been deallocated.
     * 回传物件的参考计数
     */
    int refCnt();

    /**
     * Increases the reference count by {@code 1}.
     * 增加参考计数
     */
    ReferenceCounted retain();

    /**
     * Increases the reference count by the specified {@code increment}.
     * 参考计数增加指定值
     */
    ReferenceCounted retain(int increment);

    /**
     * Records the current access location of this object for debugging purposes.
     * If this object is determined to be leaked, the information recorded by this operation will be provided to you
     * via {@link ResourceLeakDetector}.  This method is a shortcut to {@link #touch(Object) touch(null)}.
     * 记录该物件的当前访问位置,用于除错,
     * 如果确定该物件被泄露,将提供此操作记录的信息给您
     */
    ReferenceCounted touch();

    /**
     * Records the current access location of this object with an additional arbitrary information for debugging
     * purposes.  If this object is determined to be leaked, the information recorded by this operation will be
     * provided to you via {@link ResourceLeakDetector}.
     * 记录该物件的当前访问位置,附加信息用于除错,
     * 如果确定该物件被泄露,将提供此操作记录的信息给您
     */
    ReferenceCounted touch(Object hint);

    /**
     * Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
     * {@code 0}.
     *
     * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
     * 参考计数减少,如果计数变为了0,则释放物件资源
     * 如果物件资源被释放,则回传true,否则回传false
     */
    boolean release();

    /**
     * Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
     * count reaches at {@code 0}.
     *
     * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
     * 参考计数-指定值,如果计数变为了0,则释放物件资源或交回到物件池
     * 如果物件资源被释放,则回传true,否则回传false
     */
    boolean release(int decrement);
}

 

3、AbstractReferenceCountedByteBuf原始码分析

AbstractReferenceCountedByteBuf对ReferenceCounted进行了具体实作,retain与release两个方法通过CAS方式对参考计数refcnt进行操作,下面对其原始码进行简单分析

初始化

参考计数初始值refCnt 使用关键字volatile修饰,保证执行绪的可见性,同时使用偶数,参考增加通过位移操作实作,提高运算效率,

采用 AtomicIntegerFieldUpdater 物件,通过CAS方式更新refCnt,以实作执行绪安全,避免加锁,提高效率,

    private static final long REFCNT_FIELD_OFFSET;
    //采用 AtomicIntegerFieldUpdater 物件,CAS方式更新refCnt
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    //refCnt 实际值为偶数,采用位移操作提高效率
    // even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
    @SuppressWarnings("unused")
    private volatile int refCnt = 2;

retain操作

上面示例中每呼叫一次retain方法,参考计数就会累加一次,我们看下原始码中retain的具体实作

    public ByteBuf retain() {
        return retain0(1);
    }

    @Override
    public ByteBuf retain(int increment) {
        return retain0(checkPositive(increment, "increment"));
    }

    //计数器增值操作
    private ByteBuf retain0(final int increment) {
        // all changes to the raw count are 2x the "real" change
        int adjustedIncrement = increment << 1; // overflow OK here  真正的计数都是2倍递增
        int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement); //通过CAS方式递增并获取原值
        if ((oldRef & 1) != 0) {//判断奇偶,正常情况这里应该都是偶数
            throw new IllegalReferenceCountException(0, increment);
        }
        // don't pass 0!  如果计数小于等于0,以及整型范围越界(0x7fffffff+1)抛出例外
        if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
                || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
            // overflow case
            refCntUpdater.getAndAdd(this, -adjustedIncrement);
            throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
        }
        return this;
    }

release操作

通过呼叫release方法,对参考计数做减值操作,原始码中release的具体实作要注意的是由于参考计数以2倍递增,所以参考次数= 参考计数/2,当decrement=refcnt/2 也就是参考次数=释放次数时,代表ByteBuf不再被参考,执行存储器释放或放回存储器池的操作,

    //计数器减值操作
    private boolean release0(int decrement) {
        int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement); //对计数器进行除以2操作,也就是参考次数
        /**
         * /这里如注意 你传入的减值自变量decrement  = realCnt 时 等同于 参考次数=释放次数,直接进行释放操作
         */
        if (decrement == realCnt) {
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) { //CAS方式置为1
                deallocate();//存储器释放或放回存储器池
                return true;
            }
            return retryRelease0(decrement);//进入具体操作
        }
        return releaseNonFinal0(decrement, rawCnt, realCnt);
    }

    private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
        //如果decrement 小于 realCnt,通过CAS方式减去decrement*2
        if (decrement < realCnt
                // all changes to the raw count are 2x the "real" change
                && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
            return false;
        }
        return retryRelease0(decrement);
    }

    private boolean retryRelease0(int decrement) {
        for (;;) {
            int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
            if (decrement == realCnt) {
                if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                    deallocate();
                    return true;
                }
            } else if (decrement < realCnt) {//如果decrement 小于 realCnt,通过CAS方式减去decrement*2
                // all changes to the raw count are 2x the "real" change
                if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                    return false;
                }
            } else {
                throw new IllegalReferenceCountException(realCnt, -decrement);
            }
            Thread.yield(); // this benefits throughput under high contention
        }
    }

    /**
     * Like {@link #realRefCnt(int)} but throws if refCnt == 0
     */
    private static int toLiveRealCnt(int rawCnt, int decrement) {
        if ((rawCnt & 1) == 0) {
            return rawCnt >>> 1;
        }
        // odd rawCnt => already deallocated
        throw new IllegalReferenceCountException(0, -decrement);
    }

4、总结 

以上我们围绕AbstractReferenceCountedByteBuf对Netty参考计数的具体实作进行了分析,可以看到Netty在实作参考计数的同时,结合CAS、位移计算等方式,保证了运算效率和执行绪安全,在实际项目中我们遇到类似应用场景也都可以借鉴参考,如资料发送次数,商品剩余数量等计数场景的实作,希望本文对大家能有所帮助,其中如有不足与不正确的地方还望指正与海涵,十分感谢,

 

关注微信公众号,查看更多技术文章,

 

 

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *