您现在的位置是:网站首页> 编程资料编程资料
详解redis缓存与数据库一致性问题解决_Redis_
2023-05-27
381人已围观
简介 详解redis缓存与数据库一致性问题解决_Redis_
数据库与缓存读写模式策略
写完数据库后是否需要马上更新缓存还是直接删除缓存?
(1)、如果写数据库的值与更新到缓存值是一样的,不需要经过任何的计算,可以马上更新缓存,但是如果对于那种写数据频繁而读数据少的场景并不合适这种解决方案,因为也许还没有查询就被删除或修改了,这样会浪费时间和资源
(2)、如果写数据库的值与更新缓存的值不一致,写入缓存中的数据需要经过几个表的关联计算后得到的结果插入缓存中,那就没有必要马上更新缓存,只有删除缓存即可,等到查询的时候在去把计算后得到的结果插入到缓存中即可。
所以一般的策略是当更新数据时,先删除缓存数据,然后更新数据库,而不是更新缓存,等要查询的时候才把最新的数据更新到缓存
数据库与缓存双写情况下导致数据不一致问题
场景一
当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后删除缓存,发现删除缓存失败了,这意味着数据库存的是99,而缓存是100,这导致数据库和缓存不一致。
场景一解决方案
这种情况应该是先删除缓存,然后在更新数据库,如果删除缓存失败,那就不要更新数据库,如果说删除缓存成功,而更新数据库失败,那查询的时候只是从数据库里查了旧的数据而已,这样就能保持数据库与缓存的一致性。
场景二
在高并发的情况下,如果当删除完缓存的时候,这时去更新数据库,但还没有更新完,另外一个请求来查询数据,发现缓存里没有,就去数据库里查,还是以上面商品库存为例,如果数据库中产品的库存是100,那么查询到的库存是100,然后插入缓存,插入完缓存后,原来那个更新数据库的线程把数据库更新为了99,导致数据库与缓存不一致的情况
场景二解决方案
遇到这种情况,可以用队列的去解决这个问,创建几个队列,如20个,根据商品的ID去做hash值,然后对队列个数取摸,当有数据更新请求时,先把它丢到队列里去,当更新完后在从队列里去除,如果在更新的过程中,遇到以上场景,先去缓存里看下有没有数据,如果没有,可以先去队列里看是否有相同商品ID在做更新,如果有也把查询的请求发送到队列里去,然后同步等待缓存更新完成。
这里有一个优化点,如果发现队列里有一个查询请求了,那么就不要放新的查询操作进去了,用一个while(true)循环去查询缓存,循环个200MS左右,如果缓存里还没有则直接取数据库的旧数据,一般情况下是可以取到的。
在高并发下解决场景二要注意的问题
(1)读请求时长阻塞
由于读请求进行了非常轻度的异步化,所以一定要注意读超时的问题,每个读请求必须在超时间内返回,该解决方案最大的风险在于可能数据更新很频繁,导致队列中挤压了大量的更新操作在里面,然后读请求会发生大量的超时,最后导致大量的请求直接走数据库,像遇到这种情况,一般要做好足够的压力测试,如果压力过大,需要根据实际情况添加机器。
(2)请求并发量过高
这里还是要做好压力测试,多模拟真实场景,并发量在最高的时候QPS多少,扛不住就要多加机器,还有就是做好读写比例是多少
(3)多服务实例部署的请求路由
可能这个服务部署了多个实例,那么必须保证说,执行数据更新操作,以及执行缓存更新操作的请求,都通过nginx服务器路由到相同的服务实例上
(4)热点商品的路由问题,导致请求的倾斜
某些商品的读请求特别高,全部打到了相同的机器的相同丢列里了,可能造成某台服务器压力过大,因为只有在商品数据更新的时候才会清空缓存,然后才会导致读写并发,所以更新频率不是太高的话,这个问题的影响并不是很大,但是确实有可能某些服务器的负载会高一些。
数据库与缓存数据一致性解决方案流程图
数据库与缓存数据一致性解决方案对应代码
商品库存实体
package com.shux.inventory.entity; /** ********************************************** * 描述: * Simba.Hua * 2017年8月30日 ********************************************** **/ public class InventoryProduct { private Integer productId; private Long InventoryCnt; public Integer getProductId() { return productId; } public void setProductId(Integer productId) { this.productId = productId; } public Long getInventoryCnt() { return InventoryCnt; } public void setInventoryCnt(Long inventoryCnt) { InventoryCnt = inventoryCnt; } }
请求接口
/** ********************************************** * 描述: * Simba.Hua * 2017年8月27日 ********************************************** **/ public interface Request { public void process(); public Integer getProductId(); public boolean isForceFefresh(); }
数据更新请求
package com.shux.inventory.request; import org.springframework.transaction.annotation.Transactional; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:更新库存信息 * 1、先删除缓存中的数据 * 2、更新数据库中的数据 * Simba.Hua * 2017年8月30日 ********************************************** **/ public class InventoryUpdateDBRequest implements Request{ private InventoryProductBiz inventoryProductBiz; private InventoryProduct inventoryProduct; public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){ this.inventoryProduct = inventoryProduct; this.inventoryProductBiz = inventoryProductBiz; } @Override @Transactional public void process() { inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId()); inventoryProductBiz.updateInventoryProduct(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return inventoryProduct.getProductId(); } @Override public boolean isForceFefresh() { // TODO Auto-generated method stub return false; } }
查询请求
package com.shux.inventory.request; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:查询缓存数据 * 1、从数据库中查询 * 2、从数据库中查询后插入到缓存中 * Simba.Hua * 2017年8月30日 ********************************************** **/ public class InventoryQueryCacheRequest implements Request { private InventoryProductBiz inventoryProductBiz; private Integer productId; private boolean isForceFefresh; public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) { this.productId = productId; this.inventoryProductBiz = inventoryProductBiz; this.isForceFefresh = isForceFefresh; } @Override public void process() { InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId); inventoryProductBiz.setInventoryProductCache(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return productId; } public boolean isForceFefresh() { return isForceFefresh; } public void setForceFefresh(boolean isForceFefresh) { this.isForceFefresh = isForceFefresh; } }
spring启动时初始化队列线程池
package com.shux.inventory.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; import com.shux.utils.other.SysConfigUtil; /** ********************************************** * 描述:请求处理线程池,初始化队列数及每个队列最多能处理的数量 * Simba.Hua * 2017年8月27日 ********************************************** **/ public class RequestProcessorThreadPool { private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString()); private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString()); private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum); private RequestProcessorThreadPool(){ for(int i=0;iqueue = new ArrayBlockingQueue (queueDataNum);//每个队列中放100条数据 RequestQueue.getInstance().addQueue(queue); threadPool.submit(new RequestProcessorThread(queue));//把每个queue交个线程去处理,线程会处理每个queue中的数据 } } public static class Singleton{ private static RequestProcessorThreadPool instance; static{ instance = new RequestProcessorThreadPool(); } public static RequestProcessorThreadPool getInstance(){ return instance; } } public static RequestProcessorThreadPool getInstance(){ return Singleton.getInstance(); } /** * 初始化线程池 */ public static void init(){ getInstance(); } }
请求处理线程
package com.shux.inventory.thread; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import com.shux.inventory.request.InventoryUpdateDBRequest; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; /** ********************************************** * 描述:请求处理线程 * Simba.Hua * 2017年8月27日 ********************************************** **/ public class RequestProcessorThread implements Callable{ private ArrayBlockingQueue queue; public RequestProcessorThread(ArrayBlockingQueue queue){ this.queue = queue; } @Override public Boolean call() throws Exception { Request request = queue.take(); Map flagMap = RequestQueue.getInstance().getFlagMap(); //不需要强制刷新的时候,查询请求去重处理 if (!request.isForceFefresh()){ if (request instanceof InventoryUpdateDBRequest) {//如果是更新请求,那就置为false flagMap.put(request.getProductId(), true); } else { Boolean flag = flagMap.get(request.getProductId()); /** * 标志位为空,有三种情况 * 1、没有过更新请求 * 2、没有查询请求 * 3、数据库中根本没有数据 * 在最初情况,一旦库存了插入了数据,那就好会在缓存中也会放一份数据, * 但这种情况下有可能由于redis中内存满了,redis通过LRU算法把这个商品给清除了,导致缓存中没有数据 * 所以当标志位为空的时候,需要从数据库重查询一次,并且把标志位置为false,以便后面的请求能够从缓存中取 */ if ( flag == null) { flagMap.put(request.getProductId(), false); } /** * 如果不为空,并且flag为true,说明之前有一次更新请求,说明缓存中没有数据了(更新缓存会先删除缓存), * 这个时候就要去刷新缓存,即从数据库中查询一次,并把标志位设置为false */ if ( flag != null && flag) { flagMap.put(request.getProductId(), false); } /** * 这种情况说明之前有一个查询请求,并且把数据刷新到了缓存中,所以这时候就不用去刷新缓存了,直接返回就可以了 */ if (flag != null && !flag) { flagMap.put(request.getProductId(), false); return true; } } } request.process(); return true; } }
请求队列
package com.shux.inventory.request; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** ********************************************** * 描述:请求队列 * Simba.Hua * 2017年8月27日 ********************************************** **/ public class RequestQueue { private List> queues = new ArrayList<>(); private Map flagMap = new ConcurrentHashMap<>(); private RequestQueue(){ } private static class Singleton{ private static RequestQueue queue; static{ queue = new RequestQueue(); } public static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance(){ return Singleton.getInstance(); } public void addQueue(ArrayBlockingQueue queue) { queues.add(queue); } public int getQueueSize(){ return queues.size(); } public ArrayBlockingQueue getQueueByIndex(int index) { return queues.get(index); } public Map getFlagMap() { return this.flagMap; } }
spring 启动初始化线程池类
package com.shux.inventory.listener; import org.springframework.context.ApplicationListener; import org.springframework.context.event.Context
点击排行
本栏推荐
