# 040. 在库存服务中实现缓存与数据库双写一致性保障方案

由于该章节视频是分段录制的,然而整体是写代码耗费的时间,这里合并为一个章节,对应原始视频章节 040、041、042、043

TIP

对于这个案例背景,是简化的,本课程重点是亿级流量高并发缓存架构, 代码也是简化的比较简单的能把主要流程走通。

所以不要对代码有太高的期待,对于实际的代码组织,本人(笔记者) 会根据思路理解自行组织,当然大方向还是会跟着视频中的走,部分是在是太乱的代码, 会进行重组。

040~044 章节代码对于自己来说是一个从来没有接触过的思路写法, 这个写法单独开辟分支保留了出来 gitHub 传送门 (opens new window)

再来回顾下之前的思路:

  1. 数据更新:根据唯一标识路由到一个队里中,「删除缓存 + 更新数据」
  2. 数据读取:如果不在缓存中,根据唯一标识路由到一个队里中,「读取数据 + 写入缓存」

投入队里之后,就等待结果完成,由于同一个标识路由到的是同一个队列中, 所以相当于加锁了。

下面就来实现这个思路,分几步走:

  1. 系统初启动时,初始化线程池与内存队列
  2. 两种请求对象封装
  3. 请求异步执行 service 封装
  4. 两种请求 Controller 封装
  5. 读请求去重优化
  6. 空数据读请求过滤优化

由于该代码核心的就几个地方,其他的都是基础的业务与数据库的常规操作, 故而笔记只记录重点思路地方

# 系统初启动时,初始化线程池与内存队列

通过 ApplicationRunner 机制,在系统初始化时,对线程池进行初始化操作

/**
 * 线程与队列初始化
 *
 * @author : zhuqiang
 * @date : 2019/4/3 22:44
 */
@Component
public class RequestQueue implements ApplicationRunner {
    private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();

    @Override
    public void run(ApplicationArguments args) throws Exception {
        int workThread = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(workThread);
        for (int i = 0; i < workThread; i++) {
            ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<>(100);
            executorService.submit(new RequestProcessorThread(queue));
            queues.add(queue);
        }
    }

  public ArrayBlockingQueue<Request> getQueue(int index) {
    return queues.get(index);
  }
}


/**
 * 处理请求的线程
 *
 * @author : zhuqiang
 * @date : 2019/4/3 22:38
 */
public class RequestProcessorThread implements Callable<Boolean> {
    private ArrayBlockingQueue<Request> queue;

