亚洲精品久久久久久久久久久,亚洲国产精品一区二区制服,亚洲精品午夜精品,国产成人精品综合在线观看,最近2019中文字幕一页二页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

緩存之美:萬文詳解 Caffeine 實(shí)現(xiàn)原理(上)

京東云 ? 來源:jf_75140285 ? 作者:jf_75140285 ? 2025-08-05 14:49 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

文章將采用“總-分-總”的結(jié)構(gòu)對(duì)配置固定大小元素驅(qū)逐策略的 Caffeine 緩存進(jìn)行介紹,首先會(huì)講解它的實(shí)現(xiàn)原理,在大家對(duì)它有一個(gè)概念之后再深入具體源碼的細(xì)節(jié)之中,理解它的設(shè)計(jì)理念,從中能學(xué)習(xí)到用于統(tǒng)計(jì)元素訪問頻率的 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu)、理解內(nèi)存屏障和如何避免緩存?zhèn)喂蚕韱栴}、MPSC 多線程設(shè)計(jì)模式、高性能緩存的設(shè)計(jì)思想和多線程間的協(xié)調(diào)方案等等,文章最后會(huì)對(duì)全文內(nèi)容進(jìn)行總結(jié),希望大家能有所收獲的同時(shí)在未來對(duì)本地緩存選型時(shí)提供完整的理論依據(jù)。

Caffeine 緩存原理圖如下:

wKgZPGiRqV6ACzEGABFA9ACvlZA860.png

它使用 ConcurrentHashMap 保存數(shù)據(jù),并在該數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ)上創(chuàng)建了窗口區(qū)、試用區(qū)和保護(hù)區(qū),用于管理元素的生命周期,各個(gè)區(qū)的數(shù)據(jù)結(jié)構(gòu)是使用了 LRU 算法的雙端隊(duì)列,隨著緩存的命中率變化,窗口區(qū)和保護(hù)區(qū)大小會(huì)自動(dòng)調(diào)節(jié)以適應(yīng)當(dāng)前訪問模式。在對(duì)元素進(jìn)行驅(qū)逐時(shí),使用了 TinyLFU 算法,會(huì)優(yōu)先將頻率低的元素驅(qū)逐,訪問頻率使用 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu)記錄,它能在保證較高準(zhǔn)確率(93.75%)的情況下占用較少內(nèi)存空間。讀、寫操作分別會(huì)向 ReadBuffer 和 WriteBuffer 中添加“讀/寫后任務(wù)”,這兩個(gè)緩沖區(qū)的設(shè)計(jì)均采用了 MPSC 多生產(chǎn)者單消費(fèi)者的多線程設(shè)計(jì)模式。緩沖區(qū)中任務(wù)的消費(fèi)由維護(hù)方法 maintenance 中 drainReadBuffer 和 drainWriteBuffer 實(shí)現(xiàn),維護(hù)方法通過添加同步鎖,保證任務(wù)只由單線程執(zhí)行,這種設(shè)計(jì)參考了 WAL(Write-Ahead Logging)思想,即:先寫日志,再執(zhí)行操作,先把操作記錄在緩沖區(qū),然后在合適的時(shí)機(jī)異步、批量地執(zhí)行緩沖區(qū)中的任務(wù)。維護(hù)方法除了這些作用外,還負(fù)責(zé)元素在各個(gè)分區(qū)的移動(dòng)、頻率的更新、元素的驅(qū)逐等操作。

接下來的源碼分析以如下測試用例為例:先分析構(gòu)造方法,了解緩存初始化過程中創(chuàng)建的重要數(shù)據(jù)結(jié)構(gòu)和關(guān)鍵字段,然后再深入添加元素的方法(put),該方法相對(duì)復(fù)雜,也是 Caffeine 緩存的核心,理解了這部分內(nèi)容,文章剩余的內(nèi)容理解起來會(huì)非常容易,接著分析獲取元素的方法(getIfPresent),最后再回到核心的維護(hù)方法 maintenance 中,這樣便基本理解了 Caffeine 緩存的運(yùn)行原理,需要注意的是,因?yàn)槲覀儾⑽粗付ň彺嬖氐倪^期時(shí)間,所以與此相關(guān)的內(nèi)容如時(shí)間過期策略和時(shí)間輪等內(nèi)容不會(huì)專門介紹。

public class TestReadSourceCode {

    @Test
    public void doRead() {
        // read constructor
        Cache cache = Caffeine.newBuilder()
                .maximumSize(10_000)
                .build();

        // read put
        cache.put("key", "value");

        // read get
        cache.getIfPresent("key");
    }

}

constructor

Caffeine 的實(shí)現(xiàn)類區(qū)分了 BoundedLocalManualCache 和 UnboundedLocalManualCache,見名知意它們分別為“有邊界”的和“無邊界”的緩存。Caffeine#isBounded 方法詮釋了“邊界”的含義:

public final class Caffeine {

    static final int UNSET_INT = -1;

    public  Cache build() {
        // 校驗(yàn)參數(shù)
        requireWeightWithWeigher();
        requireNonLoadingCache();

        @SuppressWarnings("unchecked")
        Caffeine self = (Caffeine) this;
        return isBounded()
                ? new BoundedLocalCache.BoundedLocalManualCache(self)
                : new UnboundedLocalCache.UnboundedLocalManualCache(self);
    }

    boolean isBounded() {
        // 指定了最大大小;指定了最大權(quán)重
        return (maximumSize != UNSET_INT) || (maximumWeight != UNSET_INT)
                // 指定了訪問后過期策略;指定了寫后過期策略
                || (expireAfterAccessNanos != UNSET_INT) || (expireAfterWriteNanos != UNSET_INT)
                // 指定了自定義過期策略;指定了 key 或 value 的引用級(jí)別
                || (expiry != null) || (keyStrength != null) || (valueStrength != null);
    }
}

也就是說,當(dāng)為緩存指定了上述的驅(qū)逐或過期策略會(huì)定義為有邊界的 BoundedLocalManualCache 緩存,它會(huì)限制緩存的大小,防止內(nèi)存溢出,否則為無邊界的 UnboundedLocalManualCache 類型,它沒有大小限制,直到內(nèi)存耗盡。我們以創(chuàng)建配置了固定大小的緩存為例,它對(duì)應(yīng)的類型便是 BoundedLocalManualCache,在執(zhí)行構(gòu)造方法時(shí),有以下邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
        implements LocalCache {
    // ...

    static class BoundedLocalManualCache implements LocalManualCache, Serializable {
        private static final long serialVersionUID = 1;

        final BoundedLocalCache cache;

        BoundedLocalManualCache(Caffeine builder) {
            this(builder, null);
        }

        BoundedLocalManualCache(Caffeine builder, @Nullable CacheLoader loader) {
            cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
        }
    }
}

BoundedLocalCache 為抽象類,緩存對(duì)象的實(shí)際類型都是它的子類。它在創(chuàng)建時(shí)使用了反射并遵循簡單工廠的編碼風(fēng)格:

interface LocalCacheFactory {
    static  BoundedLocalCache newBoundedLocalCache(Caffeine builder,
                                                               @Nullable AsyncCacheLoader cacheLoader, boolean async) {
        var className = getClassName(builder);
        var factory = loadFactory(className);
        try {
            return factory.newInstance(builder, cacheLoader, async);
        } catch (RuntimeException | Error e) {
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException(className, t);
        }
    }
}

getClassName 方法非常有意思,它會(huì)根據(jù)緩存配置的屬性動(dòng)態(tài)拼接出實(shí)際緩存類名:

interface LocalCacheFactory {

    static String getClassName(Caffeine builder) {
        var className = new StringBuilder();
        // key 是強(qiáng)引用或弱引用
        if (builder.isStrongKeys()) {
            className.append('S');
        } else {
            className.append('W');
        }
        // value 是強(qiáng)引用或弱引用
        if (builder.isStrongValues()) {
            className.append('S');
        } else {
            className.append('I');
        }
        // 配置了移除監(jiān)聽器
        if (builder.removalListener != null) {
            className.append('L');
        }
        // 配置了統(tǒng)計(jì)功能
        if (builder.isRecordingStats()) {
            className.append('S');
        }
        // 不同的驅(qū)逐策略
        if (builder.evicts()) {
            // 基于最大值限制,可能是最大權(quán)重W,也可能是最大容量S
            className.append('M');
            // 基于權(quán)重或非權(quán)重
            if (builder.isWeighted()) {
                className.append('W');
            } else {
                className.append('S');
            }
        }
        // 配置了訪問過期或可變過期策略
        if (builder.expiresAfterAccess() || builder.expiresVariable()) {
            className.append('A');
        }
        // 配置了寫入過期策略
        if (builder.expiresAfterWrite()) {
            className.append('W');
        }
        // 配置了刷新策略
        if (builder.refreshAfterWrite()) {
            className.append('R');
        }
        return className.toString();
    }
}

這也就是為什么能在 com.github.benmanes.caffeine.cache 包路徑下能發(fā)現(xiàn)很多類似 SSMS 只有簡稱命名的類的原因(下圖只截取部分,實(shí)際上有很多):

wKgZO2iRqV6AOqckAAAUJgkJ52Y634.png

根據(jù)代碼邏輯,它的命名遵循如下格式 S|W S|I [L] [S] [MW|MS] [A] [W] [R] 其中 [] 表示選填,| 表示某配置不同選擇的分隔符,結(jié)合注釋能清楚的了解各個(gè)位置字母簡稱表達(dá)的含義。如此定義實(shí)現(xiàn)類使用了 多級(jí)繼承,盡可能多地復(fù)用代碼。

以我們測試用例中創(chuàng)建的緩存類型為例,它對(duì)應(yīng)的實(shí)現(xiàn)類為 SSMS,表示 key 和 value 均為強(qiáng)引用,并配置了非權(quán)重的最大緩存大小限制,類圖關(guān)系如下:

wKgZO2iRqWCAc5TlAAUDX4Iv_CE796.png

雖然在一些軟件設(shè)計(jì)相關(guān)的書籍中強(qiáng)調(diào)“多用組合,少用繼承”,但是這里使用多級(jí)繼承我覺得并沒有增加開發(fā)者的理解難度,反而了解了它的命名規(guī)則后,能更清晰的理解各個(gè)緩存所表示的含義,更好地實(shí)現(xiàn)代碼復(fù)用。

執(zhí)行 SSMS 的構(gòu)造方法會(huì)有以下邏輯:

