2.1.3. 缓存并发的解决和抽象

通过一个案例: 查询信息从缓存中查询信息,如果缓存中没有则从db查询,再放入缓存中。

这里就会引出一个问题:

|300个客户| -> |服务器| -> 缓存
|300个客户| -> |服务器| -> 缓存        ===>  如果缓存未命中,则访问DB
|300个客户| -> |服务器| -> 缓存

在真实的环境中存缓存都会放置过期时间,用来逃脱那些不使用的数据。

在上面示意中,假设 缓存过期时间为 3 秒,那么在第三秒的时候,有900个并发从缓存未命中,那么这个时候,就要从数据库中加载数据,然后set回缓存中。如果不加锁,那么这个时间点到达数据库的并发将是900个,mysql数据库很有可能会挂掉。

2.1.3.1. 方法上加锁 synchronized

/**
 * 服务模拟
 * @author zhuqiang
 * @version 1.0.1 2017/2/9 13:45
 * @date 2017/2/9 13:45
 * @since 1.0
 */
public class MockService {
    private Logger log = LoggerFactory.getLogger(getClass());
    final String code = "book";
    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");

    /**
     * 模拟从db查询数据
     * @return
     */
    private Map<Integer, Object> getDBBookMap() {
        //假设业务处理需要时间
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Map<Integer, Object> mockData = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            mockData.put(i, "java" + i);
        }
        return mockData;
    }

    /**
     * 模拟service 查询数据
     * @return
     */
    public synchronized Map<Integer, Object> query() {
        Jedis jedis = pool.getResource();
        String jsonStr = jedis.get(code);
        if (jsonStr != null) {
            log.info("==== Cache Hit");
            return JSON.parseObject(jsonStr, Map.class);
        }
        log.info("==== no Cache");
        // 缓存未命中,则从数据库获取数据
        Map<Integer, Object> dbBookMap = getDBBookMap();

        jedis.setex(code, 3, JSON.toJSONString(dbBookMap));
        return dbBookMap;
    }
}

测试数据类如下:

public class DemoTest {
    private MockService mockService = new MockService();

