分布式应用缓存

缓存是解决分布式提供高可用的利器之一,它可以大幅度的提升系统的查询效率。在大多数场景下,系统查询的次数会远远大于写的次数,这也是被称作利器的原因之一。

缓存命中率

缓存命中率是指:缓存命中次数与总读取次数的比率。(缓存命中率=从缓存中读取的次数/总读取次数)一般情况下,缓存命中率越高,其支撑高可用系统的效果就越好。

缓存回收策略

基于空间

基于空间是指设置了缓存存储空间的大小,如设置为100M。当存储空间达到上限时,就会按照一定的策略移除数据。

基于容量

基于容量是指设置了缓存条目的个数大小。当超过预定条数时,则按照一定的策略移除旧数据。

基于时间

TTL(Time To Live):存活期,意思就是给缓存设置一个固定的存活时间,到时间后立即将缓存删除。

TTI(Time To Idle):空闲期,意思就是缓存多久没有被访问后,被移除缓存的时间。

基于Java对象引用

软引用:在当JVM内存不足时,GC可以回收软引用,从而可以在一定程度上避免OOM。

弱引用:JVM在进行GC时,如果发现弱引用,就会立即将其回收。

回收算法

FIFO(First In First Out):先进先出的原则。

LRU(Least Recently Used):最近最少使用算法,使用时间距离现在最远的缓存被移除。

LFU(Least Frequently Used):一定时间内使用次数最少的缓存被移除。

Java缓存类型

堆内存:当我们在写代码时,创建了一个Map时,这个Map就可以认为是堆内缓存。堆内存的好处就是,没有序列化/反序列化,读取的速度比较快。但是缺点也是很明显的,就是当数据量比较大时,GC暂停的时间也会变长,而且存储的大小受限于JVM的内存大小。一般通过软引用/弱引用来存储对象,在JVM内存不足时,可以及时的回收内存。可以使用Guava Cache、Ehcache等实现。

非堆内存:意思就是缓存存储在堆外,不收JVM内存限制,可以减少GC的暂停时间。但是读/写数据时,需要对数据进行序列化/反序列化,因此会比堆内存慢很多。可以使用Ehcache等实现。

磁盘缓存:磁盘缓存就是将数据存储在磁盘上,JVM重启时数据还是存在的。

分布式缓存:在多台应用服务器上,如果缓存是单独存储的,其命中率有可能会大幅度下降,所以分布式缓存应运而生。一般情况下,我们不会自己去实现分布式缓存,而是直接使用第三方中间件或者自研中间件,如Redis等。

堆内存

Guava Cache实现

Guava Cache只提供了堆内存,小巧灵活,性能比较好。如果只需要使用堆内存,使用它就够了。

Cache<String, String> myCache = CacheBuilder.newBuilder().concurrencyLevel(4).expireAfterWrite(10, TimeUnit.SECONDS).maximumSize(10000).build();
myCache.put(key, value);
myCache.getIfPresent(key);

CacheBuilder有几类参数:缓存回收策略、并发设置、统计命中率等。

回收策略(基于容量)

maximumSize:设置缓存的容量,当超出maximumSize时,就会按照LRU进行回收。

回收策略(基于时间)

expireAfterWrite:设置TTL,缓存在给定时间内没有写(创建/覆盖)时,则被回收。

expireAfterAccess:设置TTI,设置缓存数据在给定的时间内没有读写时,则被回收。每次访问时,都会更新TTI。如果数据是非常热的,则永远都不会失效,但这有可能会造成过期数据,这个在使用时需要留意一下。

回收策略(对象引用)

weakKeys/weakValues:设置弱引用缓存。

softValues:设置软引用缓存。

回收策略(主动失效/删除)

invalidate(Object key)/invalidateAll(Iterable keys)/invalidateAll():主动删除缓存。

并发级别

concurrencyLevel:Guava Cache重写了ConcurrentHashMap,concurrencyLevel用来设置Segment数量。