// 1
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
        implements LocalCache {

    static final int WRITE_BUFFER_MIN = 4;
    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;

    static final double PERCENT_MAIN = 0.99d;
    static final double PERCENT_MAIN_PROTECTED = 0.80d;

    static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;

    final @Nullable RemovalListener evictionListener;
    final @Nullable AsyncCacheLoader cacheLoader;

    final MpscGrowableArrayQueue writeBuffer;
    final ConcurrentHashMap> data;
    final PerformCleanupTask drainBuffersTask;
    final Consumer> accessPolicy;
    final Buffer> readBuffer;
    final NodeFactory nodeFactory;
    final ReentrantLock evictionLock;
    final Weigher weigher;
    final Executor executor;

    final boolean isAsync;
    final boolean isWeighted;

    protected BoundedLocalCache(Caffeine builder,
                                @Nullable AsyncCacheLoader cacheLoader, boolean isAsync) {
        // 標(biāo)記同步或異步
        this.isAsync = isAsync;
        // 指定 cacheLoader 
        this.cacheLoader = cacheLoader;
        // 指定用于執(zhí)行驅(qū)逐元素、刷新緩存等任務(wù)的線程池,不指定默認(rèn)為 ForkJoinPool.commonPool()
        executor = builder.getExecutor();
        // 標(biāo)記是否定義了節(jié)點(diǎn)計(jì)算權(quán)重的 Weigher 對(duì)象
        isWeighted = builder.isWeighted();
        // 同步鎖,在接下來的內(nèi)容中會(huì)看到很多標(biāo)記了 @GuardedBy("evictionLock") 注解的方法,表示這行這些方法時(shí)都會(huì)獲取這把同步鎖
        // 根據(jù)該鎖的命名,eviction 表示驅(qū)逐的意思,也就是說關(guān)注驅(qū)逐策略執(zhí)行的方法都要獲取該鎖,這一點(diǎn)需要在后文中注意
        evictionLock = new ReentrantLock();
        // 計(jì)算元素權(quán)重的對(duì)象,不指定為 SingletonWeigher.INSTANCE
        weigher = builder.getWeigher(isAsync);
        // 執(zhí)行緩存 maintenance 方法的任務(wù),在后文中具體介紹
        drainBuffersTask = new PerformCleanupTask(this);
        // 創(chuàng)建節(jié)點(diǎn)的工廠
        nodeFactory = NodeFactory.newFactory(builder, isAsync);
        // 驅(qū)逐監(jiān)聽器,有元素被驅(qū)逐時(shí)會(huì)回調(diào)
        evictionListener = builder.getEvictionListener(isAsync);
        // 用于保存所有數(shù)據(jù)的 ConcurrentHashMap
        data = new ConcurrentHashMap(builder.getInitialCapacity());
        // 如果指定驅(qū)逐策略 或 key為弱引用 或 value為弱引用或軟引用 或 訪問后過期則創(chuàng)建 readBuffer,否則它為不可用狀態(tài)
        // readBuffer 用于記錄某些被訪問過的節(jié)點(diǎn)
        readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
                ? new BoundedBuffer() : Buffer.disabled();
        // 如果指定了驅(qū)逐策略 或 訪問后過期策略則會(huì)定義訪問策略,執(zhí)行 onAccess 方法,后文詳細(xì)介紹
        accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
        // 初始化最大值和最小值的雙端隊(duì)列作為 writeBuffer,用于記錄一些寫后操作任務(wù) 
        writeBuffer = new MpscGrowableArrayQueue(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

        // 執(zhí)行了驅(qū)逐策略則更新最大容量限制
        if (evicts()) {
            setMaximumSize(builder.getMaximum());
        }
    }

    @GuardedBy("evictionLock")
    void setMaximumSize(long maximum) {
        requireArgument(maximum >= 0, "maximum must not be negative");
        if (maximum == maximum()) {
            return;
        }

        // 不能超過最大容量
        long max = Math.min(maximum, MAXIMUM_CAPACITY);
        // 計(jì)算窗口區(qū)大小
        long window = max - (long) (PERCENT_MAIN * max);
        // 計(jì)算保護(hù)區(qū)大小
        long mainProtected = (long) (PERCENT_MAIN_PROTECTED * (max - window));

        // 記錄這些值
        setMaximum(max);
        setWindowMaximum(window);
        setMainProtectedMaximum(mainProtected);

        // 標(biāo)記命中量、非命中量并初始化步長值,這三個(gè)值用于后續(xù)動(dòng)態(tài)調(diào)整保護(hù)區(qū)和窗口區(qū)大小
        setHitsInSample(0);
        setMissesInSample(0);
        setStepSize(-HILL_CLIMBER_STEP_PERCENT * max);

        // 直到當(dāng)前緩存的權(quán)重(大?。┙咏畲笾狄话霑r(shí)才初始化頻率草圖
        if ((frequencySketch() != null) && !isWeighted() && (weightedSize() >= (max >>> 1))) {
            frequencySketch().ensureCapacity(max);
        }
    }
}

// 2
class SS extends BoundedLocalCache {
    static final LocalCacheFactory FACTORY = SS::new;

    // key value 強(qiáng)引用無需特殊操作
    SS(Caffeine var1, @Nullable AsyncCacheLoader var2, boolean var3) {
        super(var1, var2, var3);
    }
}

// 3
class SSMS extends SS {

    // 頻率草圖,后文具體介紹
    final FrequencySketch sketch = new FrequencySketch();

    final AccessOrderDeque> accessOrderWindowDeque;
    final AccessOrderDeque> accessOrderProbationDeque;
    final AccessOrderDeque> accessOrderProtectedDeque;

    SSMS(Caffeine var1, @Nullable AsyncCacheLoader var2, boolean var3) {
        super(var1, var2, var3);
        // 如果 Caffeine 初始化了容量則確定頻率草圖的容量
        if (var1.hasInitialCapacity()) {
            long var4 = Math.min(var1.getMaximum(), (long) var1.getInitialCapacity());
            this.sketch.ensureCapacity(var4);
        }

        // 初始化窗口區(qū)、試用區(qū)和保護(hù)區(qū),它們都是雙端隊(duì)列(鏈表實(shí)現(xiàn))
        this.accessOrderWindowDeque = !var1.evicts() && !var1.expiresAfterAccess() ? null : new AccessOrderDeque();
        this.accessOrderProbationDeque = new AccessOrderDeque();
        this.accessOrderProtectedDeque = new AccessOrderDeque();
    }
}

在步驟 1 中定義了三個(gè)區(qū)的初始化大小為 1%|19%|80%,這樣配置的性能相對(duì)較好。此外,我們還需要解釋一下 weightedSize() 方法,它用于訪問 long weightedSize 變量。根據(jù)其命名有“權(quán)重大小”的含義,在默認(rèn)不指定權(quán)重計(jì)算對(duì)象 Weigher 的情況下,Weigher 默認(rèn)為 SingletonWeigher.INSTANCE 表示每個(gè)元素的權(quán)重大小為 1,如下:

enum SingletonWeigher implements Weigher {
    INSTANCE;

    @Override
    public int weigh(Object key, Object value) {
        return 1;
    }
}

這樣 weightedSize 表示的便是當(dāng)前緩存中元素?cái)?shù)量。如果自定義了 Weigher 那么 weightedSize 表示的便是緩存中總權(quán)重大小,每個(gè)元素的權(quán)重則可能會(huì)不同。因?yàn)樵谑纠形覀儾]有指定 Weigher,所以在此處可以將 weightedSize 理解為當(dāng)前緩存大小。

上文中我們提到緩存的定義遵循大寫字母縮寫的命名規(guī)則,實(shí)際上節(jié)點(diǎn)類的定義也采用了這種方式,在創(chuàng)建節(jié)點(diǎn)工廠 NodeFactory.newFactory(builder, isAsync)
的邏輯中,它會(huì)執(zhí)行如下邏輯,根據(jù)緩存的類型來確定它的節(jié)點(diǎn)類型,命名遵循 P|F S|W|D A|AW|W| [R] [MW|MS] 的規(guī)則,同樣使用了反射機(jī)制和簡單工廠的編碼風(fēng)格,如下:

interface NodeFactory {
    // ...

    static  NodeFactory newFactory(Caffeine builder, boolean isAsync) {
        if (builder.interner) {
            return (NodeFactory) Interned.FACTORY;
        }
        var className = getClassName(builder, isAsync);
        return loadFactory(className);
    }

    static String getClassName(Caffeine builder, boolean isAsync) {
        var className = new StringBuilder();
        // key 強(qiáng)引用或弱引用
        if (builder.isStrongKeys()) {
            className.append('P');
        } else {
            className.append('F');
        }
        // value 強(qiáng)引用或弱引用或軟引用
        if (builder.isStrongValues()) {
            className.append('S');
        } else if (builder.isWeakValues()) {
            className.append('W');
        } else {
            className.append('D');
        }
        // 過期策略
        if (builder.expiresVariable()) {
            if (builder.refreshAfterWrite()) {
                // 訪問后過期
                className.append('A');
                if (builder.evicts()) {
                    // 寫入后過期
                    className.append('W');
                }
            } else {
                className.append('W');
            }
        } else {
            // 訪問后過期
            if (builder.expiresAfterAccess()) {
                className.append('A');
            }
            // 寫入后過期
            if (builder.expiresAfterWrite()) {
                className.append('W');
            }
        }
        // 寫入后刷新
        if (builder.refreshAfterWrite()) {
            className.append('R');
        }
        // 驅(qū)逐策略
        if (builder.evicts()) {
            // 默認(rèn)最大大小限制
            className.append('M');
            // 加權(quán)
            if (isAsync || (builder.isWeighted() && (builder.weigher != Weigher.singletonWeigher()))) {
                className.append('W');
            } else {
                // 非加權(quán)
                className.append('S');
            }
        }
        return className.toString();
    }

}

SSMS 類型緩存對(duì)應(yīng)的節(jié)點(diǎn)類型為 PSMS。

FrequencySketch

接下來,我們需要具體介紹下 FrequencySketch,它在上述構(gòu)造方法的步驟 3 中被創(chuàng)建。這個(gè)類的實(shí)現(xiàn)采用了 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu),它維護(hù)了一個(gè) long[] table 一維數(shù)組,每個(gè)元素有 64 位,每 4 位作為一個(gè)計(jì)數(shù)器(這也就限定了最大頻率為 15),那么數(shù)組中每個(gè)槽位便是 16 個(gè)計(jì)數(shù)器。通過哈希函數(shù)取 4 個(gè)獨(dú)立的計(jì)數(shù)值,將其中的最小值作為元素的訪問頻率。table 的初始大小為緩存最大容量最接近的 2 的 n 次冪,并在計(jì)算哈希值時(shí)使用 blockMask 掩碼來使哈希結(jié)果均勻分布,保證了獲取元素訪問頻率的正確率為 93.75%,達(dá)到空間與時(shí)間的平衡。它的實(shí)現(xiàn)原理和布隆過濾器類似,犧牲了部分準(zhǔn)確性,但減少了占用內(nèi)存的大小。如下圖所示為計(jì)算元素 e 的訪問頻率:

wKgZPGiRqWGANlxcAAqrCLf4x8I995.png

以下為 FrequencySketch 的源碼,關(guān)注注釋即可,并不復(fù)雜:

final class FrequencySketch {

    static final long RESET_MASK = 0x7777777777777777L;
    static final long ONE_MASK = 0x1111111111111111L;

    // 采樣大小,用于控制 reset
    int sampleSize;
    // 掩碼,用于均勻分散哈希結(jié)果
    int blockMask;
    long[] table;
    int size;

    public FrequencySketch() {
    }

