# 040. 在库存服务中实现缓存与数据库双写一致性保障方案
由于该章节视频是分段录制的,然而整体是写代码耗费的时间,这里合并为一个章节,对应原始视频章节 040、041、042、043
TIP
对于这个案例背景,是简化的,本课程重点是亿级流量高并发缓存架构, 代码也是简化的比较简单的能把主要流程走通。
所以不要对代码有太高的期待,对于实际的代码组织,本人(笔记者) 会根据思路理解自行组织,当然大方向还是会跟着视频中的走,部分是在是太乱的代码, 会进行重组。
040~044 章节代码对于自己来说是一个从来没有接触过的思路写法, 这个写法单独开辟分支保留了出来 gitHub 传送门 (opens new window)
再来回顾下之前的思路:
- 数据更新:根据唯一标识路由到一个队里中,「删除缓存 + 更新数据」
- 数据读取:如果不在缓存中,根据唯一标识路由到一个队里中,「读取数据 + 写入缓存」
投入队里之后,就等待结果完成,由于同一个标识路由到的是同一个队列中, 所以相当于加锁了。
下面就来实现这个思路,分几步走:
- 系统初启动时,初始化线程池与内存队列
- 两种请求对象封装
- 请求异步执行 service 封装
- 两种请求 Controller 封装
- 读请求去重优化
- 空数据读请求过滤优化
由于该代码核心的就几个地方,其他的都是基础的业务与数据库的常规操作, 故而笔记只记录重点思路地方
# 系统初启动时,初始化线程池与内存队列
通过 ApplicationRunner 机制,在系统初始化时,对线程池进行初始化操作
/**
* 线程与队列初始化
*
* @author : zhuqiang
* @date : 2019/4/3 22:44
*/
@Component
public class RequestQueue implements ApplicationRunner {
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();
@Override
public void run(ApplicationArguments args) throws Exception {
int workThread = 10;
ExecutorService executorService = Executors.newFixedThreadPool(workThread);
for (int i = 0; i < workThread; i++) {
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<>(100);
executorService.submit(new RequestProcessorThread(queue));
queues.add(queue);
}
}
public ArrayBlockingQueue<Request> getQueue(int index) {
return queues.get(index);
}
}
/**
* 处理请求的线程
*
* @author : zhuqiang
* @date : 2019/4/3 22:38
*/
public class RequestProcessorThread implements Callable<Boolean> {
private ArrayBlockingQueue<Request> queue;
public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
try {
while (true) {
Request take = queue.take();
take.process();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 两种请求对象封装
数据更新请求
/**
* 数据更新请求
*
* @author : zhuqiang
* @date : 2019/4/3 23:05
*/
public class ProductInventoryDBUpdateRequest implements Request {
private ProductInventory productInventory;
private ProductInventoryService productInventoryService;
public ProductInventoryDBUpdateRequest(ProductInventory productInventory, ProductInventoryService productInventoryService) {
this.productInventory = productInventory;
this.productInventoryService = productInventoryService;
}
@Override
public void process() {
//1. 删除缓存
productInventoryService.removeProductInventoryCache(productInventory.getProductId());
//2. 更新库存
productInventoryService.updateProductInventory(productInventory);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
数据刷新请求
/**
* 缓存刷新请求
*
* @author : zhuqiang
* @date : 2019/4/6 14:13
*/
public class ProductInventoryCacheRefreshRequest implements Request {
private Integer productId;
private ProductInventoryService productInventoryService;
public ProductInventoryCacheRefreshRequest(Integer productId, ProductInventoryService productInventoryService) {
this.productId = productId;
this.productInventoryService = productInventoryService;
}
@Override
public void process() {
// 1. 读取数据库库存
ProductInventory productInventory = productInventoryService.findProductInventory(productId);
// 2. 设置缓存
productInventoryService.setProductInventoryCache(productInventory);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 请求异步执行 service 封装
@Service
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
@Autowired
private RequestQueue requestQueue;
@Override
public void process(Request request) {
try {
// 1. 根据商品 id 路由到具体的队列
ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
// 2. 放入队列
queue.put(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId) {
// 先获取 productId 的 hash 值
String key = String.valueOf(productId);
int h;
int hash = (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
// 对hash值取模,将hash值路由到指定的内存队列中,比如内存队列大小8
// 用内存队列的数量对hash值取模之后,结果一定是在0~7之间
// 所以任何一个商品id都会被固定路由到同样的一个内存队列中去的
int index = (requestQueue.queueSize() - 1) & hash;
return requestQueue.getQueue(index);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 两种请求 Controller 封装
/**
* 商品库存
*
* @author : zhuqiang
* @date : 2019/4/6 15:23
*/
@RestController
public class ProductInventoryController {
@Autowired
private RequestAsyncProcessService requestAsyncProcessService;
@Autowired
private ProductInventoryService productInventoryService;
/**
* 更新商品库存
*/
@RequestMapping("/updateProductInventory")
public Response updateProductInventory(ProductInventory productInventory) {
try {
ProductInventoryDBUpdateRequest request = new ProductInventoryDBUpdateRequest(productInventory, productInventoryService);
requestAsyncProcessService.process(request);
return new Response(Response.SUCCESS);
} catch (Exception e) {
e.printStackTrace();
return new Response(Response.FAILURE);
}
}
@RequestMapping("/getProductInventory")
public ProductInventory getProductInventory(Integer productId) {
try {
// 异步获取
ProductInventoryCacheRefreshRequest request = new ProductInventoryCacheRefreshRequest(productId, productInventoryService);
requestAsyncProcessService.process(request);
ProductInventory productInventory = null;
long startTime = System.currentTimeMillis();
long endTime = 0L;
long waitTime = 0L;
// 最多等待 200 毫秒
while (true) {
if (waitTime > 200) {
break;
}
// 尝试去redis中读取一次商品库存的缓存数据
productInventory = productInventoryService.getProductInventoryCache(productId);
// 如果读取到了结果,那么就返回
if (productInventory != null) {
return productInventory;
}
// 如果没有读取到结果,那么等待一段时间
else {
Thread.sleep(20);
endTime = System.currentTimeMillis();
waitTime = endTime - startTime;
}
}
// 直接尝试从数据库中读取数据
productInventory = productInventoryService.findProductInventory(productId);
if (productInventory != null) {
return productInventory;
}
} catch (Exception e) {
e.printStackTrace();
}
return new ProductInventory(productId, -1L);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 读请求去重优化
核心思路是通过:map 来保存写标志
@Service
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
@Autowired
private RequestQueue requestQueue;
@Override
public void process(Request request) {
try {
Map<Integer, Boolean> flagMap = requestQueue.getFlagMap();
// 如果是一个更新数据库请求
if (request instanceof ProductInventoryDBUpdateRequest) {
flagMap.put(request.getProductId(), true);
} else if (request instanceof ProductInventoryCacheRefreshRequest) {
Boolean flag = flagMap.get(request.getProductId());
// 系统启动后,就没有写请求,全是读,可能导致 flas = null
if (flag == null) {
flagMap.put(request.getProductId(), false);
}
// 已经有过读或写的请求 并且前面已经有一个写请求了
if (flag != null && flag) {
// 读取请求把,写请求标志冲掉
flagMap.put(request.getProductId(), false);
}
// 如果是读请求,直接返回,等待写完成即可
else if (flag != null && !flag) {
return;
}
}
// 1. 根据商品 id 路由到具体的队列
ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
// 2. 放入队列
queue.put(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 空数据读请求过滤优化
上面的逻辑,会让这种场景下的请求不执行,但是在 getProductInventory 中,如果从缓存中没有读取到,则最终会走一次数据库。
// 系统启动后,就没有写请求,全是读,可能导致 flas = null
if (flag == null) {
flagMap.put(request.getProductId(), false);
}
2
3
4
这里就存在一个 bug 了,会导致缓存一直被穿透,如果没有写请求的话,读请求被去重了,一直请求数据库。
修复这个 bug 的话,最简单的办法就是在读取数据库后,直接写入缓存中,如果不考虑并发问题的话,直接在 getProductInventory 中读取数据库后写入缓存即可。
那么就还有一个场景会导致缓存会穿透:数据库中没有这个数据,就会一直走查库的操作,这个问题后续会有解决方案;
# 深入解决去读请求去重优化
上面的代码存在几个问题:
- RequestAsyncProcessServiceImpl.process 判定与设置 flag 值在并发情况下会导致 flag 值问题
- 查库之后直接写缓存在并发情况下会导致数据不一致的情况(多个请求写数据,队列无意义了)
在 ProductInventoryController 中只要走了数据库后,就强制请求刷新缓存
// 直接尝试从数据库中读取数据
productInventory = productInventoryService.findProductInventory(productId);
if (productInventory != null) {
// 读取到了数据,强制刷新缓存
ProductInventoryCacheRefreshRequest forceRfreshRequest = new ProductInventoryCacheRefreshRequest(productId, productInventoryService, true);
requestAsyncProcessService.process(forceRfreshRequest);
return productInventory;
}
2
3
4
5
6
7
8
每个工作线程,自己处理自己队列的读去重请求
/**
* 处理请求的线程
*
* @author : zhuqiang
* @date : 2019/4/3 22:38
*/
public class RequestProcessorThread implements Callable<Boolean> {
private ArrayBlockingQueue<Request> queue;
/**
* k: 商品 id v:请求标志: true : 有更新请求
*/
private Map<Integer, Boolean> flagMap = new ConcurrentHashMap<>();
public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
try {
while (true) {
Request request = queue.take();
// 非强制刷新请求的话,就是一个正常的读请求
if (!request.isForceRfresh()) {
// 如果是一个更新数据库请求
if (request instanceof ProductInventoryDBUpdateRequest) {
flagMap.put(request.getProductId(), true);
} else if (request instanceof ProductInventoryCacheRefreshRequest) {
Boolean flag = flagMap.get(request.getProductId());
if (flag == null) {
flagMap.put(request.getProductId(), false);
}
// 已经有过读或写的请求 并且前面已经有一个写请求了
if (flag != null && flag) {
// 读取请求把,写请求标志冲掉
// 本次读会正常的执行,组成 1+1 (1 写 1 读)
// 后续的正常读请求会被过滤掉
flagMap.put(request.getProductId(), false);
}
// 如果是读请求,直接返回,等待写完成即可
else if (flag != null && !flag) {
continue;
}
}
}
request.process();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
这样一改造之后,并发的地方,就利用队列串行起来了,那么此代码还存在以下场景的缺陷:
当大量请求超过 200 毫秒未获取到缓存,会导致大量请求汇聚到数据库
这种情况的发生场景有:
- 大量的写请求在前面,导致后续的大量读请求超时,直接读库
- 数据库压根就没有这个商品,导致缓存被穿透,一直读库
当大量请求超过 200 毫秒后,在数据库获取到了,并请求强制刷新缓存,导致大量请求又回去到数据库了
这种情况是由于增加了强制刷新标志,导致的另外一个 bug,这个时候的思路可以再增加一个强制刷新队列来做强制读请求去重
# 总结
异步串行化的实现核心思路:
使用队列来避免数据竞争
- 删除缓存 + 更新数据库 封装成一个写请求
- 读取数据库 + 写缓存 封装成一个读请求
根据商品 id 路由到同一个队列中(此方案暂未考虑多服务实例的场景)
有写 1+1(1 写 1 读)时,需要过滤掉大量的读请求
这部分正常读请求如不过滤掉,会进入数据库,且库存并未更新,浪费性能资源与缓存穿透(有数据,且数据已经进入了缓存,但是队列中还一直去数据库执行并刷新缓存)
等待读请求需要超时机制,一旦超时则从数据库获取
此类场景出现的时候可能的原因有如下几点:
- 每个读或写请求测试耗时不准确
- 测试不准确导致服务实例不够(当然此章节并未解决多服务实例怎么路由或者解决并发的问题)
- 缓存被穿透,使用不存在的数据一致访问