统计命中率

recordStats:启动记录统计信息,比如命中率等。

Ehcache实现

CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true);
CacheConfiguationBuilder.newCacheConfigurationBuilder(String.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().heap(100, EntryUnit.ENTRIES))
    .withDispatcherConcurrency(4)
    .withExpiry(Expirations.timeToLiveExpiration(Duration.of(10, TimeUnit.SECONDS)));
Cache<String, String> myCache = cacheManager.createCache("myCache", cacheConfig);

CacheManager在JVM关闭时调用CacheManager.close()方法,可以通过PUT、GET来读写缓存。CacheConfigurationBuilder也有几类参数:缓存回收策略、并发设置、统计命中率等。

回收策略(基于容量)

heap(100, EntryUnit.ENTRIES):设置缓存条目的数量,当超出数量时,按照LRU进行回收。

回收策略(基于空间)

heap(100, MemoryUnit.MB):设置缓存的内存空间,超出时按照LRU进行回收。另外还需要设置:withSizeOfMaxObjectGraph(2)统计对象大小时,对象图遍历深度和withSizeOfMaxObjectSize(1, MemoryUnit.KB)来设置最大的对象大小。

回收策略(基于时间)

withExpiry(Expirations.timeToLiveExpiration(Duration.of(10, TimeUnit.SECONDS))):设置TTL。

withExpiry(Expirations.timeToIdleExpiration(Duration.of(10, TimeUnit.SECONDS))):同时设置TTL和TTI,并且2个值相同。

回收策略(主动失效)

remove(K key)/removeAll(Set keys)/clear():主动失效某些缓存数据。

并发级别

Ehcache使用ConcurrentHashMap作为缓存存储,并发级别为16。withDispatcherConcurrency是用来设置时间分发时的并发级别。

非堆内存

CacheConfigurationBuilder<String, String> cacheConfig = CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().offheap(100, MemoryUnit.MB))
    .withDispatcherConcurrency(4)
    .withExpiry(Expirations.timeToLiveExpiration(Duration.of(10, TimeUnit.SECONDS)))
    .withSizeOfMaxObjectGraph(3)
    .withSizeOfMaxObjectSize(1, MemoryUnit.KB);

非堆内存不支持基于容量的缓存策略。

磁盘缓存

EhCache实现

CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
    // 默认线程池
    .using(PooledExecutionServiceConfigurationBuilder
        .newPooledExecutionServiceConfigurationBuilder()
        .defaultPool("default", 1, 10).build())
    // 磁盘文件存储位置
    .with(new CacheManagerPersistenceConfiguration(new File("/usr/data")))
    .build(true);
CacheConfigurationBuilder<String, String> cacheConfig = CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(100, MemoryUnit.MB, true)) // 磁盘缓存
    .withDiskStoreThreadPool("default", 5) // 使用"default"线程池dump文件到磁盘
    .withExpiry(Expirations.timeToLiveExpiration(Duration.of(50, TimeUnit.SECONDS)))
    .withSizeOfMaxObjectGraph(3)
    .withSizeOfMaxObjectSize(1, MemoryUnit.KB);

当JVM停止时,需要使用cacheManager.close(),从而可以保证内存数据可以dump到磁盘上。

分布式缓存

分布式缓存一般情况下不会自己去实现,而是使用第三方软件/自研软件。

多级缓存

简单来说就是分级去查找缓存数据,这里面后面详细介绍。

示例代码

多级缓存封装

通常情况下,我们在C端应用上面都会使用多级缓存,以减少请求回源至DB上面的频率。当请求进入到应用服务器后,通常的做法是:首先看本地缓存是否有数据,若无则请求分布式缓存,再若无则请求DB数据。存储请求的速度:本地缓存 > 分布式 > DB。

本地缓存初始化

public class LocalCacheInitService extends BaseService {
    