    public void ensureCapacity(@NonNegative long maximumSize) {
        requireArgument(maximumSize >= 0);
        // 取緩存最大容量和 Integer.MAX_VALUE >>> 1 中的小值 
        int maximum = (int) Math.min(maximumSize, Integer.MAX_VALUE >>> 1);
        // 如果已經(jīng)被初始化過并且 table 長度大于等于最大容量,那么不進(jìn)行操作
        if ((table != null) && (table.length >= maximum)) {
            return;
        }

        // 初始化 table,長度為最接近 maximum 的 2的n次冪 和 8 中的大值
        table = new long[Math.max(Caffeine.ceilingPowerOfTwo(maximum), 8)];
        // 計(jì)算采樣大小
        sampleSize = (maximumSize == 0) ? 10 : (10 * maximum);
        // 計(jì)算掩碼
        blockMask = (table.length >>> 3) - 1;
        // 特殊判斷
        if (sampleSize <= 0) {
            sampleSize = Integer.MAX_VALUE;
        }
        // 計(jì)數(shù)器總數(shù)
        size = 0;
    }

    @NonNegative
    public int frequency(E e) {
        // 如果緩存沒有被初始化則返回頻率為 0
        if (isNotInitialized()) {
            return 0;
        }

        // 創(chuàng)建 4 個(gè)元素的數(shù)組 count 用于保存 4 次 hash 計(jì)算出的頻率值
        int[] count = new int[4];
        // hash 擾動(dòng),使結(jié)果均勻分布
        int blockHash = spread(e.hashCode());
        // 重 hash,進(jìn)一步分散結(jié)果
        int counterHash = rehash(blockHash);
        // 根據(jù)掩碼計(jì)算對(duì)應(yīng)的塊索引
        int block = (blockHash & blockMask) >> (i >> 1) & 15;
            // 計(jì)算計(jì)數(shù)器的偏移量
            int offset = h & 1;
            // 定位到 table 中某個(gè)槽位后右移并進(jìn)行位與運(yùn)算得到最低的 4 位的值(0xfL 為二進(jìn)制的 1111)
            count[i] = (int) ((table[block + offset + (i >> (index >> (i >> 1) & 15;
            int offset = h & 1;
            // i + 4 記錄元素所在 table 中的索引
            index[i + 4] = block + offset + (i >> 1) & RESET_MASK;
        }
        // count >>> 2 表示計(jì)數(shù)器個(gè)數(shù),計(jì)算重置后的 size
        size = (size - (count >>> 2)) >>> 1;
    }

    static int spread(int x) {
        x ^= x >>> 17;
        x *= 0xed5ad4bb;
        x ^= x >>> 11;
        x *= 0xac4c1b51;
        x ^= x >>> 15;
        return x;
    }

    static int rehash(int x) {
        x *= 0x31848bab;
        x ^= x >>> 14;
        return x;
    }

}

到這里,Caffeine 緩存的基本數(shù)據(jù)結(jié)構(gòu)全貌已經(jīng)展現(xiàn)出來了,如下所示,在后文中我們?cè)倬唧w關(guān)注它們之間是如何協(xié)同的。

wKgZO2iRqWOAGD6FAAyQ2GkyIYU736.png

put

接下來繼續(xù)了解向緩存中添加元素的流程,本節(jié)內(nèi)容比較多,理解起來也相對(duì)復(fù)雜,結(jié)合文章內(nèi)容的同時(shí),也需要多去深入查看 Caffeine 源碼才能有更好的理解,以下為 put 方法的源碼:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 默認(rèn)入?yún)?onlyIfAbsent 為 false,表示向緩存中添加相同的 key 會(huì)對(duì) value 進(jìn)行替換 
    @Override
    public @Nullable V put(K key, V value) {
        return put(key, value, expiry(), /* onlyIfAbsent */ false);
    }
}

它會(huì)執(zhí)行到如下具體邏輯中,關(guān)注注釋信息:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int WRITE_BUFFER_RETRIES = 100;

    final MpscGrowableArrayQueue writeBuffer;

    final ConcurrentHashMap> data;

    final ReentrantLock evictionLock;

    final NodeFactory nodeFactory;

    @Nullable
    V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
        // 不允許添加 null
        requireNonNull(key);
        requireNonNull(value);

        Node node = null;
        // 獲取當(dāng)前時(shí)間戳
        long now = expirationTicker().read();
        // 計(jì)算緩存權(quán)重,如果沒有指定 weigher 的話,默認(rèn)權(quán)重為 1
        int newWeight = weigher.weigh(key, value);
        // 創(chuàng)建用于查找的鍵對(duì)象
        Object lookupKey = nodeFactory.newLookupKey(key);
        
        for (int attempts = 1; ; attempts++) {
            // 嘗試獲取節(jié)點(diǎn);prior 譯為先前的;較早的
            Node prior = data.get(lookupKey);
            // 處理不存在的節(jié)點(diǎn)
            if (prior == null) {
                // 如果 node 在循環(huán)執(zhí)行中還未被創(chuàng)建
                if (node == null) {
                    // NodeFactory 創(chuàng)建對(duì)應(yīng)類型節(jié)點(diǎn)
                    node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);
                    // 設(shè)置節(jié)點(diǎn)的過期時(shí)間
                    setVariableTime(node, expireAfterCreate(key, value, expiry, now));
                }
                // 嘗試添加新節(jié)點(diǎn)到緩存中,如果鍵已存在則返回現(xiàn)有節(jié)點(diǎn)
                prior = data.putIfAbsent(node.getKeyReference(), node);
                // 返回 null 表示插入成功
                if (prior == null) {
                    // 寫后操作:添加 AddTask 并調(diào)度執(zhí)行任務(wù)
                    afterWrite(new AddTask(node, newWeight));
                    return null;
                }
                // onlyIfAbsent 形參在默認(rèn)的 put 方法中為 false,以下邏輯簡單介紹
                // 如果此時(shí)有其他線程添加了相同 key 的元素
                else if (onlyIfAbsent) {
                    // 獲取到當(dāng)前值,嘗試判斷讀后失效策略,更新訪問時(shí)間,并執(zhí)行讀后操作 afterRead 方法
                    V currentValue = prior.getValue();
                    if ((currentValue != null) && !hasExpired(prior, now)) {
                        if (!isComputingAsync(prior)) {
                            tryExpireAfterRead(prior, key, currentValue, expiry(), now);
                            setAccessTime(prior, now);
                        }
                        // 讀后操作,該方法在 getIfPresent 中進(jìn)行講解
                        afterRead(prior, now, /* recordHit */ false);
                        return currentValue;
                    }
                }
            } else if (onlyIfAbsent) {
                // 同樣的邏輯
                V currentValue = prior.getValue();
                if ((currentValue != null) && !hasExpired(prior, now)) {
                    if (!isComputingAsync(prior)) {
                        tryExpireAfterRead(prior, key, currentValue, expiry(), now);
                        setAccessTime(prior, now);
                    }
                    afterRead(prior, now, /* recordHit */ false);
                    return currentValue;
                }
            }
        }
        // ...
    }
}

注意添加節(jié)點(diǎn)成功的邏輯,它會(huì)執(zhí)行 afterWrite 寫后操作方法,添加 AddTask 任務(wù)到 writeBuffer 中:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 寫重試最多 100 次
    static final int WRITE_BUFFER_RETRIES = 100;

    static final int WRITE_BUFFER_MIN = 4;
    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    final MpscGrowableArrayQueue writeBuffer = new MpscGrowableArrayQueue(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

    // 添加寫后 Task 到 writeBuffer 中并在合適的時(shí)機(jī)調(diào)度執(zhí)行任務(wù)
    void afterWrite(Runnable task) {
        // 最多重試添加 100 次
        for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
            if (writeBuffer.offer(task)) {
                // 寫后調(diào)度
                scheduleAfterWrite();
                return;
            }
            // 向 writeBuffer 中添加任務(wù)失敗會(huì)調(diào)度任務(wù)執(zhí)行
            scheduleDrainBuffers();
            // 自旋等待,讓出 CPU 控制權(quán)
            Thread.onSpinWait();
        }
        // ...
    }
}

writeBuffer 的類型為 MpscGrowableArrayQueue,在這里我們?cè)敿?xì)的介紹下它。

WriteBuffer

根據(jù)它的命名 GrowableArrayQueue 可知它是一個(gè)容量可以增長的雙端隊(duì)列,前綴 MPSC 表達(dá)的含義是“多生產(chǎn)者,單消費(fèi)者”,也就是說可以有多個(gè)線程向其中添加元素,但只有一個(gè)線程能從其中獲取元素。那么它是如何實(shí)現(xiàn) MPSC 的呢?接下來我們就根據(jù)源碼詳細(xì)了解一下。首先先來看一下它的類繼承關(guān)系圖及簡要說明:

wKgZPGiRqWaAKxtEAArX0SqWOsQ801.png

圖中灰色的表示抽象類,藍(lán)色為實(shí)現(xiàn)類,java.util.AbstractQueue 就不再多解釋了。我們先看看其中標(biāo)記紅框的類,討論到底什么是“避免內(nèi)存?zhèn)喂蚕韱栴}”?

以 BaseMpscLinkedArrayQueuePad1 為例:

abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue {
    byte p000, p001, p002, p003, p004, p005, p006, p007;
    byte p008, p009, p010, p011, p012, p013, p014, p015;
    byte p016, p017, p018, p019, p020, p021, p022, p023;
    byte p024, p025, p026, p027, p028, p029, p030, p031;
    byte p032, p033, p034, p035, p036, p037, p038, p039;
    byte p040, p041, p042, p043, p044, p045, p046, p047;
    byte p048, p049, p050, p051, p052, p053, p054, p055;
    byte p056, p057, p058, p059, p060, p061, p062, p063;
    byte p064, p065, p066, p067, p068, p069, p070, p071;
    byte p072, p073, p074, p075, p076, p077, p078, p079;
    byte p080, p081, p082, p083, p084, p085, p086, p087;
    byte p088, p089, p090, p091, p092, p093, p094, p095;
    byte p096, p097, p098, p099, p100, p101, p102, p103;
    byte p104, p105, p106, p107, p108, p109, p110, p111;
    byte p112, p113, p114, p115, p116, p117, p118, p119;
}

這個(gè)類除了定義了 120 字節(jié)的字段外,看上去沒有做其他任何事情,實(shí)際上它為 性能提升 默默做出了貢獻(xiàn),避免了內(nèi)存?zhèn)喂蚕?/strong>。CPU 中緩存行(Cache Line)的大小通常是 64 字節(jié),在類中定義 120 字節(jié)來占位,這樣便能將上下繼承關(guān)系間的字段間隔開,保證被多個(gè)線程訪問的關(guān)鍵字段距離至少跨越一個(gè)緩存行,分布在不同的緩存行中。這樣在不同的線程訪問 BaseMpscLinkedArrayQueueProducerFields 和 BaseMpscLinkedArrayQueueConsumerFields 中字段時(shí)互不影響,詳細(xì)了解原理可參考博客園 - CPU Cache與緩存行。

接下來我們看看其他抽象類的作用。BaseMpscLinkedArrayQueueProducerFields 定義生產(chǎn)者相關(guān)字段:

abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedArrayQueuePad1 {
    // 生產(chǎn)者操作索引(并不對(duì)應(yīng)緩沖區(qū) producerBuffer 中索引位置)
    protected long producerIndex;
}

BaseMpscLinkedArrayQueueConsumerFields 負(fù)責(zé)定義消費(fèi)者相關(guān)字段:

abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedArrayQueuePad2 {
    // 掩碼值,用于計(jì)算消費(fèi)者實(shí)際的索引位置
    protected long consumerMask;
    // 消費(fèi)者訪問這個(gè)緩沖區(qū)來獲取元素消費(fèi)
    protected E[] consumerBuffer;
    // 消費(fèi)者操作索引(并不對(duì)應(yīng)緩沖區(qū) consumerBuffer 中索引位置)
    protected long consumerIndex;
}

BaseMpscLinkedArrayQueueColdProducerFields 中定義字段如下,該類的命名包含 Cold,表示其中字段被修改的次數(shù)會(huì)比較少:

abstract class BaseMpscLinkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueuePad3 {
    // 生產(chǎn)者可以操作的最大索引上限
    protected volatile long producerLimit;
    // 掩碼值,用于計(jì)算生產(chǎn)者在數(shù)組中實(shí)際的索引
    protected long producerMask;
    // 存儲(chǔ)生產(chǎn)者生產(chǎn)的元素
    protected E[] producerBuffer;
}

現(xiàn)在關(guān)鍵字段我們已經(jīng)介紹完了,接下來看一下創(chuàng)建 MpscGrowableArrayQueue 的邏輯,執(zhí)行它的構(gòu)造方法時(shí)會(huì)為我們剛剛提到的字段進(jìn)行賦值:

class MpscGrowableArrayQueue extends MpscChunkedArrayQueue {

    MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity, maxCapacity);
    }
}

abstract class MpscChunkedArrayQueue extends MpscChunkedArrayQueueColdProducerFields {
    // 省略字節(jié)占位字段...
    byte p119;

    MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity, maxCapacity);
    }

}

abstract class MpscChunkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueue {
    protected final long maxQueueCapacity;

    MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity);
        if (maxCapacity < 4) {
            throw new IllegalArgumentException("Max capacity must be 4 or more");
        }
        // 保證了最大值最少比初始值大 2 倍
        if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) {
            throw new IllegalArgumentException(
                    "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");
        }
        // 最大容量也為 2的n次冪
        maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity))  extends BaseMpscLinkedArrayQueueColdProducerFields {

    BaseMpscLinkedArrayQueue(final int initialCapacity) {
        if (initialCapacity < 2) {
            throw new IllegalArgumentException("Initial capacity must be 2 or more");
        }

        // 初始化緩沖區(qū)大小為數(shù)值最接近的 2 的 n 次冪
        int p2capacity = ceilingPowerOfTwo(initialCapacity);
        // 掩碼值,-1L 使其低位均為 1,左移 1 位則最低位為 0,eg: 00000110,注意該值會(huì)被生產(chǎn)者和消費(fèi)者掩碼值共同賦值
        long mask = (p2capacity - 1L) 

現(xiàn)在 MpscGrowableArrayQueue 的構(gòu)建已經(jīng)看完了,了解了其中關(guān)鍵字段的賦值,現(xiàn)在我們就需要看它是如何實(shí)現(xiàn) MPSC 的?!岸嗌a(chǎn)者”也就意味著會(huì)有多個(gè)線程向其中添加元素,既然是多線程就需要重點(diǎn)關(guān)注它是如何在多線程間完成協(xié)同的。添加操作對(duì)應(yīng)了 BaseMpscLinkedArrayQueue#offer 方法,它的實(shí)現(xiàn)如下:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {

    private static final Object JUMP = new Object();

    @Override
    @SuppressWarnings("MissingDefault")
    public boolean offer(final E e) {
        if (e == null) {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true) {
            // 生產(chǎn)者最大索引(生產(chǎn)者掩碼值),獲取 BaseMpscLinkedArrayQueueColdProducerFields 中定義的該字段
            long producerLimit = lvProducerLimit();
            // 生產(chǎn)者當(dāng)前索引,初始值為 0,BaseMpscLinkedArrayQueueProducerFields 中字段 
            pIndex = lvProducerIndex(this);
            // producerIndex 最低位用來表示擴(kuò)容(索引生產(chǎn)者索引 producerIndex 并不對(duì)應(yīng)緩沖區(qū)中實(shí)際的索引)
            // 低位為 1 表示正在擴(kuò)容,自旋等待直到擴(kuò)容完成(表示只有一個(gè)線程操作擴(kuò)容)
            if ((pIndex & 1) == 1) {
                continue;
            }

            // 掩碼值和buffer可能在擴(kuò)容中被改變,每次循環(huán)使用最新值
            mask = this.producerMask;
            buffer = this.producerBuffer;

            // 檢查是否需要擴(kuò)容
            if (producerLimit <= pIndex) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    case 0:
                        break;
                    case 1:
                        continue;
                    case 2:
                        return false;
                    case 3:
                        resize(mask, buffer, pIndex, e);
                        return true;
                }
            }

            // CAS 操作更新生產(chǎn)者索引,注意這里是 +2,更新成功結(jié)束循環(huán)
            if (casProducerIndex(this, pIndex, pIndex + 2)) {
                break;
            }
        }
        // 計(jì)算該元素在 buffer 中的實(shí)際偏移量,并將其添加到緩沖區(qū)中
        final long offset = modifiedCalcElementOffset(pIndex, mask);
        soElement(buffer, offset, e);
        return true;
    }

    // 沒有將 resize 邏輯封裝在該方法中,而是由該方法判斷是否需要擴(kuò)容
    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
        int result;
        // 獲取消費(fèi)者索引 BaseMpscLinkedArrayQueueConsumerFields 類中
        final long cIndex = lvConsumerIndex(this);
        // 通過掩碼值計(jì)算當(dāng)前緩沖區(qū)容量
        long bufferCapacity = getCurrentBufferCapacity(mask);
        result = 0;
        // 如果隊(duì)列還有空間
        if (cIndex + bufferCapacity > pIndex) {
            // 嘗試更新生產(chǎn)者最大限制,更新失敗則返回 1 重試
            if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
                result = 1;
            }
        }
        // 如果隊(duì)列已滿且無法擴(kuò)展
        else if (availableInQueue(pIndex, cIndex) <= 0) {
            result = 2;
        }
        // 更新 producerIndex 最低位為 1,成功則進(jìn)行擴(kuò)容,否則重試
        else if (casProducerIndex(this, pIndex, pIndex + 1)) {
            result = 3;
        } else {
            result = 1;
        }
        return result;
    }

    private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
        // 計(jì)算新緩沖區(qū)大小并創(chuàng)建,2 * (buffer.length - 1) + 1
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer = allocate(newBufferLength);

        // 更新緩沖區(qū)引用為新的緩沖區(qū)
        producerBuffer = newBuffer;
        // 更新新的掩碼
        final int newMask = (newBufferLength - 2) > 1;
    }
}

可見,在這個(gè)過程中它并沒有限制操作線程數(shù)量,也沒有使用加鎖的同步機(jī)制。它通過保證 可見性,并使用 自旋鎖結(jié)合 CAS 操作 更新生產(chǎn)者索引值,因?yàn)樵摬僮魇窃拥?,同時(shí)只有一個(gè)線程能更新獲取索引值成功,更新失敗的線程會(huì)自旋重試,這樣便允許多線程同時(shí)添加元素,可見性保證和CAS操作源碼如下:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {

    static final VarHandle P_INDEX = pIndexLookup.findVarHandle(
            BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);
    
    // volatile 可見性保證
    static long lvProducerIndex(BaseMpscLinkedArrayQueue self) {
        return (long) P_INDEX.getVolatile(self);
    }
    
    // CAS 操作
    static boolean casProducerIndex(BaseMpscLinkedArrayQueue self, long expect, long newValue) {
        return P_INDEX.compareAndSet(self, expect, newValue);
    }
}

保證可見性(內(nèi)存操作對(duì)其他線程可見)的原理是 內(nèi)存屏障,除了保證可見性以外,內(nèi)存屏障還能夠 防止重排序(確保在內(nèi)存屏障前后的內(nèi)存操作不會(huì)被重排序,從而保證程序的正確性)。到這里,生產(chǎn)者添加元素的邏輯我們已經(jīng)分析完了,接下來我們需要繼續(xù)看一下消費(fèi)者獲取元素的邏輯,它對(duì)應(yīng)了 BaseMpscLinkedArrayQueue#poll 方法,同樣地,在這過程中需要關(guān)注“在這個(gè)方法中有沒有限制單一線程執(zhí)行”,以此實(shí)現(xiàn)單消費(fèi)者呢:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
    
    private static final Object JUMP = new Object();
    
    public E poll() {
        // 讀取消費(fèi)者相關(guān)字段 BaseMpscLinkedArrayQueueConsumerFields 類
        final E[] buffer = consumerBuffer;
        final long index = consumerIndex;
        final long mask = consumerMask;

        // 根據(jù)消費(fèi)索引,計(jì)算出元素在消費(fèi)者緩沖區(qū)中實(shí)際的位置
        final long offset = modifiedCalcElementOffset(index, mask);
        // 讀取該元素(volatile 可見性讀?。?        Object e = lvElement(buffer, offset);
        
        // 如果為空
        if (e == null) {
            // 比較生產(chǎn)者索引,如果兩個(gè)索引不相等,那么證明兩索引間存在距離表示還有元素能夠被消費(fèi)
            if (index != lvProducerIndex(this)) {
                // 自旋讀取元素,直到讀到元素
                do {
                    e = lvElement(buffer, offset);
                } while (e == null);
            } else {
                // 索引相等證明確實(shí)是空隊(duì)列
                return null;
            }
        }
        if (e == JUMP) {
            // 獲取到新緩沖區(qū)
            final E[] nextBuffer = getNextBuffer(buffer, mask);
            // 在新緩沖區(qū)中獲取到對(duì)應(yīng)元素
            return newBufferPoll(nextBuffer, index);
        }
        // 清除當(dāng)前索引的元素,表示該元素已經(jīng)被消費(fèi)
        soElement(buffer, offset, null);
        // 更新消費(fèi)者索引,這里也是 +2,它并不表示實(shí)際的在緩沖區(qū)的索引
        soConsumerIndex(this, index + 2);
        return (E) e;
    }

    private E[] getNextBuffer(final E[] buffer, final long mask) {
        // 如果已經(jīng)發(fā)生擴(kuò)容,此時(shí) consumerMask 仍然對(duì)應(yīng)的是擴(kuò)容前的 mask
        // 此處與生產(chǎn)者操作擴(kuò)容時(shí)拼接新舊緩沖區(qū)調(diào)用的是一樣的方法,這樣便能夠獲取到新緩沖區(qū)的偏移量
        final long nextArrayOffset = nextArrayOffset(mask);
        // 獲取到新緩沖區(qū),因?yàn)樵跀U(kuò)容操作時(shí)已經(jīng)將新緩沖區(qū)鏈接到舊緩沖區(qū)上了
        final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);
        // 將舊緩沖區(qū)中新緩沖區(qū)位置設(shè)置為 null 表示舊緩沖區(qū)中已經(jīng)沒有任何元素需要被消費(fèi)了,也不再需要被引用了(能被垃圾回收了)
        soElement(buffer, nextArrayOffset, null);
        return nextBuffer;
    }

    private long nextArrayOffset(final long mask) {
        return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
    }

    private E newBufferPoll(E[] nextBuffer, final long index) {
        // 計(jì)算出消費(fèi)者操作索引在新緩沖區(qū)中對(duì)應(yīng)的實(shí)際位置
        final long offsetInNew = newBufferAndOffset(nextBuffer, index);
        // 在新緩沖區(qū)中獲取到對(duì)應(yīng)元素
        final E n = lvElement(nextBuffer, offsetInNew);
        if (n == null) {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        // 清除當(dāng)前索引的元素,表示該元素已經(jīng)被消費(fèi)
        soElement(nextBuffer, offsetInNew, null);
        // 更新消費(fèi)者索引
        soConsumerIndex(this, index + 2);
        return n;
    }

    private long newBufferAndOffset(E[] nextBuffer, final long index) {
        // 將消費(fèi)者緩沖區(qū)引用和掩碼值更新
        consumerBuffer = nextBuffer;
        consumerMask = (nextBuffer.length - 2L) > 1;
    }
    
    static  E lvElement(E[] buffer, long offset) {
        return (E) REF_ARRAY.getVolatile(buffer, (int) offset);
    }
}

