spring cache @Cacheable缓存穿透
chengbinbbs 人气:0最近发现线上监控有个SQL调用量很大,但是方法的调用量不是很大,查看接口实现,发现接口是做了缓存操作的,使用Spring cache缓存注解结合tair实现缓存操作。
但是为啥SQL调用量这么大,难道缓存没有生效。测试发现缓存是正常的,分析了代码发现,代码存在缓存穿透的风险。
具体注解是这样的
@Cacheable(value = "storeDeliveryCoverage", key = "#sellerId + '|' + #cityCode", unless = "#result == null")
unless = "#result == null"表明接口返回值不为空的时候才缓存,如果线上有大量不合法的请求参数过来,由于为空的不会缓存起来,每次请求都打到DB上,导致DB的sql调用量巨大,给了黑客可乘之机,风险还是很大的。
找到原因之后就修改,查询结果为空的时候兜底一个null,把这句unless = "#result == null"条件去掉测试了一下,发现为空的话还是不会缓存。于是debug分析了一波源码,终于发现原来是tair的问题。
由于tair自身的特性,无法缓存null。既然无法缓存null,那我们就兜底一个空对象进去,取出来的时候把空对象转化为null。
基于这个思路我把Cache的实现改造了一下
@Override public void put(Object key, Object value) { if (value == null) { // 为空的话,兜底一个空对象,防止缓存穿透(由于tair自身特性不允许缓存null对象的原因,这里缓存一个空对象) value = new Nil(); } if (value instanceof Serializable) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.put( this.namespace, tairKey, (Serializable) value, 0, this.timeout ); if (resultCode != ResultCode.SUCCESS) { TairSpringCache.log.error( String.format( "[CachePut]: unable to put %s => %s into tair due to: %s", key, value, resultCode.getMessage() ) ); } } else { throw new RuntimeException( String.format( "[CachePut]: value %s is not Serializable", value ) ); } }
Nil类默认是一个空对象,这里给了个内部类:
static class Nil implements Serializable { private static final long serialVersionUID = -9138993336039047508L; }
取缓存的get方法实现
@Override public ValueWrapper get(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey); if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) { final Object obj = result.getValue().getValue(); // 缓存为空兜底的是Nil对象,这里返回的时候需要转为null if (obj instanceof Nil) { return null; } return () -> obj; } return null; }
改好了之后,测试一下,结果发现还是没有生效,缓存没有兜底,请求都打到DB上了。
debug走一遍,看了下Cache的源码,终于发现关键问题所在(具体实现流程参考上一篇:Spring Cache- 缓存拦截器( CacheInterceptor)):
private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { // Special handling of synchronized invocation if (contexts.isSynchronized()) { CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); Cache cache = context.getCaches().iterator().next(); try { return wrapCacheValue(method, cache.get(key, new Callable<Object>() { @Override public Object call() throws Exception { return unwrapReturnValue(invokeOperation(invoker)); } })); } catch (Cache.ValueRetrievalException ex) { // The invoker wraps any Throwable in a ThrowableWrapper instance so we // can just make sure that one bubbles up the stack. throw (CacheOperationInvoker.ThrowableWrapper) ex.getCause(); } } else { // No caching required, only call the underlying method return invokeOperation(invoker); } } // 处理beforeIntercepte=true的缓存删除操作 processCacheEvicts(contexts.get(CacheEvictOperation.class), true, CacheOperationExpressionEvaluator.NO_RESULT); // 从缓存中查找,是否有匹配@Cacheable的缓存数据 Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class)); // 如果@Cacheable没有被缓存,那么就需要将数据缓存起来,这里将@Cacheable操作收集成CachePutRequest集合,以便后续做@CachePut缓存数据存放。 List<CachePutRequest> cachePutRequests = new LinkedList<CachePutRequest>(); if (cacheHit == null) { collectPutRequests(contexts.get(CacheableOperation.class), CacheOperationExpressionEvaluator.NO_RESULT, cachePutRequests); } Object cacheValue; Object returnValue; //如果没有@CachePut操作,就使用@Cacheable获取的结果(可能也没有@Cableable,所以result可能为空)。 if (cacheHit != null && cachePutRequests.isEmpty() && !hasCachePut(contexts)) { //如果没有@CachePut操作,并且cacheHit不为空,说明命中缓存了,直接返回缓存结果 cacheValue = cacheHit.get(); returnValue = wrapCacheValue(method, cacheValue); } else { // 否则执行具体方法内容,返回缓存的结果 returnValue = invokeOperation(invoker); cacheValue = unwrapReturnValue(returnValue); } // Collect any explicit @CachePuts collectPutRequests(contexts.get(CachePutOperation.class), cacheValue, cachePutRequests); // Process any collected put requests, either from @CachePut or a @Cacheable miss for (CachePutRequest cachePutRequest : cachePutRequests) { cachePutRequest.apply(cacheValue); } // Process any late evictions processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue); return returnValue; }
根据key从缓存中查找,返回的结果是ValueWrapper,它是返回结果的包装器:
private Cache.ValueWrapper findCachedItem(Collection<CacheOperationContext> contexts) { Object result = CacheOperationExpressionEvaluator.NO_RESULT; for (CacheOperationContext context : contexts) { if (isConditionPassing(context, result)) { Object key = generateKey(context, result); Cache.ValueWrapper cached = findInCaches(context, key); if (cached != null) { return cached; } else { if (logger.isTraceEnabled()) { logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); } } } } return null; }
private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) { for (Cache cache : context.getCaches()) { Cache.ValueWrapper wrapper = doGet(cache, key); if (wrapper != null) { if (logger.isTraceEnabled()) { logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'"); } return wrapper; } } return null; }
这里判断缓存是否命中的逻辑是根据cacheHit是否为空,而cacheHit是ValueWrapper类型,查看ValueWrapper是一个接口,它的实现类是SimpleValueWrapper,这是一个包装器,将缓存的结果包装起来了。
而我们前面的get方法取缓存的时候如果为Nil对象,返回的是null,这样缓存判断出来是没有命中,即cacheHit==null,就会去执行具体方法朔源。
所以到这里已经很清晰了,关键问题是get取缓存的结果如果是兜底的Nil对象,应该返回new SimpleValueWrapper(null)。
应该返回包装器,包装的是缓存的对象为null。
测试了一下,发现ok了
具体源码如下:
/** * 基于tair的缓存,适配spring缓存框架 */ public class TairSpringCache implements Cache { private static final Logger log = LoggerFactory.getLogger(TairSpringCache.class); private TairManager tairManager; private final String name; private int namespace; private int timeout; public TairSpringCache(String name, TairManager tairManager, int namespace) { this(name, tairManager, namespace, 0); } public TairSpringCache(String name, TairManager tairManager, int namespace, int timeout) { this.name = name; this.tairManager = tairManager; this.namespace = namespace; this.timeout = timeout; } @Override public String getName() { return this.name; } @Override public Object getNativeCache() { return this.tairManager; } @Override public ValueWrapper get(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey); if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) { final Object obj = result.getValue().getValue(); // 缓存为空兜底的是Nil对象,这里返回的时候需要转为null if (obj instanceof Nil) { return () -> null; } return () -> obj; } return null; } @Override public <T> T get(Object key, Class<T> type) { return (T) this.get(key).get(); } public <T> T get(Object o, Callable<T> callable) { return null; } @Override public void put(Object key, Object value) { if (value == null) { // 为空的话,兜底一个空对象,防止缓存穿透(由于tair自身特性不允许缓存null对象的原因,这里缓存一个空对象) value = new Nil(); } if (value instanceof Serializable) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.put( this.namespace, tairKey, (Serializable) value, 0, this.timeout ); if (resultCode != ResultCode.SUCCESS) { TairSpringCache.log.error( String.format( "[CachePut]: unable to put %s => %s into tair due to: %s", key, value, resultCode.getMessage() ) ); } } else { throw new RuntimeException( String.format( "[CachePut]: value %s is not Serializable", value ) ); } } public ValueWrapper putIfAbsent(Object key, Object value) { final ValueWrapper vw = this.get(key); if (vw.get() == null) { this.put(key, value); } return vw; } @Override public void evict(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.delete(this.namespace, tairKey); if ((resultCode == ResultCode.SUCCESS) || (resultCode == ResultCode.DATANOTEXSITS) || (resultCode == ResultCode.DATAEXPIRED)) { return; } else { final String errMsg = String.format( "[CacheDelete]: unable to evict key %s, resultCode: %s", key, resultCode ); TairSpringCache.log.error(errMsg); throw new RuntimeException(errMsg); } } @Override public void clear() { //TODO fgz: implement here later } public void setTairManager(TairManager tairManager) { this.tairManager = tairManager; } public void setNamespace(int namespace) { this.namespace = namespace; } public void setTimeout(int timeout) { this.timeout = timeout; } static class Nil implements Serializable { private static final long serialVersionUID = -9138993336039047508L; } }
测试用例就不贴了。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
加载全部内容