    /**
     * 普通调用两次,测试数据
     */
    @Test
    public void demo1() {
        mockService.query();
        mockService.query();
    }
    @Test
    public void demo2() {
        // 模拟并发
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    cd2.await(); // 等待发令枪,3个线程就绪后再同时执行该操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                mockService.query();
            }).start();
            cd2.countDown();
        }
        synchronized (this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

demo1输出结果:
23:00:47.777 [main] INFO study001.cache.MockService - ==== no Cache
23:00:48.226 [main] INFO study001.cache.MockService - ==== Cache Hit

因为不是并发,串行执行,所以第二次获取到了。


demo2输出结果:
13:56:45.144 [Thread-2] INFO MockService - ==== no Cache
13:56:45.510 [Thread-1] INFO MockService - ==== Cache Hit
13:56:45.530 [Thread-0] INFO MockService - ==== Cache Hit


貌似达到了效果。但是这种效果就和demo1中的性能差不多了。性能会非常差。

2.1.3.2. 同步代码块

/**
 * 服务模拟
 * @author zhuqiang
 * @version 1.0.1 2017/2/9 13:45
 * @date 2017/2/9 13:45
 * @since 1.0
 */
public class MockService {
    private Logger log = LoggerFactory.getLogger(getClass());
    final String code = "book";
    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");

    /**
     * 模拟从db查询数据
     * @return
     */
    private Map<Integer, Object> getDBBookMap() {
        //假设业务处理需要时间
        try {
            TimeUnit.MILLISECONDS.sleep(999);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Map<Integer, Object> mockData = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            mockData.put(i, "java" + i);
        }
        return mockData;
    }

    private Integer count = 0;

    /**
     * 模拟service 查询数据
     * @return
     */
    public Map<Integer, Object> query() {
        Jedis jedis = pool.getResource();
        try {
            String jsonStr = jedis.get(code);
            if (jsonStr != null) {
                return JSON.parseObject(jsonStr, Map.class);
            }
            synchronized (code) {
                jsonStr = jedis.get(code);
                if (jsonStr != null) {
                    return JSON.parseObject(jsonStr, Map.class);
                }
                count ++;
                log.info("================== no Cache");
                // 缓存未命中,则从数据库获取数据
                Map<Integer, Object> dbBookMap = getDBBookMap();

                jedis.setex(code, 1, JSON.toJSONString(dbBookMap));
                return dbBookMap;
            }
        } finally {
            jedis.close();  // 这里一定要关闭,否则会死锁
        }
    }

    public Integer getCount() {
        return count;
    }
}

测试数据

    @Test
    public void demo2() {
        // 模拟并发
        for (int i = 0; i < 10000; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    cd2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                mockService.query();
            }, "TEST" + i).start();
            cd2.countDown();
        }
        try {
            TimeUnit.SECONDS.sleep(10);
            log.info("缓存穿透个数:" + mockService.getCount());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

输出:
14:57:18.616 [TEST21] INFO MockService - ================== no Cache
14:57:22.134 [TEST3166] INFO MockService - ================== no Cache
14:57:24.159 [TEST6683] INFO MockService - ================== no Cache
14:57:27.581 [main] INFO DemoTestTest - 缓存穿透个数:3

在这里使用Jedis的时候刚开始只会有8个线程打印信息,然后应该是等待还是死锁了。应该和 jedispool 的配置或则什么有关。所以在通信完成之后一定要关闭链接。

这个已经解决了单机缓存传统的需求。仔细一看代码,这个思路是

  1. 缓存有则返回
  2. 同步代码块
    1. 再次检查缓存,有则返回
    2. 没有则走数据库

变化的部分只是查询数据库的部分。这样一样。就可以抽象成模版方法模式。

2.1.3.3. 缓存抽象封装

/**
 * 缓存模版抽象
 * @author zhuqiang
 * @version 1.0.1 2017/2/9 15:06
 * @date 2017/2/9 15:06
 * @since 1.0
 */
public class CacheTemplate {
    private Logger log = LoggerFactory.getLogger(getClass());
    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");

    /**
     * 获取缓存数据
     * @param key
     * @param clazz
     * @param expire 过期时间,秒
     * @param <T>
     * @return
     */
    public <T> T get(String key, TypeReference<T> clazz, int expire, LoadDataCallback<T> loadDataCallback) {
        Jedis jedis = pool.getResource();
        try {
            String jsonStr = jedis.get(key);
            if (jsonStr != null) {
                return JSON.parseObject(jsonStr, clazz);
            }
            synchronized (key) {
                jsonStr = jedis.get(key);
                if (jsonStr != null) {
                    return JSON.parseObject(jsonStr, clazz);
                }
                log.info("================== no Cache");
                T load = loadDataCallback.load();
                if (load != null) {
                    jedis.setex(key, expire, JSON.toJSONString(load));
                    return load;
                }
            }
        } finally {
            jedis.close();
        }
        return null;
    }

    public interface LoadDataCallback<T> {
        /**
         * 加载数据
         * @return 返回json串。 如果无数据则返回null
         */
        T load();
    }
}

修改调用处

  private CacheTemplate cacheTemplate = new CacheTemplate();

       /**
     * 模拟service 查询数据
     * @return
     */
    public Map<Integer, Object> query() {
        return cacheTemplate.get(
                code,
                new TypeReference<Map<Integer, Object>>() {
                },
                1,
                new CacheTemplate.LoadDataCallback<Map<Integer, Object>>() {
                    @Override
                    public Map<Integer, Object> load() {
                        return getDBBookMap();
                    }
                });
    }
  • TypeReference : 是fastJson中的对象,用来返回复杂的数据。像本文中的Map,如果使用class,那么解析出来的是不带泛型的结果集。
© All Rights Reserved            updated 2017-09-04 13:50:26

results matching ""

    No results matching ""