可以發(fā)現(xiàn)在該方法中并沒有限制單一線程執(zhí)行,所以理論上這個(gè)方法可能被多個(gè)線程調(diào)用,那么它又為什么被稱為 MPSC 呢?在這個(gè)方法中的一段注釋值得細(xì)心體會(huì):

This implementation is correct for single consumer thread use only.
此實(shí)現(xiàn)僅適用于單消費(fèi)者線程使用

所以調(diào)用該方法時(shí)開發(fā)者本身需要保證單線程調(diào)用而并不是在實(shí)現(xiàn)中控制。

到這里 MpscGrowableArrayQueue 中核心的邏輯已經(jīng)講解完了,現(xiàn)在我們回過頭來再看一下隊(duì)列擴(kuò)容前后生產(chǎn)者和消費(fèi)者是如何協(xié)同的?在擴(kuò)容前,consumerBuffer 和 producerBuffer 引用的是同一個(gè)緩沖區(qū)對(duì)象。如果發(fā)生擴(kuò)容,那么生產(chǎn)者會(huì)創(chuàng)建一個(gè)新的緩沖區(qū),并將 producerBuffer 引用指向它,此時(shí)它做了一個(gè) 非常巧妙 的操作,將 新緩沖區(qū)依然鏈接到舊緩沖區(qū) 上,并將觸發(fā)擴(kuò)容的元素對(duì)應(yīng)的舊緩沖區(qū)的索引處標(biāo)記為 JUMP,表示這及之后的元素已經(jīng)都在新緩沖區(qū)中。此時(shí),消費(fèi)者依然會(huì)在舊緩沖區(qū)中慢慢地消費(fèi),直到遇到 JUMP 標(biāo)志位,消費(fèi)者就知道需要到新緩沖區(qū)中取獲取元素了。因?yàn)橹吧a(chǎn)者在擴(kuò)容時(shí)對(duì)新舊緩沖區(qū)進(jìn)行鏈接,所以消費(fèi)者能夠通過舊緩沖區(qū)獲取到新緩沖區(qū)的引用,并變更 consumerBuffer 的引用和 consumerMask 掩碼值,接下來的消費(fèi)過程便和擴(kuò)容前沒有差別了。

scheduleAfterWrite

現(xiàn)在我們?cè)倩氐?put 方法的邏輯中,如果向 WriterBuffer 中添加元素成功,則會(huì)調(diào)用 scheduleAfterWrite 方法,調(diào)度任務(wù)的執(zhí)行:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock = new ReentrantLock();
    // 默認(rèn)為 ForkJoinPool.commonPool()
    final Executor executor;
    // 該任務(wù)在創(chuàng)建緩存時(shí)已經(jīng)完成初始化
    final PerformCleanupTask drainBuffersTask;
    
    // 根據(jù)狀態(tài)的變化來調(diào)度執(zhí)行任務(wù)
    void scheduleAfterWrite() {
        // 獲取當(dāng)前 drainStatus,drain 譯為排空,耗盡
        int drainStatus = drainStatusOpaque();
        for (; ; ) {
            // 這里的狀態(tài)機(jī)變更需要關(guān)注下
            switch (drainStatus) {
                // IDLE 表示當(dāng)前無任務(wù)可做
                case IDLE:
                    // CAS 更新狀態(tài)為 REQUIRED
                    casDrainStatus(IDLE, REQUIRED);
                    // 調(diào)度任務(wù)執(zhí)行
                    scheduleDrainBuffers();
                    return;
                // REQUIRED 表示當(dāng)前有任務(wù)需要執(zhí)行
                case REQUIRED:
                    // 調(diào)度任務(wù)執(zhí)行
                    scheduleDrainBuffers();
                    return;
                // PROCESSING_TO_IDLE 表示當(dāng)前任務(wù)處理完成后會(huì)變成 IDLE 狀態(tài)
                case PROCESSING_TO_IDLE:
                    // 又來了新的任務(wù),則 CAS 操作將它更新為 PROCESSING_TO_REQUIRED 狀態(tài)
                    if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
                        return;
                    }
                    drainStatus = drainStatusAcquire();
                    continue;
                    // PROCESSING_TO_REQUIRED 表示正在處理任務(wù),處理完任務(wù)后還有任務(wù)需要處理
                case PROCESSING_TO_REQUIRED:
                    return;
                default:
                    throw new IllegalStateException("Invalid drain status: " + drainStatus);
            }
        }
    }

    // 調(diào)度執(zhí)行緩沖區(qū)中的任務(wù)
    void scheduleDrainBuffers() {
        // 如果狀態(tài)表示正在有任務(wù)處理則返回
        if (drainStatusOpaque() >= PROCESSING_TO_IDLE) {
            return;
        }
        // 注意這里要獲取同步鎖 evictionLock
        if (evictionLock.tryLock()) {
            try {
                // 獲取鎖后再次校驗(yàn)當(dāng)前處理狀態(tài)
                int drainStatus = drainStatusOpaque();
                if (drainStatus >= PROCESSING_TO_IDLE) {
                    return;
                }
                // 更新狀態(tài)為 PROCESSING_TO_IDLE
                setDrainStatusRelease(PROCESSING_TO_IDLE);
                // 同步機(jī)制保證任何時(shí)刻只能有一個(gè)線程能夠提交任務(wù)
                executor.execute(drainBuffersTask);
            } catch (Throwable t) {
                logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
                maintenance(/* ignored */ null);
            } finally {
                evictionLock.unlock();
            }
        }
    }

}

寫后調(diào)度處理任務(wù)(scheduleAfterWrite)會(huì)根據(jù)狀態(tài)選擇性執(zhí)行 scheduleDrainBuffers 方法,執(zhí)行該方法時(shí)通過同步鎖 evictionLock 保證同時(shí)只有一個(gè)線程能提交 PerformCleanupTask 任務(wù)。這個(gè)任務(wù)在創(chuàng)建緩存時(shí)已經(jīng)被初始化完成了,每次提交任務(wù)都會(huì)被復(fù)用,接下來我們看一下這個(gè)任務(wù)的具體實(shí)現(xiàn):

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 可重用的任務(wù),用于執(zhí)行 maintenance 方法,避免了使用 ForkJoinPool 來包裝
    static final class PerformCleanupTask extends ForkJoinTask implements Runnable {
        private static final long serialVersionUID = 1L;

        final WeakReference> reference;

        PerformCleanupTask(BoundedLocalCache cache) {
            reference = new WeakReference>(cache);
        }

        @Override
        public boolean exec() {
            try {
                run();
            } catch (Throwable t) {
                logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", t);
            }

            // Indicates that the task has not completed to allow subsequent submissions to execute
            return false;
        }

        @Override
        public void run() {
            // 獲取到緩存對(duì)象
            BoundedLocalCache cache = reference.get();
            if (cache != null) {
                cache.performCleanUp(null);
            }
        }
        // ...
    }
}

它的實(shí)現(xiàn)非常簡單,其中 reference 字段在調(diào)用構(gòu)造方法時(shí)被賦值,引用的是緩存對(duì)象本身。當(dāng)任務(wù)被執(zhí)行時(shí),調(diào)用的是 BoundedLocalCache#performCleanUp 方法:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock = new ReentrantLock();
    
    // 執(zhí)行該任務(wù)時(shí),也要獲取同步鎖,表示任務(wù)只能由一個(gè)線程來執(zhí)行
    void performCleanUp(@Nullable Runnable task) {
        evictionLock.lock();
        try {
            // 執(zhí)行維護(hù)任務(wù)
            maintenance(task);
        } finally {
            evictionLock.unlock();
        }
        rescheduleCleanUpIfIncomplete();
    }

    @GuardedBy("evictionLock")
    void maintenance(@Nullable Runnable task) {
        // 更新狀態(tài)為執(zhí)行中
        setDrainStatusRelease(PROCESSING_TO_IDLE);

        try {
            // 處理讀緩沖區(qū)中的任務(wù)
            drainReadBuffer();

            // 處理寫緩沖區(qū)中的任務(wù)
            drainWriteBuffer();
            if (task != null) {
                task.run();
            }

            // 處理 key 和 value 的引用
            drainKeyReferences();
            drainValueReferences();

            // 過期和驅(qū)逐策略
            expireEntries();
            evictEntries();

            // “增值” 操作,后續(xù)重點(diǎn)講
            climb();
        } finally {
            // 狀態(tài)不是 PROCESSING_TO_IDLE 或者無法 CAS 更新為 IDLE 狀態(tài)的話,需要更新狀態(tài)為 REQUIRED,該狀態(tài)會(huì)再次執(zhí)行維護(hù)任務(wù)
            if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
                setDrainStatusOpaque(REQUIRED);
            }
        }
    }
}

注意在執(zhí)行 performCleanUp 方法時(shí),也需要獲取到同步鎖 evictionLock,那么任務(wù)的提交和任務(wù)的執(zhí)行也是互斥的。這個(gè)執(zhí)行的核心邏輯在 maintenance “維護(hù)”方法中,注意這個(gè)方法被標(biāo)記了注解 @GuardedBy("evictionLock"),源碼中還有多個(gè)方法也標(biāo)記了該注解,執(zhí)行這些方法時(shí)都要獲取同步鎖,這也是在提醒我們這些方法同時(shí)只有由一條線程被執(zhí)行。因?yàn)槟壳瓣P(guān)注的是 put 方法,所以重點(diǎn)先看維護(hù)方法中 drainWriteBuffer 方法處理寫緩沖區(qū)中任務(wù)的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int NCPU = Runtime.getRuntime().availableProcessors();

    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    final MpscGrowableArrayQueue writeBuffer;

    @GuardedBy("evictionLock")
    void drainWriteBuffer() {
        // 最大循環(huán)次數(shù)為 writeBuffer 最大容量,直至彈出元素為 null
        for (int i = 0; i <= WRITE_BUFFER_MAX; i++) {
            Runnable task = writeBuffer.poll();
            if (task == null) {
                return;
            }
            task.run();
        }
        // 更新狀態(tài)為 PROCESSING_TO_REQUIRED
        setDrainStatusOpaque(PROCESSING_TO_REQUIRED);
    }
}