    @Override
    public void afterPropertiesSet() throws Exception {
        // 商品类目缓存
        Cache<String, Object> categoryCache = CacheBuilder.newBuilder().softValues()
            .maximumSize(1000000)
            .expireAfterWrite(Switches.CATEGORY.getExpiresInseconds() / 2, TimeUnit.SECONDS)
            .build();
        addCache(CacheKeys.CATEGORY_KEY, categoryCache);
    }
    
    private void addCache(String key, Cache<?, ?> Cache) {
        localCacheService.addCache(key, cache);
    }
    
}

上面的本地缓存过期时间是分布式缓存的一半,这样可以防止本地缓存时间过长,造成多实例的数据不一致。

写缓存封装

缓存先写入本地,然后异步的更新分布式缓存。

public void set (final String key, final Object value, final int remoteCacheExpiresInSeconds) throws RuntimeException {
    if (value == null) {
        return ;
    }

    // 复制value值:(可以避免写入的过程中,Value值被修改,从而造成本地和分布式缓存数据不一致)
    final Object finalValue = copy(value);
    // 如果配置了写本地,则根据KEY获得本地缓存,然后写入
    if (writeLocalCache) {
        Cache localCache = getLocalCache(key);
        if (localCache != null) {
            localCache.put(key, finalValue);
        }
    }
    // 如果配置了分布式缓存,则直接返回
    if (!writeRemoteCache) {
        return;
    }
    // 异步更新分布式缓存
    asyncTaskExecutor.execute(() -> {
        try {
            redisCache.set(key, JSON.toJSONString(finalValue), remoteCacheExpiresInSeconds);
        } catch (Exception e) {
            LOGGER.error("update redis cache error, key = {}, e = {}", key, e);
        }
    });
    
}

读缓存封装

先读取本地缓存,若不命中则批量查询分布式缓存(分区批量查询)。当然这里面的例子只是Demo,在实际应用时,需要根据自己的场景去设计。

private Map innerMget (List<String> keys, List<Class> types) throws Exception {
	Map<String, Object> result = Maps.newHashMap();
	List<String> missKeys = Lists.newArrayList();
	List<Class> missTypes = Lists.newArrayList();
	// 如果配置了读本地缓存,则先读取本地
	if (readLocalCache) {
		for (int i = 0; i < keys.size(); i++) {
			String key = keys.get(i);
			Class type = types.get(i);
			Cache localCache = getLocalCache(key);
			if (localCache != null) {
				Object value = localCache.getIfPresent(key);
				result.put(key, value);
				if (value == null) {
					missKeys.add(key);
					missTypes.add(type);
				}
			} else {
				missKeys.add(key);
				missTypes.add(type);
			}
		}
	}
	// 如果配置了不读取分布式缓存,则返回
	if (!readRemoteCache) {
		return result;
	}
	
	final Map<String, String> missResult = Maps.newHashMap();
	
	// 对key分区,不要一次性批量调用太大
	final List<List<String>> keysPage = Lists.partition(missKeys, 10);
	List<Future<Map<String, String>>> pageFutures = Lists.newArrayList();
	
	try {
		// 批量获取分布式缓存
		for (final List<String> partitionKeys : keysPage) {
			pageFutures.add(asyncTaskExecutor.submit(() -> redisCache.mget(partitionKeys)));
		}
		for (Future<Map<String, String>> future : pageFutures) {
			missResult.putAll(future.get(3000, TimeUnit.MILLISECONDS));
		}
	} catch (Exception e) {
		pageFutures.forEach(future -> future.cancel(true));
		throw e;
	}
	// 合并result和missResult, 此处略
	return result;
}

NULL Cache

NULL Cache的意思就是:当缓存KEY在DB中不存在时,就在缓存中存储为NULL(如:"None"或""字符等)。从而可以减少这类KEY的回源次数,减少DB的压力。


参考:《亿级流量网站架构核心技术》