    public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
        this.queue = queue;
    }

    @Override
    public Boolean call() throws Exception {
        try {
            while (true) {
                Request take = queue.take();
                take.process();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 两种请求对象封装

数据更新请求

/**
 * 数据更新请求
 *
 * @author : zhuqiang
 * @date : 2019/4/3 23:05
 */
public class ProductInventoryDBUpdateRequest implements Request {
    private ProductInventory productInventory;
    private ProductInventoryService productInventoryService;

    public ProductInventoryDBUpdateRequest(ProductInventory productInventory, ProductInventoryService productInventoryService) {
        this.productInventory = productInventory;
        this.productInventoryService = productInventoryService;
    }

    @Override
    public void process() {
        //1. 删除缓存
        productInventoryService.removeProductInventoryCache(productInventory.getProductId());
        //2. 更新库存
        productInventoryService.updateProductInventory(productInventory);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

数据刷新请求

/**
 * 缓存刷新请求
 *
 * @author : zhuqiang
 * @date : 2019/4/6 14:13
 */
public class ProductInventoryCacheRefreshRequest implements Request {

    private Integer productId;
    private ProductInventoryService productInventoryService;

    public ProductInventoryCacheRefreshRequest(Integer productId, ProductInventoryService productInventoryService) {
        this.productId = productId;
        this.productInventoryService = productInventoryService;
    }

    @Override
    public void process() {
        // 1. 读取数据库库存
        ProductInventory productInventory = productInventoryService.findProductInventory(productId);
        // 2. 设置缓存
        productInventoryService.setProductInventoryCache(productInventory);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 请求异步执行 service 封装

@Service
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
    @Autowired
    private RequestQueue requestQueue;

    @Override
    public void process(Request request) {
        try {
            // 1. 根据商品 id 路由到具体的队列
            ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
            // 2. 放入队列
            queue.put(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId) {
        // 先获取 productId 的 hash 值
        String key = String.valueOf(productId);
        int h;
        int hash = (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);

        // 对hash值取模,将hash值路由到指定的内存队列中,比如内存队列大小8
        // 用内存队列的数量对hash值取模之后,结果一定是在0~7之间
        // 所以任何一个商品id都会被固定路由到同样的一个内存队列中去的
        int index = (requestQueue.queueSize() - 1) & hash;
        return requestQueue.getQueue(index);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 两种请求 Controller 封装

/**
 * 商品库存
 *
 * @author : zhuqiang
 * @date : 2019/4/6 15:23
 */
@RestController
public class ProductInventoryController {
    @Autowired
    private RequestAsyncProcessService requestAsyncProcessService;
    @Autowired
    private ProductInventoryService productInventoryService;

    /**
     * 更新商品库存
     */
    @RequestMapping("/updateProductInventory")
    public Response updateProductInventory(ProductInventory productInventory) {
        try {
            ProductInventoryDBUpdateRequest request = new ProductInventoryDBUpdateRequest(productInventory, productInventoryService);
            requestAsyncProcessService.process(request);
            return new Response(Response.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            return new Response(Response.FAILURE);
        }
    }

    @RequestMapping("/getProductInventory")
    public ProductInventory getProductInventory(Integer productId) {
        try {
            // 异步获取
            ProductInventoryCacheRefreshRequest request = new ProductInventoryCacheRefreshRequest(productId, productInventoryService);
            requestAsyncProcessService.process(request);
            ProductInventory productInventory = null;

            long startTime = System.currentTimeMillis();
            long endTime = 0L;
            long waitTime = 0L;
            // 最多等待 200 毫秒
            while (true) {
                if (waitTime > 200) {
                    break;
                }
                // 尝试去redis中读取一次商品库存的缓存数据
                productInventory = productInventoryService.getProductInventoryCache(productId);

                // 如果读取到了结果,那么就返回
                if (productInventory != null) {
                    return productInventory;
                }
                // 如果没有读取到结果,那么等待一段时间
                else {
                    Thread.sleep(20);
                    endTime = System.currentTimeMillis();
                    waitTime = endTime - startTime;
                }
            }
            // 直接尝试从数据库中读取数据
            productInventory = productInventoryService.findProductInventory(productId);
            if (productInventory != null) {
                return productInventory;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ProductInventory(productId, -1L);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# 读请求去重优化

核心思路是通过:map 来保存写标志

@Service
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
    @Autowired
    private RequestQueue requestQueue;

    @Override
    public void process(Request request) {
        try {
            Map<Integer, Boolean> flagMap = requestQueue.getFlagMap();

            // 如果是一个更新数据库请求
            if (request instanceof ProductInventoryDBUpdateRequest) {
                flagMap.put(request.getProductId(), true);
            } else if (request instanceof ProductInventoryCacheRefreshRequest) {
                Boolean flag = flagMap.get(request.getProductId());
                // 系统启动后,就没有写请求,全是读,可能导致 flas = null
                if (flag == null) {
                    flagMap.put(request.getProductId(), false);
                }
                // 已经有过读或写的请求 并且前面已经有一个写请求了
                if (flag != null && flag) {
                    // 读取请求把,写请求标志冲掉
                    flagMap.put(request.getProductId(), false);
                }
                // 如果是读请求,直接返回,等待写完成即可
                else if (flag != null && !flag) {
                    return;
                }
            }

            // 1. 根据商品 id 路由到具体的队列
            ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
            // 2. 放入队列
            queue.put(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 空数据读请求过滤优化

上面的逻辑,会让这种场景下的请求不执行,但是在 getProductInventory 中,如果从缓存中没有读取到,则最终会走一次数据库。

// 系统启动后,就没有写请求,全是读,可能导致 flas = null
if (flag == null) {
    flagMap.put(request.getProductId(), false);
}
1
2
3
4

这里就存在一个 bug 了,会导致缓存一直被穿透,如果没有写请求的话,读请求被去重了,一直请求数据库。

修复这个 bug 的话,最简单的办法就是在读取数据库后,直接写入缓存中,如果不考虑并发问题的话,直接在 getProductInventory 中读取数据库后写入缓存即可。

那么就还有一个场景会导致缓存会穿透:数据库中没有这个数据,就会一直走查库的操作,这个问题后续会有解决方案;

# 深入解决去读请求去重优化

上面的代码存在几个问题:

  1. RequestAsyncProcessServiceImpl.process 判定与设置 flag 值在并发情况下会导致 flag 值问题
  2. 查库之后直接写缓存在并发情况下会导致数据不一致的情况(多个请求写数据,队列无意义了)

在 ProductInventoryController 中只要走了数据库后,就强制请求刷新缓存

// 直接尝试从数据库中读取数据
productInventory = productInventoryService.findProductInventory(productId);
if (productInventory != null) {
    // 读取到了数据,强制刷新缓存
    ProductInventoryCacheRefreshRequest forceRfreshRequest = new ProductInventoryCacheRefreshRequest(productId, productInventoryService, true);
    requestAsyncProcessService.process(forceRfreshRequest);
    return productInventory;
}
1
2
3
4
5
6
7
8

每个工作线程,自己处理自己队列的读去重请求

/**
 * 处理请求的线程
 *
 * @author : zhuqiang
 * @date : 2019/4/3 22:38
 */
public class RequestProcessorThread implements Callable<Boolean> {
    private ArrayBlockingQueue<Request> queue;
    /**
     * k: 商品 id v:请求标志: true : 有更新请求
     */
    private Map<Integer, Boolean> flagMap = new ConcurrentHashMap<>();

    public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
        this.queue = queue;
    }

    @Override
    public Boolean call() throws Exception {
        try {
            while (true) {
                Request request = queue.take();
                // 非强制刷新请求的话,就是一个正常的读请求
                if (!request.isForceRfresh()) {
                    // 如果是一个更新数据库请求
                    if (request instanceof ProductInventoryDBUpdateRequest) {
                        flagMap.put(request.getProductId(), true);
                    } else if (request instanceof ProductInventoryCacheRefreshRequest) {
                        Boolean flag = flagMap.get(request.getProductId());
                        if (flag == null) {
                            flagMap.put(request.getProductId(), false);
                        }
                        // 已经有过读或写的请求 并且前面已经有一个写请求了
                        if (flag != null && flag) {
                          // 读取请求把,写请求标志冲掉
                          // 本次读会正常的执行,组成 1+1 (1 写 1 读)
                          // 后续的正常读请求会被过滤掉
                            flagMap.put(request.getProductId(), false);
                        }
                        // 如果是读请求,直接返回,等待写完成即可
                        else if (flag != null && !flag) {
                            continue;
                        }
                    }
                }
                request.process();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

这样一改造之后,并发的地方,就利用队列串行起来了,那么此代码还存在以下场景的缺陷:

  1. 当大量请求超过 200 毫秒未获取到缓存,会导致大量请求汇聚到数据库

    这种情况的发生场景有:

    1. 大量的写请求在前面,导致后续的大量读请求超时,直接读库
    2. 数据库压根就没有这个商品,导致缓存被穿透,一直读库
  2. 当大量请求超过 200 毫秒后,在数据库获取到了,并请求强制刷新缓存,导致大量请求又回去到数据库了

    这种情况是由于增加了强制刷新标志,导致的另外一个 bug,这个时候的思路可以再增加一个强制刷新队列来做强制读请求去重

# 总结

异步串行化的实现核心思路:

  1. 使用队列来避免数据竞争

    1. 删除缓存 + 更新数据库 封装成一个写请求
    2. 读取数据库 + 写缓存 封装成一个读请求
  2. 根据商品 id 路由到同一个队列中(此方案暂未考虑多服务实例的场景)

  3. 有写 1+1(1 写 1 读)时,需要过滤掉大量的读请求

    这部分正常读请求如不过滤掉,会进入数据库,且库存并未更新,浪费性能资源与缓存穿透(有数据,且数据已经进入了缓存,但是队列中还一直去数据库执行并刷新缓存)

  4. 等待读请求需要超时机制,一旦超时则从数据库获取

    此类场景出现的时候可能的原因有如下几点:

    1. 每个读或写请求测试耗时不准确
    2. 测试不准确导致服务实例不够(当然此章节并未解决多服务实例怎么路由或者解决并发的问题)
    3. 缓存被穿透,使用不存在的数据一致访问