執(zhí)行邏輯非常簡單,在獲取到同步鎖之后,在 WriteBuffer 中獲取要被執(zhí)行的任務(wù)并執(zhí)行。在這里我們能發(fā)現(xiàn)“SC 單消費(fèi)者”的實(shí)現(xiàn)使用 同步鎖的機(jī)制保證同時(shí)只能有一個(gè)消費(fèi)者消費(fèi)緩沖區(qū)中的任務(wù)。在上文中我們已經(jīng)知道,調(diào)用 put 方法時(shí)向緩沖區(qū) WriteBuffer 中添加的任務(wù)為 AddTask,下面我們看一下該任務(wù)的實(shí)現(xiàn):

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;

    final class AddTask implements Runnable {
        final Node node;
        // 節(jié)點(diǎn)權(quán)重
        final int weight;

        AddTask(Node node, int weight) {
            this.weight = weight;
            this.node = node;
        }

        @Override
        @GuardedBy("evictionLock")
        @SuppressWarnings("FutureReturnValueIgnored")
        public void run() {
            // 是否指定了驅(qū)逐策略
            if (evicts()) {
                // 更新緩存權(quán)重和窗口區(qū)權(quán)重
                setWeightedSize(weightedSize() + weight);
                setWindowWeightedSize(windowWeightedSize() + weight);
                // 更新節(jié)點(diǎn)的 policyWeight,該字段只有在自定了權(quán)重計(jì)算規(guī)則時(shí)才有效
                // 否則像只定義了固定容量的驅(qū)逐策略,使用默認(rèn)元素權(quán)重為 1 是不需要關(guān)注該字段的
                node.setPolicyWeight(node.getPolicyWeight() + weight);

                // 檢測當(dāng)前總權(quán)重是否超過一半的最大容量
                long maximum = maximum();
                if (weightedSize() >= (maximum >>> 1)) {
                    // 如果超過最大容量
                    if (weightedSize() > MAXIMUM_CAPACITY) {
                        // 執(zhí)行驅(qū)逐操作
                        evictEntries();
                    } else {
                        // 延遲加載頻率草圖 frequencySketch 數(shù)據(jù)結(jié)構(gòu),用于統(tǒng)計(jì)元素訪問頻率
                        long capacity = isWeighted() ? data.mappingCount() : maximum;
                        frequencySketch().ensureCapacity(capacity);
                    }
                }

                // 更新頻率統(tǒng)計(jì)信息
                K key = node.getKey();
                if (key != null) {
                    // 因?yàn)轭l率草圖數(shù)據(jù)結(jié)構(gòu)具有延遲加載機(jī)制(權(quán)重超過半數(shù))
                    // 所以實(shí)際上在元素權(quán)重還未過半未完成初始化時(shí),調(diào)用 increment 是沒有作用的
                    frequencySketch().increment(key);
                }

                // 增加未命中樣本數(shù)
                setMissesInSample(missesInSample() + 1);
            }

            // 同步檢測節(jié)點(diǎn)是否還有效
            boolean isAlive;
            synchronized (node) {
                isAlive = node.isAlive();
            }
            if (isAlive) {
                // 寫后過期策略
                if (expiresAfterWrite()) {
                    writeOrderDeque().offerLast(node);
                }
                // 過期策略
                if (expiresVariable()) {
                    timerWheel().schedule(node);
                }
                // 驅(qū)逐策略
                if (evicts()) {
                    // 如果權(quán)重比配置的最大權(quán)重大
                    if (weight > maximum()) {
                        // 執(zhí)行固定權(quán)重(RemovalCause.SIZE)的驅(qū)逐策略
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                    // 如果權(quán)重超過窗口區(qū)最大權(quán)重,則將其放在窗口區(qū)頭節(jié)點(diǎn)
                    else if (weight > windowMaximum()) {
                        accessOrderWindowDeque().offerFirst(node);
                    }
                    // 否則放在窗口區(qū)尾節(jié)點(diǎn)
                    else {
                        accessOrderWindowDeque().offerLast(node);
                    }
                }
                // 訪問后過期策略
                else if (expiresAfterAccess()) {
                    accessOrderWindowDeque().offerLast(node);
                }
            }

            // 處理異步計(jì)算
            if (isComputingAsync(node)) {
                synchronized (node) {
                    if (!Async.isReady((CompletableFuture) node.getValue())) {
                        long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
                        setVariableTime(node, expirationTime);
                        setAccessTime(node, expirationTime);
                        setWriteTime(node, expirationTime);
                    }
                }
            }
        }
    }
}

根據(jù)注釋很容易理解該方法的作用,因?yàn)槲覀兡壳皩?duì)緩存只定義了固定容量的驅(qū)逐策略,所以我們需要在看一下 evictEntry 方法:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ConcurrentHashMap> data;
    
    @GuardedBy("evictionLock")
    @SuppressWarnings({"GuardedByChecker", "NullAway", "PMD.CollapsibleIfStatements"})
    boolean evictEntry(Node node, RemovalCause cause, long now) {
        K key = node.getKey();
        @SuppressWarnings("unchecked")
        V[] value = (V[]) new Object[1];
        boolean[] removed = new boolean[1];
        boolean[] resurrect = new boolean[1];
        Object keyReference = node.getKeyReference();
        RemovalCause[] actualCause = new RemovalCause[1];

        data.computeIfPresent(keyReference, (k, n) -> {
            if (n != node) {
                return n;
            }
            synchronized (n) {
                value[0] = n.getValue();

                // key 或 value 為 null,這種情況下可能使用了 Caffeine.weakKeys, Caffeine.weakValues, or Caffeine.softValues
                // 導(dǎo)致被垃圾回收了
                if ((key == null) || (value[0] == null)) {
                    // 標(biāo)記實(shí)際失效原因?yàn)槔厥?
                    actualCause[0] = RemovalCause.COLLECTED;
                }
                // 如果原因?yàn)槔厥?,記?resurrect 復(fù)活標(biāo)記為 true
                else if (cause == RemovalCause.COLLECTED) {
                    resurrect[0] = true;
                    return n;
                }
                // 否則記錄入?yún)⒅械脑?                else {
                    actualCause[0] = cause;
                }

                // 過期驅(qū)逐策略判斷
                if (actualCause[0] == RemovalCause.EXPIRED) {
                    boolean expired = false;
                    if (expiresAfterAccess()) {
                        expired |= ((now - n.getAccessTime()) >= expiresAfterAccessNanos());
                    }
                    if (expiresAfterWrite()) {
                        expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
                    }
                    if (expiresVariable()) {
                        expired |= (n.getVariableTime() <= now);
                    }
                    if (!expired) {
                        resurrect[0] = true;
                        return n;
                    }
                }
                // 固定容量驅(qū)逐策略
                else if (actualCause[0] == RemovalCause.SIZE) {
                    int weight = node.getWeight();
                    if (weight == 0) {
                        resurrect[0] = true;
                        return n;
                    }
                }

                // 通知驅(qū)逐策略監(jiān)聽器,調(diào)用它的方法
                notifyEviction(key, value[0], actualCause[0]);
                // 將該 key 對(duì)應(yīng)的刷新策略失效
                discardRefresh(keyReference);
                // 標(biāo)記該節(jié)點(diǎn)被驅(qū)逐
                removed[0] = true;
                // 退休準(zhǔn)備被垃圾回收
                node.retire();
            }
            return null;
        });

        // 如果復(fù)活標(biāo)記為 true 那么不被移除
        if (resurrect[0]) {
            return false;
        }

        // 節(jié)點(diǎn)已經(jīng)要被驅(qū)逐
        // 如果在窗口區(qū),那么直接從窗口區(qū)移除
        if (node.inWindow() && (evicts() || expiresAfterAccess())) {
            accessOrderWindowDeque().remove(node);
        }
        // 如果沒在窗口區(qū)
        else if (evicts()) {
            // 在試用區(qū)直接在試用區(qū)移除
            if (node.inMainProbation()) {
                accessOrderProbationDeque().remove(node);
            }
            // 在保護(hù)區(qū)則直接從保護(hù)區(qū)移除
            else {
                accessOrderProtectedDeque().remove(node);
            }
        }
        // 將寫后失效和時(shí)間輪中關(guān)于該節(jié)點(diǎn)的元素移除
        if (expiresAfterWrite()) {
            writeOrderDeque().remove(node);
        } else if (expiresVariable()) {
            timerWheel().deschedule(node);
        }

        // 同步機(jī)制將 node 置為 dead
        synchronized (node) {
            logIfAlive(node);
            makeDead(node);
        }

        if (removed[0]) {
            // 節(jié)點(diǎn)被移除監(jiān)控計(jì)數(shù)和節(jié)點(diǎn)移除通知回調(diào)
            statsCounter().recordEviction(node.getWeight(), actualCause[0]);
            notifyRemoval(key, value[0], actualCause[0]);
        }

        return true;
    }
}

該方法比較簡單,是將節(jié)點(diǎn)進(jìn)行驅(qū)逐的邏輯,在后文中它會(huì)被多次復(fù)用,需要留一個(gè)印象?;氐?AddTask 任務(wù)的邏輯中,當(dāng)被添加的元素權(quán)重超過最大權(quán)重限制時(shí)會(huì)被直接移除。這種特殊情況試用于指定了權(quán)重計(jì)算策略的緩存,如果只指定了固定容量,元素權(quán)重默認(rèn)為 1,所以不會(huì)直接超過最大緩存數(shù)量限制。

現(xiàn)在我們已經(jīng)將 put 方法中向緩存中添加元素的邏輯介紹完了,接下來需要關(guān)注 put 方法中對(duì)已存在的相同 key 值元素的處理邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int MAX_PUT_SPIN_WAIT_ATTEMPTS = 1024 - 1;

    static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);
    
    final ConcurrentHashMap> data;
    
    @Nullable
    V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
        requireNonNull(key);
        requireNonNull(value);

        Node node = null;
        long now = expirationTicker().read();
        int newWeight = weigher.weigh(key, value);
        Object lookupKey = nodeFactory.newLookupKey(key);
        for (int attempts = 1; ; attempts++) {
            Node prior = data.get(lookupKey);
            if (prior == null) {
                // ... 
            }

            // 元素被讀到之后可能已經(jīng)被驅(qū)逐了
            if (!prior.isAlive()) {
                // 自旋嘗試重新從 ConcurrentHashMap 中獲取,再獲取時(shí)如果為 null 則執(zhí)行新增邏輯
                if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {
                    Thread.onSpinWait();
                    continue;
                }
                // 如果自旋嘗試后元素仍未被刪除,校驗(yàn)元素是否處于存活狀態(tài)
                // 如果處于非存活狀態(tài),那么可能這個(gè)元素已經(jīng)被破壞,無法被移除,拋出異常
                data.computeIfPresent(lookupKey, (k, n) -> {
                    requireIsAlive(key, n);
                    return n;
                });
                continue;
            }

            V oldValue;
            // 新的過期時(shí)間
            long varTime;
            int oldWeight;
            boolean expired = false;
            boolean mayUpdate = true;
            boolean exceedsTolerance = false;
            // 為元素加同步鎖
            synchronized (prior) {
                // 如果此時(shí)元素已經(jīng)失效了,那么需要重新循環(huán)
                if (!prior.isAlive()) {
                    continue;
                }
                oldValue = prior.getValue();
                oldWeight = prior.getWeight();
                // oldValue 為 null 證明它被垃圾回收器回收了
                if (oldValue == null) {
                    // 記錄元素創(chuàng)建后的過期時(shí)間
                    varTime = expireAfterCreate(key, value, expiry, now);
                    // 驅(qū)逐監(jiān)聽器回調(diào)
                    notifyEviction(key, null, RemovalCause.COLLECTED);
                }
                // 如果元素已經(jīng)過期了
                else if (hasExpired(prior, now)) {
                    // 標(biāo)記過期標(biāo)志為 true
                    expired = true;
                    // 記錄元素創(chuàng)建后的過期時(shí)間并回調(diào)驅(qū)逐監(jiān)聽器
                    varTime = expireAftexpireAfterCreateerCreate(key, value, expiry, now);
                    notifyEviction(key, oldValue, RemovalCause.EXPIRED);
                }
                // onlyInAbsent 為 true 時(shí)不會(huì)對(duì)已存在 key 的值進(jìn)行修改
                else if (onlyIfAbsent) {
                    mayUpdate = false;
                    // 記錄元素讀后過期時(shí)間
                    varTime = expireAfterRead(prior, key, value, expiry, now);
                } else {
                    // 記錄元素修改后過期時(shí)間
                    varTime = expireAfterUpdate(prior, key, value, expiry, now);
                }

                // 需要修改原有 key 的 value 值
                if (mayUpdate) {
                    exceedsTolerance =
                            // 配置了寫后過期策略且已經(jīng)超過寫后時(shí)間的容忍范圍
                            (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
                                    // 或者配置了可變時(shí)間過期策略同樣判斷是否超過時(shí)間的容忍范圍
                                    || (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

                    // 更新值,更新權(quán)重,更新寫時(shí)間
                    prior.setValue(value, valueReferenceQueue());
                    prior.setWeight(newWeight);
                    setWriteTime(prior, now);

                    // 寫后刷新策略失效
                    discardRefresh(prior.getKeyReference());
                }

                // 更新過期時(shí)間
                setVariableTime(prior, varTime);
                // 更新訪問時(shí)間
                setAccessTime(prior, now);
            }

            // 根據(jù)不同的情況回調(diào)不同的監(jiān)聽器
            if (expired) {
                notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
            } else if (oldValue == null) {
                notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
            } else if (mayUpdate) {
                notifyOnReplace(key, oldValue, value);
            }

            // 計(jì)算寫后權(quán)重變化
            int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
            // 如果 oldValue 已經(jīng)被回收 或 權(quán)重修改前后發(fā)生變更 或 已經(jīng)過期,添加更新任務(wù)
            if ((oldValue == null) || (weightedDifference != 0) || expired) {
                afterWrite(new UpdateTask(prior, weightedDifference));
            }
            // 如果超過了時(shí)間容忍范圍,添加更新任務(wù)
            else if (!onlyIfAbsent && exceedsTolerance) {
                afterWrite(new UpdateTask(prior, weightedDifference));
            } else {
                // 沒有超過時(shí)間容忍范圍,更新寫時(shí)間
                if (mayUpdate) {
                    setWriteTime(prior, now);
                }
                // 處理讀后操作
                afterRead(prior, now, /* recordHit */ false);
            }

            return expired ? null : oldValue;
        }
    }
}

對(duì)于已有元素的變更,會(huì)對(duì)節(jié)點(diǎn)添加同步鎖,更新它的權(quán)重等一系列變量,如果超過 1s 的時(shí)間容忍范圍,則會(huì)添加 UpdateTask 更新任務(wù),至于處理讀后操作 afterRead 在讀方法中再去介紹。接下來我們需要重新再看一下 afterWrite 方法,其中有部分我們?cè)谏衔闹袥]有介紹的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock;
    
    void afterWrite(Runnable task) {
        // 這段邏輯我們?cè)诳?AddTask 的邏輯時(shí)已經(jīng)看過了,所以略過
        for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
            if (writeBuffer.offer(task)) {
                scheduleAfterWrite();
                return;
            }
            scheduleDrainBuffers();
            Thread.onSpinWait();
        }

        // 以下邏輯用于解決在重試了 100 次后仍然寫入失敗的問題,它會(huì)嘗試獲取 evictionLock 同步鎖
        // 直接同步執(zhí)行“維護(hù)”方法并執(zhí)行當(dāng)前任務(wù),但是它并無法解決某個(gè)寫入操作執(zhí)行時(shí)間很長的問題
        // 發(fā)生這種情況的原因可能是由于執(zhí)行器的所有線程都很忙(可能是寫入此緩存),寫入速率大大超過了消耗速率,優(yōu)先級(jí)反轉(zhuǎn),或者執(zhí)行器默默地丟棄了維護(hù)任務(wù)
        lock();
        try {
            maintenance(task);
        } catch (RuntimeException e) {
            logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
        } finally {
            evictionLock.unlock();
        }
        // 重新調(diào)度異步維護(hù)任務(wù),確保維護(hù)操作能及時(shí)執(zhí)行
        rescheduleCleanUpIfIncomplete();
    }

    void lock() {
        long remainingNanos = WARN_AFTER_LOCK_WAIT_NANOS;
        long end = System.nanoTime() + remainingNanos;
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    if (evictionLock.tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
                        return;
                    }
                    logger.log(Level.WARNING, "The cache is experiencing excessive wait times for acquiring "
                            + "the eviction lock. This may indicate that a long-running computation has halted "
                            + "eviction when trying to remove the victim entry. Consider using AsyncCache to "
                            + "decouple the computation from the map operation.", new TimeoutException());
                    evictionLock.lock();
                    return;
                } catch (InterruptedException e) {
                    remainingNanos = end - System.nanoTime();
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 調(diào)用同步的維護(hù)方法時(shí),可能發(fā)生獲取鎖超時(shí),那么再重新開啟一個(gè)異步維護(hù)調(diào)度
    void rescheduleCleanUpIfIncomplete() {
        // 校驗(yàn)是否有任務(wù)需要被執(zhí)行
        if (drainStatusOpaque() != REQUIRED) {
            return;
        }
        
        // 默認(rèn)線程池調(diào)度任務(wù)執(zhí)行,這個(gè)方法我們?cè)谏衔闹幸呀?jīng)詳細(xì)介紹過
        if (executor == ForkJoinPool.commonPool()) {
            scheduleDrainBuffers();
            return;
        }
        
        // 如果自定義了線程池,那么會(huì)使用自定義的線程池進(jìn)行處理
        var pacer = pacer();
        if ((pacer != null) && !pacer.isScheduled() && evictionLock.tryLock()) {
            try {
                if ((drainStatusOpaque() == REQUIRED) && !pacer.isScheduled()) {
                    pacer.schedule(executor, drainBuffersTask, expirationTicker().read(), Pacer.TOLERANCE);
                }
            } finally {
                evictionLock.unlock();
            }
        }
    }
}

寫后操作除了在添加任務(wù)到緩沖區(qū)成功后會(huì)執(zhí)行維護(hù)方法,添加失?。ㄗC明寫入操作非常頻繁)依然會(huì)嘗試同步執(zhí)行維護(hù)方法和發(fā)起異步維護(hù),用于保證緩存中的任務(wù)能夠被及時(shí)執(zhí)行,使緩存中元素都處于“預(yù)期”狀態(tài)中。接下來我們?cè)诳匆幌?UpdateTask 更新任務(wù)的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final class UpdateTask implements Runnable {
        final int weightDifference;
        final Node node;

        public UpdateTask(Node node, int weightDifference) {
            this.weightDifference = weightDifference;
            this.node = node;
        }

        @Override
        @GuardedBy("evictionLock")
        public void run() {
            // 寫后過期和自定義過期邏輯
            if (expiresAfterWrite()) {
                reorder(writeOrderDeque(), node);
            } else if (expiresVariable()) {
                timerWheel().reschedule(node);
            }
            // 指定了驅(qū)逐策略
            if (evicts()) {
                // 變更節(jié)點(diǎn)權(quán)重
                int oldWeightedSize = node.getPolicyWeight();
                node.setPolicyWeight(oldWeightedSize + weightDifference);
                // 如果是窗口區(qū)節(jié)點(diǎn)
                if (node.inWindow()) {
                    // 更新窗口區(qū)權(quán)重
                    setWindowWeightedSize(windowWeightedSize() + weightDifference);
                    // 節(jié)點(diǎn)權(quán)重超過最大權(quán)重限制,直接驅(qū)逐
                    if (node.getPolicyWeight() > maximum()) {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                    // 節(jié)點(diǎn)權(quán)重比窗口區(qū)最大值小
                    else if (node.getPolicyWeight() <= windowMaximum()) {
                        onAccess(node);
                    }
                    // 窗口區(qū)包含該節(jié)點(diǎn)且該節(jié)點(diǎn)的權(quán)重大于窗口最大權(quán)重,則放到頭節(jié)點(diǎn)
                    else if (accessOrderWindowDeque().contains(node)) {
                        accessOrderWindowDeque().moveToFront(node);
                    }
                }
                // 如果是試用區(qū)節(jié)點(diǎn)
                else if (node.inMainProbation()) {
                    // 節(jié)點(diǎn)權(quán)重比最大權(quán)重限制小
                    if (node.getPolicyWeight() <= maximum()) {
                        onAccess(node);
                    }
                    // 否則將該節(jié)點(diǎn)驅(qū)逐
                    else {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                }
                // 如果是保護(hù)區(qū)節(jié)點(diǎn)
                else if (node.inMainProtected()) {
                    // 更新保護(hù)區(qū)權(quán)重
                    setMainProtectedWeightedSize(mainProtectedWeightedSize() + weightDifference);
                    // 同樣的邏輯
                    if (node.getPolicyWeight() <= maximum()) {
                        onAccess(node);
                    } else {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                }

                // 更新緩存權(quán)重大小
                setWeightedSize(weightedSize() + weightDifference);
                // 更新完成后超過最大權(quán)重限制執(zhí)行驅(qū)逐操作
                if (weightedSize() > MAXIMUM_CAPACITY) {
                    evictEntries();
                }
            }
            // 配置了訪問后過期
            else if (expiresAfterAccess()) {
                onAccess(node);
            }
        }
    }

    @GuardedBy("evictionLock")
    void onAccess(Node node) {
        if (evicts()) {
            K key = node.getKey();
            if (key == null) {
                return;
            }
            // 更新訪問頻率
            frequencySketch().increment(key);
            // 如果節(jié)點(diǎn)在窗口區(qū),則將其移動(dòng)到尾節(jié)點(diǎn)
            if (node.inWindow()) {
                reorder(accessOrderWindowDeque(), node);
            }
            // 在試用區(qū)的節(jié)點(diǎn)執(zhí)行 reorderProbation 方法,可能會(huì)將該節(jié)點(diǎn)從試用區(qū)晉升到保護(hù)區(qū)
            else if (node.inMainProbation()) {
                reorderProbation(node);
            }
            // 否則移動(dòng)到保護(hù)區(qū)的尾結(jié)點(diǎn)
            else {
                reorder(accessOrderProtectedDeque(), node);
            }
            // 更新命中量
            setHitsInSample(hitsInSample() + 1);
        }
        // 配置了訪問過期策略
        else if (expiresAfterAccess()) {
            reorder(accessOrderWindowDeque(), node);
        }
        // 配置了自定義時(shí)間過期策略
        if (expiresVariable()) {
            timerWheel().reschedule(node);
        }
    }

    static  void reorder(LinkedDeque> deque, Node node) {
        // 如果節(jié)點(diǎn)存在,將其移動(dòng)到尾結(jié)點(diǎn)
        if (deque.contains(node)) {
            deque.moveToBack(node);
        }
    }

    @GuardedBy("evictionLock")
    void reorderProbation(Node node) {
        // 檢查試用區(qū)是否包含該節(jié)點(diǎn),不包含則證明已經(jīng)被移除,則不處理
        if (!accessOrderProbationDeque().contains(node)) {
            return;
        }
        // 檢查節(jié)點(diǎn)的權(quán)重是否超過保護(hù)區(qū)最大值
        else if (node.getPolicyWeight() > mainProtectedMaximum()) {
            // 如果超過,將該節(jié)點(diǎn)移動(dòng)到 試用區(qū) 尾巴節(jié)點(diǎn),保證超重的節(jié)點(diǎn)不會(huì)被移動(dòng)到保護(hù)區(qū)
            reorder(accessOrderProbationDeque(), node);
            return;
        }

        // 更新保護(hù)區(qū)權(quán)重大小
        setMainProtectedWeightedSize(mainProtectedWeightedSize() + node.getPolicyWeight());
        // 在試用區(qū)中移除該節(jié)點(diǎn)
        accessOrderProbationDeque().remove(node);
        // 在保護(hù)區(qū)尾節(jié)點(diǎn)中添加
        accessOrderProtectedDeque().offerLast(node);
        // 將該節(jié)點(diǎn)標(biāo)記為保護(hù)區(qū)節(jié)點(diǎn)
        node.makeMainProtected();
    }
}

UpdateTask 修改任務(wù)負(fù)責(zé)變更權(quán)重值,并更新節(jié)點(diǎn)所在隊(duì)列的順序和訪問頻率,這里我們也能發(fā)現(xiàn),這三個(gè)區(qū)域的隊(duì)列采用了 LRU 算法,一般情況下,最新被訪問的元素會(huì)被移動(dòng)到尾節(jié)點(diǎn)。到現(xiàn)在,向有固定容量限制的緩存中調(diào)用 put 方法添加元素的邏輯基本已經(jīng)介紹完了,目前對(duì) Caffeine 緩存的了解程度如下所示:

wKgZO2iRqWiAfY6aABD4WGFvKZk608.png

put 添加元素時(shí)會(huì)先直接添加到 ConcurrentHashMap 中,并在 WriteBuffer 中添加 AddTask/UpdateTask 任務(wù),WriteBuffer 是一個(gè) MPSC 的緩沖區(qū),添加成功后會(huì)有加鎖的同步機(jī)制在默認(rèn)的 ForkJoinPool.commonPool() 線程池中提交 PerformCleanupTask 任務(wù),PerformCleanupTask 任務(wù)的主要作用是執(zhí)行 maintenance 維護(hù)方法,該方法執(zhí)行前需要先獲取同步鎖,單線程消費(fèi) WriteBuffer 中的任務(wù)。執(zhí)行 AddTask 任務(wù)時(shí)會(huì)將元素先添加到窗口區(qū),如果是 UpdateTask,它會(huì)修改三個(gè)不同區(qū)域的雙端隊(duì)列,這些隊(duì)列采用LRU算法,最新被訪問的元素會(huì)被放在尾節(jié)點(diǎn)處,并且試用區(qū)的元素被訪問后會(huì)被晉升到保護(hù)區(qū)尾節(jié)點(diǎn),元素對(duì)應(yīng)的訪問頻率也會(huì)在頻率草圖中更新,如果被添加的節(jié)點(diǎn)權(quán)重超過緩存最大權(quán)重會(huì)直接被驅(qū)逐。(目前維護(hù)方法中除了 drainWriteBuffer 方法外,其他步驟還未詳細(xì)解釋,之后會(huì)在后文中不斷完善)


審核編輯 黃宇

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 算法
    +關(guān)注

    關(guān)注

    23

    文章

    4750

    瀏覽量

    97009
  • 源碼
    +關(guān)注

    關(guān)注

    8

    文章

    679

    瀏覽量

    30968
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    緩存:從根理解 ConcurrentHashMap

    本文將詳細(xì)介紹 ConcurrentHashMap 構(gòu)造方法、添加值方法和擴(kuò)容操作等源碼實(shí)現(xiàn)。 ConcurrentHashMap 是線程安全的哈希表,此哈希表的設(shè)計(jì)主要目的是在最小化更新操作對(duì)哈希
    的頭像 發(fā)表于 08-05 14:48 ?344次閱讀

    本地緩存 Caffeine 中的時(shí)間輪(TimeWheel)是什么?

    我們?cè)敿?xì)介紹了 Caffeine 緩存添加元素和讀取元素的流程,并詳細(xì)解析了配置固定元素?cái)?shù)量驅(qū)逐策略的實(shí)現(xiàn)原理。在本文中我們將主要介紹 配置元素過期時(shí)間策略的實(shí)現(xiàn)原理 ,補(bǔ)全
    的頭像 發(fā)表于 08-05 14:48 ?427次閱讀
    本地<b class='flag-5'>緩存</b> <b class='flag-5'>Caffeine</b> 中的時(shí)間輪(TimeWheel)是什么?

    詳解絕緣體硅技術(shù)

    絕緣體硅(SOI)技術(shù)作為硅基集成電路領(lǐng)域的重要分支,其核心特征在于通過埋氧層(BOX)實(shí)現(xiàn)有源層與襯底的電學(xué)隔離,從而賦予場效應(yīng)晶體管獨(dú)特的電學(xué)特性。
    的頭像 發(fā)表于 07-28 15:27 ?1346次閱讀
    一<b class='flag-5'>文</b><b class='flag-5'>詳解</b>絕緣體<b class='flag-5'>上</b>硅技術(shù)

    harmony-utilsCacheUtil,緩存工具類

    harmony-utilsCacheUtil,緩存工具類
    的頭像 發(fā)表于 07-04 16:36 ?278次閱讀

    harmony-utilsLRUCacheUtil,LRUCache緩存工具類

    harmony-utilsLRUCacheUtil,LRUCache緩存工具類 harmony-utils 簡介與說明 harmony-utils 一款功能豐富且極易上手的HarmonyOS工具庫
    的頭像 發(fā)表于 07-03 18:11 ?329次閱讀

    高性能緩存設(shè)計(jì):如何解決緩存偽共享問題

    緩存行,引發(fā)無效化風(fēng)暴,使看似無關(guān)的變量操作拖慢整體效率。本文從緩存結(jié)構(gòu)原理出發(fā),通過實(shí)驗(yàn)代碼復(fù)現(xiàn)偽共享問題(耗時(shí)從3709ms優(yōu)化至473ms),解析其底層機(jī)制;同時(shí)深入剖析高性能緩存C
    的頭像 發(fā)表于 07-01 15:01 ?444次閱讀
    高性能<b class='flag-5'>緩存</b>設(shè)計(jì):如何解決<b class='flag-5'>緩存</b>偽共享問題

    請(qǐng)問如何在C++中使用NPU的模型緩存

    無法確定如何在 C++ 中的 NPU 使用模型緩存
    發(fā)表于 06-24 07:25

    MCU緩存設(shè)計(jì)

    MCU 設(shè)計(jì)通過優(yōu)化指令與數(shù)據(jù)的訪問效率,顯著提升系統(tǒng)性能并降低功耗,其核心架構(gòu)與實(shí)現(xiàn)策略如下: 一、緩存類型與結(jié)構(gòu) 指令緩存(I-Cache)與數(shù)據(jù)緩存(D-Cache)? I-Ca
    的頭像 發(fā)表于 05-07 15:29 ?732次閱讀

    Nginx緩存配置詳解

    Nginx 是一個(gè)功能強(qiáng)大的 Web 服務(wù)器和反向代理服務(wù)器,它可以用于實(shí)現(xiàn)靜態(tài)內(nèi)容的緩存,緩存可以分為客戶端緩存和服務(wù)端緩存
    的頭像 發(fā)表于 05-07 14:03 ?903次閱讀
    Nginx<b class='flag-5'>緩存</b>配置<b class='flag-5'>詳解</b>

    nginx中強(qiáng)緩存和協(xié)商緩存介紹

    強(qiáng)緩存直接告訴瀏覽器:在緩存過期前,無需與服務(wù)器通信,直接使用本地緩存。
    的頭像 發(fā)表于 04-01 16:01 ?656次閱讀

    詳解天神眼C三目方案,跟大疆“撞車”了?

    電子發(fā)燒友網(wǎng)報(bào)道(/梁浩斌)最近比亞迪推出的“天神眼”高階智駕系統(tǒng)引爆了行業(yè),將高階智駕從過去的20左右價(jià)格,大幅下放至10級(jí)別,甚至7.88
    的頭像 發(fā)表于 02-14 01:28 ?4385次閱讀

    ADS4129后級(jí)接緩存器,緩存器出現(xiàn)過熱的原因?

    ,焊接沒有問題,同時(shí)也注意了緩存器方向問題,AD轉(zhuǎn)換數(shù)據(jù)輸出也有;電不工作時(shí)也發(fā)燙,想請(qǐng)教各位其中的原因可能是什么呢?謝謝給位了?。?!
    發(fā)表于 02-07 08:42

    HTTP緩存頭的使用 本地緩存與遠(yuǎn)程緩存的區(qū)別

    HTTP緩存頭是一組HTTP響應(yīng)頭,它們控制瀏覽器和中間代理服務(wù)器如何緩存網(wǎng)頁內(nèi)容。合理使用HTTP緩存頭可以顯著提高網(wǎng)站的加載速度和性能,減少服務(wù)器的負(fù)載。 1. HTTP緩存頭概述
    的頭像 發(fā)表于 12-18 09:41 ?761次閱讀

    什么是緩存(Cache)及其作用

    緩存(Cache)是一種高速存儲(chǔ)器,用于臨時(shí)存儲(chǔ)數(shù)據(jù),以便快速訪問。在計(jì)算機(jī)系統(tǒng)中,緩存的作用是減少處理器訪問主存儲(chǔ)器(如隨機(jī)存取存儲(chǔ)器RAM)所需的時(shí)間。 緩存(Cache)概述 緩存
    的頭像 發(fā)表于 12-18 09:28 ?1.5w次閱讀

    緩存——如何選擇合適的本地緩存?

    Guava cache是Google開發(fā)的Guava工具包中一套完善的JVM本地緩存框架,底層實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)類似于ConcurrentHashMap,但是進(jìn)行了更多的能力拓展,包括緩存過期時(shí)間設(shè)置、
    的頭像 發(fā)表于 11-17 14:24 ?1035次閱讀
    <b class='flag-5'>緩存</b><b class='flag-5'>之</b><b class='flag-5'>美</b>——如何選擇合適的本地<b class='flag-5'>緩存</b>?