1 - 1.基本配置

在Spring的应用中,redis可以算是基础操作了。那么想要玩转redis,我们需要知道哪些知识点呢?

  • redis配置,默认,非默认,集群,多实例,连接池参数等
  • redis读写操作,RedisTemplate的基本使用姿势
  • 几种序列化方式对比

本篇博文为redis系列的开篇,将介绍最基本的配置

I. redis基本配置

1. 默认配置

最简单的使用其实开箱即可用,添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

本机启动redis,一切采用默认的配置 (host:127.0.0.1, port:6379, 无密码)

然后就可以愉快的玩耍了,可以直接注入redisTemplate实例,进行各种读写操作

@SpringBootApplication
public class Application {

    public Application(RedisTemplate<String, String> redisTemplate) {
        redisTemplate.opsForValue().set("hello", "world");
        String ans = redisTemplate.opsForValue().get("hello");
        Assert.isTrue("world".equals(ans));
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2. 自定义配置参数

前面是默认的配置参数,在实际的使用中,一般都会修改这些默认的配置项,如果我的应用中,只有一个redis,那么完全可以只修改默认的配置参数

修改配置文件: application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    lettuce:
      pool:
        max-active: 32
        max-wait: 300ms
        max-idle: 16
        min-idle: 8

使用和前面没有什么区别,直接通过注入RedisTemplate来操作即可,需要额外注意的是设置了连接池的相关参数,需要额外引入依赖

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

3. 多redis配置

依赖多个不同的redis,也就是说我的项目需要从多个redis实例中获取数据,这种时候,就不能直接使用默认的,需要我们自己来声明ConnectionFactoryRedisTemplate

配置如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    lettuce:
      pool:
        max-active: 32
        max-wait: 300
        max-idle: 16
        min-idle: 8
    database: 0
  local-redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password:
    lettuce:
      pool:
        max-active: 16
        max-wait: 100
        max-idle: 8
        min-idle: 4

对应的配置类,采用Lettuce,基本设置如下,套路都差不多,先读取配置,初始化ConnectionFactory,然后创建RedisTemplate实例,设置连接工厂

@Configuration
public class RedisAutoConfig {

    @Bean
    public LettuceConnectionFactory defaultLettuceConnectionFactory(RedisStandaloneConfiguration defaultRedisConfig,
            GenericObjectPoolConfig defaultPoolConfig) {
        LettuceClientConfiguration clientConfig =
                LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(100))
                        .poolConfig(defaultPoolConfig).build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, String> defaultRedisTemplate(
            LettuceConnectionFactory defaultLettuceConnectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    @ConditionalOnBean(name = "localRedisConfig")
    public LettuceConnectionFactory localLettuceConnectionFactory(RedisStandaloneConfiguration localRedisConfig,
            GenericObjectPoolConfig localPoolConfig) {
        LettuceClientConfiguration clientConfig =
                LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(100))
                        .poolConfig(localPoolConfig).build();
        return new LettuceConnectionFactory(localRedisConfig, clientConfig);
    }

    @Bean
    @ConditionalOnBean(name = "localLettuceConnectionFactory")
    public RedisTemplate<String, String> localRedisTemplate(LettuceConnectionFactory localLettuceConnectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(localLettuceConnectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Configuration
    @ConditionalOnProperty(name = "host", prefix = "spring.local-redis")
    public static class LocalRedisConfig {
        @Value("${spring.local-redis.host:127.0.0.1}")
        private String host;
        @Value("${spring.local-redis.port:6379}")
        private Integer port;
        @Value("${spring.local-redis.password:}")
        private String password;
        @Value("${spring.local-redis.database:0}")
        private Integer database;

        @Value("${spring.local-redis.lettuce.pool.max-active:8}")
        private Integer maxActive;
        @Value("${spring.local-redis.lettuce.pool.max-idle:8}")
        private Integer maxIdle;
        @Value("${spring.local-redis.lettuce.pool.max-wait:-1}")
        private Long maxWait;
        @Value("${spring.local-redis.lettuce.pool.min-idle:0}")
        private Integer minIdle;

        @Bean
        public GenericObjectPoolConfig localPoolConfig() {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxTotal(maxActive);
            config.setMaxIdle(maxIdle);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWait);
            return config;
        }

        @Bean
        public RedisStandaloneConfiguration localRedisConfig() {
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            config.setHostName(host);
            config.setPassword(RedisPassword.of(password));
            config.setPort(port);
            config.setDatabase(database);
            return config;
        }
    }


    @Configuration
    public static class DefaultRedisConfig {
        @Value("${spring.redis.host:127.0.0.1}")
        private String host;
        @Value("${spring.redis.port:6379}")
        private Integer port;
        @Value("${spring.redis.password:}")
        private String password;
        @Value("${spring.redis.database:0}")
        private Integer database;

        @Value("${spring.redis.lettuce.pool.max-active:8}")
        private Integer maxActive;
        @Value("${spring.redis.lettuce.pool.max-idle:8}")
        private Integer maxIdle;
        @Value("${spring.redis.lettuce.pool.max-wait:-1}")
        private Long maxWait;
        @Value("${spring.redis.lettuce.pool.min-idle:0}")
        private Integer minIdle;

        @Bean
        public GenericObjectPoolConfig defaultPoolConfig() {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxTotal(maxActive);
            config.setMaxIdle(maxIdle);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWait);
            return config;
        }

        @Bean
        public RedisStandaloneConfiguration defaultRedisConfig() {
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            config.setHostName(host);
            config.setPassword(RedisPassword.of(password));
            config.setPort(port);
            config.setDatabase(database);
            return config;
        }
    }
}

测试类如下,简单的演示下两个template的读写

@SpringBootApplication
public class Application {

    public Application(RedisTemplate<String, String> localRedisTemplate, RedisTemplate<String, String>
            defaultRedisTemplate)
            throws InterruptedException {
        // 10s的有效时间
        localRedisTemplate.delete("key");
        localRedisTemplate.opsForValue().set("key", "value", 100, TimeUnit.MILLISECONDS);
        String ans = localRedisTemplate.opsForValue().get("key");
        System.out.println("value".equals(ans));
        TimeUnit.MILLISECONDS.sleep(200);
        ans = localRedisTemplate.opsForValue().get("key");
        System.out.println("value".equals(ans) + " >> false ans should be null! ans=[" + ans + "]");


        defaultRedisTemplate.opsForValue().set("key", "value", 100, TimeUnit.MILLISECONDS);
        ans = defaultRedisTemplate.opsForValue().get("key");
        System.out.println(ans);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

上面的代码执行演示如下

showcase

上面的演示为动图,抓一下重点:

  • 注意 localRedisTemplate, defaultRedisTemplate 两个对象不相同(看debug窗口后面的@xxx)
  • 同样两个RedisTemplateConnectionFactory也是两个不同的实例(即分别对应前面配置类中的两个Factory)
  • 执行后输出的结果正如我们预期的redis操作
    • 塞值,马上取出没问题
    • 失效后,再查询,返回null
  • 最后输出异常日志,提示如下
Description:

Parameter 0 of method redisTemplate in org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration required a single bean, but 2 were found:
	- defaultLettuceConnectionFactory: defined by method 'defaultLettuceConnectionFactory' in class path resource [com/git/hui/boot/redis/config/RedisAutoConfig.class]
	- localLettuceConnectionFactory: defined by method 'localLettuceConnectionFactory' in class path resource [com/git/hui/boot/redis/config/RedisAutoConfig.class]


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

上面表示说有多个ConnectionFactory存在,然后创建默认的RedisTemplate就不知道该选择哪一个了,有两种方法

方法一:指定默认的ConnectionFactory

借助@Primary来指定默认的连接工厂,然后在使用工程的时候,通过@Qualifier注解来显示指定,我需要的工厂是哪个(主要是localRedisTemplate这个bean的定义,如果不加,则会根据defaultLettuceConnectionFactory这个实例来创建Redis连接了)

@Bean
@Primary
public LettuceConnectionFactory defaultLettuceConnectionFactory(RedisStandaloneConfiguration defaultRedisConfig,
        GenericObjectPoolConfig defaultPoolConfig) {
    // ...
}

@Bean
public RedisTemplate<String, String> defaultRedisTemplate(
        @Qualifier("defaultLettuceConnectionFactory") LettuceConnectionFactory defaultLettuceConnectionFactory) {
    // ....
}

@Bean
@ConditionalOnBean(name = "localRedisConfig")
public LettuceConnectionFactory localLettuceConnectionFactory(RedisStandaloneConfiguration localRedisConfig,
        GenericObjectPoolConfig localPoolConfig) {
    // ...
}

@Bean
@ConditionalOnBean(name = "localLettuceConnectionFactory")
public RedisTemplate<String, String> localRedisTemplate(
        @Qualifier("localLettuceConnectionFactory") LettuceConnectionFactory localLettuceConnectionFactory) {
    // ...
}

方法二:忽略默认的自动配置类

既然提示的是org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration类加载bean冲突,那么就不加载这个配置即可

@SpringBootApplication
@EnableAutoConfiguration(exclude = {RedisAutoConfiguration.class, RedisReactiveAutoConfiguration.class})
public class Application {
  // ...
}

II. 其他

0. 项目

2 - 2.Jedis配置

SpringBoot2之后,默认采用Lettuce作为redis的连接客户端,当然我们还是可以强制捡回来,使用我们熟悉的Jedis的,本篇简单介绍下使用Jedis的相关配置

I. 基本配置

1. 依赖

使用Jedis与Lettuce不同的是,需要额外的引入Jedis包的依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
</dependencies>

2. 配置

redis的相关配置,和前面的差不多,只是线程池的参数稍稍有点区别

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    jedis:
      pool:
        max-idle: 6
        max-active: 32
        max-wait: 100
        min-idle: 4

3. AutoConfig

与前面不同的是,我们需要定义一个RedisConnectionFactory的bean作为默认的连接工厂,以此来确定底层的连接采用的是Jedis客户端

@Configuration
public class RedisAutoConfig {

    @Bean
    public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig jedisPool,
            RedisStandaloneConfiguration jedisConfig) {
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory(jedisConfig);
        connectionFactory.setPoolConfig(jedisPool);
        return connectionFactory;
    }

    @Configuration
    public static class JedisConf {
        @Value("${spring.redis.host:127.0.0.1}")
        private String host;
        @Value("${spring.redis.port:6379}")
        private Integer port;
        @Value("${spring.redis.password:}")
        private String password;
        @Value("${spring.redis.database:0}")
        private Integer database;

        @Value("${spring.redis.jedis.pool.max-active:8}")
        private Integer maxActive;
        @Value("${spring.redis.jedis.pool.max-idle:8}")
        private Integer maxIdle;
        @Value("${spring.redis.jedis.pool.max-wait:-1}")
        private Long maxWait;
        @Value("${spring.redis.jedis.pool.min-idle:0}")
        private Integer minIdle;

        @Bean
        public JedisPoolConfig jedisPool() {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxWaitMillis(maxWait);
            jedisPoolConfig.setMaxTotal(maxActive);
            jedisPoolConfig.setMinIdle(minIdle);
            return jedisPoolConfig;
        }

        @Bean
        public RedisStandaloneConfiguration jedisConfig() {
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            config.setHostName(host);
            config.setPort(port);
            config.setDatabase(database);
            config.setPassword(RedisPassword.of(password));
            return config;
        }
    }
}

4. 测试

测试主要就是查看下RedisTemplate的连接工厂类,到底是啥,简单的是截图如下

testshow

II. 其他

0. 项目

3 - 3.String数据结构的读写

Redis目前常用的存储结构有五种,String字符串,List列表,Set集合,Hash散列,ZSet有序集合;本篇则主要集中在String这个数据结构的读写操作之上

对于String的操作,除了常见的get/set之外,还有一些比较有特色的功能,如用于实现redis分布式锁的setnx/getset方法;用于实现计数的incr/decr方法;位图算法的经典实用场景之bitmap的使用方法等也有必要了解一下

I. 使用方法

1. 基本配置

在项目中首先需要添加基本的依赖,然后我们使用默认的Redis配置进行操作,即本机启动redis服务,端口号为6379,密码没有

<dependencies>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
</dependencies>

2. Get/Set方法

直接使用默认的RedisTemplate进行redis的读写操作,因为我们没有指定序列化方式,所以本篇使用中,并没有使用更简单的opsForValue进行操作,具体原因可以参考博文

@Component
public class KVBean {

    private final StringRedisTemplate redisTemplate;

    public KVBean(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // kv数据结构的测试相关

    /**
     * 设置并获取之间的结果,要求key,value都不能为空;如果之前没有值,返回null
     *
     * @param key
     * @param value
     * @return
     */
    public byte[] setAndGetOldValue(String key, String value) {
        return redisTemplate.execute((RedisCallback<byte[]>) con -> con.getSet(key.getBytes(), value.getBytes()));
    }

    public Boolean setValue(String key, String value) {
        return redisTemplate
                .execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(), value.getBytes()));
    }

    public byte[] getValue(String key) {
        return redisTemplate.execute((RedisCallback<byte[]>) connection -> connection.get(key.getBytes()));
    }

    public Boolean mSetValue(Map<String, String> values) {
        Map<byte[], byte[]> map = new HashMap<>(values.size());
        for (Map.Entry<String, String> entry : values.entrySet()) {
            map.put(entry.getKey().getBytes(), entry.getValue().getBytes());
        }

        return redisTemplate.execute((RedisCallback<Boolean>) con -> con.mSet(map));
    }

    public List<byte[]> mGetValue(List<String> keys) {
        return redisTemplate.execute((RedisCallback<List<byte[]>>) con -> {
            byte[][] bkeys = new byte[keys.size()][];
            for (int i = 0; i < keys.size(); i++) {
                bkeys[i] = keys.get(i).getBytes();
            }
            return con.mGet(bkeys);
        });
    }
}

上面演示了基本的get/set方法的使用,以及批量的读取和写入缓存值;也给出了getSet方法,基本上就是redis命令的一层浅封装;

对于getset方法,表示的是用新的值覆盖旧的值,并返回旧的值,如果旧的不存在,则返回null

3. 计数

统计计数,也算是一个比较常见的case了,虽然对于redis而言,存储的都是String,但是从我们的逻辑上来看,存在redis中的值,应该是数字型,然后就可以直接传入一个增量,实现存储数据的运算效果,并返回最终的结果

因为redis是单进程方式的,因此采用redis的计数方式,可以较简单的实现分布式的计数效果

 // 自增、自减方式实现计数

/**
 * 实现计数的加/减( value为负数表示减)
 *
 * @param key
 * @param value
 * @return 返回redis中的值
 */
public Long incr(String key, long value) {
    return redisTemplate.execute((RedisCallback<Long>) con -> con.incrBy(key.getBytes(), value));
}

public Long decr(String key, long value) {
    return redisTemplate.execute((RedisCallback<Long>) con -> con.decrBy(key.getBytes(), value));
}

4. bitmap操作

位图操作,什么地方用得比较多呢?一个经典的case就是统计网站的日活,用户首次登陆时,根据用户id,设置位图中下标为userId的值为1,表示这个用户激活了;然后一天结束之后,只需要统计这个位图中为1的个数就可以知道每日的日活;也可以借此来统计每个用户的活跃状况

下面给出几个常用的位图方法

// bitmap的测试相关

public Boolean setBit(String key, Integer index, Boolean tag) {
    return redisTemplate.execute((RedisCallback<Boolean>) con -> con.setBit(key.getBytes(), index, tag));
}

public Boolean getBit(String key, Integer index) {
    return redisTemplate.execute((RedisCallback<Boolean>) con -> con.getBit(key.getBytes(), index));
}

/**
 * 统计bitmap中,value为1的个数,非常适用于统计网站的每日活跃用户数等类似的场景
 *
 * @param key
 * @return
 */
public Long bitCount(String key) {
    return redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(key.getBytes()));
}

public Long bitCount(String key, int start, int end) {
    return redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(key.getBytes(), start, end));
}

public Long bitOp(RedisStringCommands.BitOperation op, String saveKey, String... desKey) {
    byte[][] bytes = new byte[desKey.length][];
    for (int i = 0; i < desKey.length; i++) {
        bytes[i] = desKey[i].getBytes();
    }
    return redisTemplate.execute((RedisCallback<Long>) con -> con.bitOp(op, saveKey.getBytes(), bytes));
}

前面的setBit和getBit就比较简单了,设置或获取位图中某个小标的值;bitCount主要就是统计为1的个数;下面主要说一下最后一个方法bitOp

BITOP 命令支持 AND 、 OR 、 NOT 、 XOR 这四种操作中的任意一种参数:

  • BITOP AND destkey srckey1 … srckeyN ,对一个或多个 key 求逻辑与,并将结果保存到 destkey
  • BITOP OR destkey srckey1 … srckeyN,对一个或多个 key 求逻辑或,并将结果保存到 destkey
  • BITOP XOR destkey srckey1 … srckeyN,对一个或多个 key 求逻辑异或,并将结果保存到 destkey
  • BITOP NOT destkey srckey,对给定 key 求逻辑非,并将结果保存到 destkey

除了 NOT 操作之外,其他操作都可以接受一个或多个 key 作为输入,执行结果将始终保持到destkey里面。

5. 其他

上面的几个操作都是持久化的写入到redis,如果希望写入待失效时间的,可以使用setex,在写入值的同时加上失效时间;也可以调用额外的设置key的失效时间的方式;使用姿势比较简单,不单独列出

至于另外一个setnx的使用,则将放在后面的应用篇中,实现一个redis的分布式锁的时候,一并加以说明;而bitmap的更多使用,在也会放在后面的借助bitmap实现某个特殊业务场景的应用篇中详细说明

II. 简单测试

简单演示下基本的使用姿势

@RestController
@RequestMapping(path = "rest")
public class DemoRest {

    @Autowired
    private KVBean kvBean;

    @GetMapping(path = "show")
    public String showKv(String key) {
        Map<String, String> result = new HashMap<>(16);

        // kv test
        String kvKey = "kvKey";
        String kvVal = UUID.randomUUID().toString();
        kvBean.setValue(kvKey, kvVal);
        String kvRes = new String(kvBean.getValue(kvKey));
        result.put("kv get set", kvRes + "==>" + kvVal.equals(kvRes));


        // kv getSet

        // 如果原始数据存在时
        String kvOldRes = new String(kvBean.setAndGetOldValue(kvKey, kvVal + "==>new"));
        result.put("kv setAndGet", kvOldRes + " # " + new String(kvBean.getValue(kvKey)));

        // 如果原始数据不存在时
        byte[] kvOldResNull = kvBean.setAndGetOldValue("not exists", "...");
        result.put("kv setAndGet not exists", kvOldResNull == null ? "null" : new String(kvOldResNull));


        // 自增
        String cntKey = "kvIncrKey";
        long val = 10L;
        long incrRet = kvBean.incr(cntKey, val);
        String incrRes = new String(kvBean.getValue(cntKey));
        result.put("kv incr", incrRet + "#" + incrRes);


        // bitmap 测试
        String bitMapKey = "bitmapKey";
        kvBean.setBit(bitMapKey, 100, true);
        boolean bitRes = kvBean.getBit(bitMapKey, 100);
        boolean bitRes2 = kvBean.getBit(bitMapKey, 101);
        result.put("bitMap", bitRes + ">> true | " + bitRes2 + ">> false");
        return JSONObject.toJSONString(result);
    }
}

演示结果如下

show

II. 其他

0. 项目

4 - 4.List数据结构使用姿势

前面一篇博文介绍redis五种数据结构中String的使用姿势,这一篇则将介绍另外一个用的比较多的List,对于列表而言,用的最多的场景可以说是当做队列或者堆栈来使用了

I. 基本使用

1. 序列化指定

前面一篇的操作都是直接使用的execute配合回调方法来说明的,其实还有一种更加方便的方式,即 opsForValue, opsForList,本文则以这种方式演示list数据结构的操作

所以在正式开始之前,有必要指定一下key和value的序列化方式,当不现实指定时,采用默认的序列化(即jdk的对象序列化方式),直接导致的就是通过redis-cli获取存储数据时,会发现和你预期的不一样

首先实现序列化类

public class DefaultSerializer implements RedisSerializer<Object> {
    private final Charset charset;

    public DefaultSerializer() {
        this(Charset.forName("UTF8"));
    }

    public DefaultSerializer(Charset charset) {
        Assert.notNull(charset, "Charset must not be null!");
        this.charset = charset;
    }


    @Override
    public byte[] serialize(Object o) throws SerializationException {
        return o == null ? null : String.valueOf(o).getBytes(charset);
    }

    @Override
    public Object deserialize(byte[] bytes) throws SerializationException {
        return bytes == null ? null : new String(bytes, charset);

    }
}

其次定义RedisTemplate的序列化方式

@Configuration
public class AutoConfig {

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, String> redis = new RedisTemplate<>();
        redis.setConnectionFactory(redisConnectionFactory);

        // 设置redis的String/Value的默认序列化方式
        DefaultSerializer stringRedisSerializer = new DefaultSerializer();
        redis.setKeySerializer(stringRedisSerializer);
        redis.setValueSerializer(stringRedisSerializer);
        redis.setHashKeySerializer(stringRedisSerializer);
        redis.setHashValueSerializer(stringRedisSerializer);

        redis.afterPropertiesSet();
        return redis;
    }
}

2. 添加元素

对于list而言,添加元素常见的有两种,从左边加和从右边加,以lpush为例

/**
 * 在列表的最左边塞入一个value
 *
 * @param key
 * @param value
 */
public void lpush(String key, String value) {
    redisTemplate.opsForList().leftPush(key, value);
}

3. 获取元素

既然是list,就是有序的,因此完全是可以向jdk的list容器一样,获取指定索引的值

/**
 * 获取指定索引位置的值, index为-1时,表示返回的是最后一个;当index大于实际的列表长度时,返回null
 *
 * @param key
 * @param index
 * @return
 */
public String index(String key, int index) {
    return redisTemplate.opsForList().index(key, index);
}

与jdk中的List获取某个索引value不同的是,这里的index可以为负数,-1表示最右边的一个,-2则表示最右边的第二个,依次

4. 范围查询

这个查询就类似JDK容器中的List#subList了,查询指定范围的列表

/**
 * 获取范围值,闭区间,start和end这两个下标的值都会返回; end为-1时,表示获取的是最后一个;
 *
 * 如果希望返回最后两个元素,可以传入  -2, -1
 *
 * @param key
 * @param start
 * @param end
 * @return
 */
public List<String> range(String key, int start, int end) {
    return redisTemplate.opsForList().range(key, start, end);
}

5. 列表长度

/**
 * 返回列表的长度
 *
 * @param key
 * @return
 */
public Long size(String key) {
    return redisTemplate.opsForList().size(key);
}

6. 修改

更新List中某个下标的value,也属于比较常见的case了,

/**
 * 设置list中指定下标的值,采用干的是替换规则, 最左边的下标为0;-1表示最右边的一个
 *
 * @param key
 * @param index
 * @param value
 */
public void set(String key, Integer index, String value) {
    redisTemplate.opsForList().set(key, index, value);
}

7. 删除

在接口中没有看到删除指定小标的元素,倒是看到可以根据value进行删除,以及控制列表长度的方法

/**
 * 删除列表中值为value的元素,总共删除count次;
 *
 * 如原来列表为 【1, 2, 3, 4, 5, 2, 1, 2, 5】
 * 传入参数 value=2, count=1 表示删除一个列表中value为2的元素
 * 则执行后,列表为 【1, 3, 4, 5, 2, 1, 2, 5】
 *
 * @param key
 * @param value
 * @param count
 */
public void remove(String key, String value, int count) {
    redisTemplate.opsForList().remove(key, count, value);
}

/**
 * 删除list首尾,只保留 [start, end] 之间的值
 *
 * @param key
 * @param start
 * @param end
 */
public void trim(String key, Integer start, Integer end) {
    redisTemplate.opsForList().trim(key, start, end);
}

个人感觉在实际的使用中remove这个方法用得不太多;但是trim方法则比较有用了,特别是在控制list的长度,避免出现非常大的列表时,很有效果,传入的start/end参数,采用的是闭区间的原则

II. 其他

0. 项目

5 - 5.Hash数据结构使用姿势

Redis的五大数据结构,前面讲述了String和List的使用姿势,而Hash数据结构,也是比较常用的,接下来看下hash数据结构的读取,删除,塞入的基本使用姿势

I. 基本使用

在开始之前,序列化的指定需要额外处理,上一篇已经提及,相关内容可以参考:

1. 查询元素

hash数据结构和我们理解jdk中的hash差不多,使用的姿势也没什么区别,需要注意的是需要,定位一个元素,需要由缓存的key + hash的key-field

/**
 * 获取hash中field对应的值
 *
 * @param key
 * @param field
 * @return
 */
public String hget(String key, String field) {
    Object val = redisTemplate.opsForHash().get(key, field);
    return val == null ? null : val.toString();
}

2. 添加元素

/**
 * 添加or更新hash的值
 *
 * @param key
 * @param field
 * @param value
 */
public void hset(String key, String field, String value) {
    redisTemplate.opsForHash().put(key, field, value);
}

3. 删除

hash最好的一个地方,我个人感觉就是在删除时特别方便,比如将同类的数据聚集在一个hash中,删除key就可以实现全部都删除,清理数据就比较方便了;除此之外,另外一种就是删除hash中的部分key

/**
 * 删除hash中field这一对kv
 *
 * @param key
 * @param field
 */
public void hdel(String key, String field) {
    redisTemplate.opsForHash().delete(key, field);
}

4. 批量查询

批量查询有两种,一个是全部捞出来,一个是捞出指定key的相关数据

public Map<String, String> hgetall(String key) {
    return redisTemplate.execute((RedisCallback<Map<String, String>>) con -> {
        Map<byte[], byte[]> result = con.hGetAll(key.getBytes());
        if (CollectionUtils.isEmpty(result)) {
            return new HashMap<>(0);
        }

        Map<String, String> ans = new HashMap<>(result.size());
        for (Map.Entry<byte[], byte[]> entry : result.entrySet()) {
            ans.put(new String(entry.getKey()), new String(entry.getValue()));
        }
        return ans;
    });
}

public Map<String, String> hmget(String key, List<String> fields) {
    List<String> result = redisTemplate.<String, String>opsForHash().multiGet(key, fields);
    Map<String, String> ans = new HashMap<>(fields.size());
    int index = 0;
    for (String field : fields) {
        if (result.get(index) == null) {
            continue;
        }
        ans.put(field, result.get(index));
    }
    return ans;
}

5. 自增

hash的value如果是数字,提供了一个自增的方式,和String中的incr/decr差不多的效果

// hash 结构的计数

public long hincr(String key, String field, long value) {
    return redisTemplate.opsForHash().increment(key, field, value);
}

6. hash + list

hash的value如果另外一种场景就是数组,目前没有找到特别友好的操作方式,只能在业务层进行兼容

/**
 * value为列表的场景
 *
 * @param key
 * @param field
 * @return
 */
public <T> List<T> hGetList(String key, String field, Class<T> obj) {
    Object value = redisTemplate.opsForHash().get(key, field);
    if (value != null) {
        return JSONObject.parseArray(value.toString(), obj);
    } else {
        return new ArrayList<>();
    }
}

public <T> void hSetList(String key, String field, List<T> values) {
    String v = JSONObject.toJSONString(values);
    redisTemplate.opsForHash().put(key, field, v);
}

II. 其他

0. 项目

6 - 6.Set数据结构使用姿势

Redis的五大数据结构,前面讲述了String和List,Hash的使用姿势,接下来就是Set集合,与list最大的区别就是里面不允许存在重复的数据

I. 基本使用

在开始之前,序列化的指定需要额外处理,上一篇已经提及,相关内容可以参考:

1. 新增元素

新增元素时,可以根据返回值来判断是否添加成功, 如下面的单个插入时,如果集合中之前就已经有数据了,那么返回0,否则返回1


/**
 * 新增一个  sadd
 *
 * @param key
 * @param value
 */
public void add(String key, String value) {
    redisTemplate.opsForSet().add(key, value);
}

2. 删除元素

因为list是有序的,所以在list的删除需要指定位置;而set则不需要

/**
 * 删除集合中的值  srem
 *
 * @param key
 * @param value
 */
public void remove(String key, String value) {
    redisTemplate.opsForSet().remove(key, value);
}

3. 判断是否存在

set一个最大的应用场景就是判断某个元素是否有了,从而决定怎么执行后续的操作, 用的是 isMember方法,来判断集合中是否存在某个value

/**
 * 判断是否包含  sismember
 *
 * @param key
 * @param value
 */
public void contains(String key, String value) {
    redisTemplate.opsForSet().isMember(key, value);
}

4. 获取所有的value

set无序,因此像list一样获取某个范围的数据,不太容易,更常见的方式就是全部获取出来

/**
 * 获取集合中所有的值 smembers
 *
 * @param key
 * @return
 */
public Set<String> values(String key) {
    return redisTemplate.opsForSet().members(key);
}

5. 集合运算

set还提供了另外几个高级一点的功能,就是集合的运算,如求并集,交集等操作,虽然在我有限的业务应用中,并没有使用到这些高级功能,下面依然个给出使用的姿势

/**
 * 返回多个集合的并集  sunion
 *
 * @param key1
 * @param key2
 * @return
 */
public Set<String> union(String key1, String key2) {
    return redisTemplate.opsForSet().union(key1, key2);
}

/**
 * 返回多个集合的交集 sinter
 *
 * @param key1
 * @param key2
 * @return
 */
public Set<String> intersect(String key1, String key2) {
    return redisTemplate.opsForSet().intersect(key1, key2);
}

/**
 * 返回集合key1中存在,但是key2中不存在的数据集合  sdiff
 *
 * @param key1
 * @param key2
 * @return
 */
public Set<String> diff(String key1, String key2) {
    return redisTemplate.opsForSet().difference(key1, key2);
}

II. 其他

0. 项目

7 - 7.ZSet数据结构使用姿势

Redis的五大数据结构,目前就剩下最后的ZSET,可以简单的理解为带权重的集合;与前面的set最大的区别,就是每个元素可以设置一个score,从而可以实现各种排行榜的功能

I. 基本使用

在开始之前,序列化的指定需要额外处理,前面List这一篇已经提及,相关内容可以参考:

1. 新增元素

新增元素时,用起来和set差不多,无非是多一个score的参数指定而已

如果元素存在,会用新的score来替换原来的,返回0;如果元素不存在,则会会新增一个

/**
 * 添加一个元素, zset与set最大的区别就是每个元素都有一个score,因此有个排序的辅助功能;  zadd
 *
 * @param key
 * @param value
 * @param score
 */
public void add(String key, String value, double score) {
    redisTemplate.opsForZSet().add(key, value, score);
}

2. 删除元素

删除就和普通的set没啥区别了

/**
 * 删除元素 zrem
 *
 * @param key
 * @param value
 */
public void remove(String key, String value) {
    redisTemplate.opsForZSet().remove(key, value);
}

3. 修改score

zset中的元素塞入之后,可以修改其score的值,通过 zincrby 来对score进行加/减;当元素不存在时,则会新插入一个

从上面的描述来看,zincrbyzadd 最大的区别是前者是增量修改;后者是覆盖score方式

/**
 * score的增加or减少 zincrby
 *
 * @param key
 * @param value
 * @param score
 */
public Double incrScore(String key, String value, double score) {
    return redisTemplate.opsForZSet().incrementScore(key, value, score);
}

4. 获取value对应的score

这个需要注意的是,当value在集合中时,返回其score;如果不在,则返回null

/**
 * 查询value对应的score   zscore
 *
 * @param key
 * @param value
 * @return
 */
public Double score(String key, String value) {
    return redisTemplate.opsForZSet().score(key, value);
}

5. 获取value在集合中排名

前面是获取value对应的score;这里则是获取排名;这里score越小排名越高;

从这个使用也可以看出结合4、5, 用zset来做排行榜可以很简单的获取某个用户在所有人中的排名与积分

/**
 * 判断value在zset中的排名  zrank
 *
 * @param key
 * @param value
 * @return
 */
public Long rank(String key, String value) {
    return redisTemplate.opsForZSet().rank(key, value);
}

6. 集合大小

/**
 * 返回集合的长度
 *
 * @param key
 * @return
 */
public Long size(String key) {
    return redisTemplate.opsForZSet().zCard(key);
}

7. 获取集合中数据

因为是有序,所以就可以获取指定范围的数据,下面有两种方式

  • 根据排序位置获取数据
  • 根据score区间获取排序位置
/**
 * 查询集合中指定顺序的值, 0 -1 表示获取全部的集合内容  zrange
 *
 * 返回有序的集合,score小的在前面
 *
 * @param key
 * @param start
 * @param end
 * @return
 */
public Set<String> range(String key, int start, int end) {
    return redisTemplate.opsForZSet().range(key, start, end);
}

/**
 * 查询集合中指定顺序的值和score,0, -1 表示获取全部的集合内容
 *
 * @param key
 * @param start
 * @param end
 * @return
 */
public Set<ZSetOperations.TypedTuple<String>> rangeWithScore(String key, int start, int end) {
    return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
}

/**
 * 查询集合中指定顺序的值  zrevrange
 *
 * 返回有序的集合中,score大的在前面
 *
 * @param key
 * @param start
 * @param end
 * @return
 */
public Set<String> revRange(String key, int start, int end) {
    return redisTemplate.opsForZSet().reverseRange(key, start, end);
}

/**
 * 根据score的值,来获取满足条件的集合  zrangebyscore
 *
 * @param key
 * @param min
 * @param max
 * @return
 */
public Set<String> sortRange(String key, int min, int max) {
    return redisTemplate.opsForZSet().rangeByScore(key, min, max);
}

II. 其他

0. 项目

8 - 10.Redis集群环境配置

之前介绍的几篇redis的博文都是基于单机的redis基础上进行演示说明的,然而在实际的生产环境中,使用redis集群的可能性应该是大于单机版的redis的,那么集群的redis如何操作呢?它的配置和单机的有什么区别,又有什么需要注意的呢?

本篇将主要介绍SpringBoot项目整合redis集群,并针对这个过程中出现的问题进行说明,并给出相应的解决方案

I. 环境相关

首先需要安装redis集群环境,可以参考博文:redis-集群搭建手册

然后初始化springboot项目,对应的pom结构如下

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.7</version>
    <relativePath/> <!-- lookup parent from update -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories

需要注意的是,我们引入了两个包,一个是必要的 spring-boot-starter-data-redis,官方封装的一个操作redis的start工具,借助它我们可以很方便的直接使用RedisTemplate来操作redis

另外一个是commonos-pool2这个包,主要是当我们配置了redis的连接池的时候,需要用到它,否则会抛一个Class Not Found 的异常

II. 环境整合及采坑

这里我将redis集群搭建在局域网内的一台centos机器上,从后面的配置文件也可以看出(为什么这么处理?主要是为了引出后面一个问题)

1. application.yml

首我们先按照默认的配置方式,来获取我们的 RedisTemplate, 以实现最快的接入redis集群

spring:
  redis:
    password:
    cluster:
      nodes: 192.168.0.203:7000,192.168.0.203:7001,192.168.0.203:7002
      max-redirects: 3
    lettuce:
      pool:
        max-idle: 16
        max-active: 32
        min-idle: 8

我们搭建的redis集群,没有做主备(否则需要6个实例),为了省事,也没有设置密码(生产环境下,这是严格禁止的)

2. 使用测试

因为我们采用默认的配置,因此可以直接获取RedisTemplate的bean对象,来操作redis集群

@SpringBootApplication
public class Application {

    public Application(RedisTemplate redisTemplate) {
        redisTemplate.opsForValue().set("spring-r-cluster-1", 123);
        redisTemplate.opsForValue().set("spring-r-cluster-2", 456);
        redisTemplate.opsForValue().set("spring-r-cluster-3", 789);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

a. 拒绝连接

上面执行之后,报的第一个错误是连接拒绝,而我在redis集群所在的机器(203)上是可以连接成功的,但是本机连接报错

出现上面的问题,一般有两个原因,一个是防火墙导致端口不能外部访问,一个是redis的配置

防火墙的确认方式

  • 判断防火墙是否开启: firewall-cmd --state 如果提示not running 表示未开启
  • 查看防火墙规则: firewall-cmd --list-all

然后可以根据实际场景,添加端口

# 永久开启7000端口的公共访问权限
sudo firewall-cmd --zone=public --add-port=7000/tcp --permanent
sudo firewall-cmd --reload

当然在内网的测试环境下,可以直接关闭防火墙

//Disable firewall
systemctl disable firewalld
systemctl stop firewalld
systemctl status firewalld

//Enable firewall
systemctl enable firewalld
systemctl start firewalld
systemctl status firewalld

redis配置

如果确认不是防火墙问题,那么多半是redis的配置需要修改一下了,在redis.conf中,有一行bind 127.0.0.1配置默认开启,表示只允许本机访问,其他机器无权访问

解决办法就是修改一下这个配置,并重启

bind 0.0.0.0

b. Unable to connect to 127.0.0.1:7001

执行前面的测试用例时,发现会抛一个奇怪的异常如下

关键堆栈信息如下

Caused by: org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisConnectionException: Unable to connect to 127.0.0.1:7001
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:257) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.convertLettuceAccessException(LettuceStringCommands.java:718) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.set(LettuceStringCommands.java:143) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.connection.DefaultedRedisConnection.set(DefaultedRedisConnection.java:231) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.DefaultValueOperations$3.inRedis(DefaultValueOperations.java:202) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:59) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:224) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:184) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:95) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at org.springframework.data.redis.core.DefaultValueOperations.set(DefaultValueOperations.java:198) ~[spring-data-redis-2.0.9.RELEASE.jar:2.0.9.RELEASE]
	at com.git.hui.boot.redis.cluster.Application.<init>(Application.java:14) [classes/:na]
	at com.git.hui.boot.redis.cluster.Application$$EnhancerBySpringCGLIB$$ac0c03ba.<init>(<generated>) ~[classes/:na]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_171]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_171]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_171]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_171]
	at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:170) ~[spring-beans-5.0.8.RELEASE.jar:5.0.8.RELEASE]
	... 19 common frames omitted
Caused by: io.lettuce.core.RedisException: io.lettuce.core.RedisConnectionException: Unable to connect to 127.0.0.1:7001
	at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:125) ~[lettuce-core-5.0.4.RELEASE.jar:na]
	at io.lettuce.core.cluster.ClusterFutureSyncInvocationHandler.handleInvocation(ClusterFutureSyncInvocationHandler.java:118) ~[lettuce-core-5.0.4.RELEASE.jar:na]
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80) ~[lettuce-core-5.0.4.RELEASE.jar:na]
	at com.sun.proxy.$Proxy44.set(Unknown Source) ~[na:na]
	at org.springframework.data.red

通过断点可以看到,集群中的节点ip/端口是准确的,但是异常提示出来个无法连接127.0.0.1:7001,出现这个问题的原因,主要是我们在创建redis集群的时候,设置集群节点使用如下面的命令

redis/src/redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002

通过上面这种方式创建的redis集群,并没有什么问题,但是在springbot的整合中,通过redis集群获取到的节点信息就是127.0.0.1:7000… 然后导致上面的问题,因此一个解决办法是在创建集群的时候,指定下ip

首先数据和配置,然后重新建立集群关系

# 删除数据配置
rm xxx/data/*

redis/src/redis-cli  --cluster create 192.168.0.203:7000 192.168.0.203:7001 192.168.0.203:7002

然后再次测试ok

3. jedis配置

前面的配置默认会使用letttuce作为redis的桥接工具,如果我们底层想使用jedis,可以怎么操作?

首先在pom依赖中添加jedis依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

yml文件中的配置基本上不改都ok,在实际的项目中,对连接池稍微改了一下,不影响阅读,这里不贴出

接下来是定义RedisConnectionFactoy来替换默认的

下面的配置和以前的一篇博文 181101-SpringBoot高级篇Redis之Jedis配置 基本差不多,需要注意的是我们使用RedisClusterConfiguration替换了RedisStandaloneConfiguration

@Configuration
public class RedisAutoConfig {

    @Bean
    public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig jedisPool,
            RedisClusterConfiguration jedisConfig) {
        JedisConnectionFactory factory = new JedisConnectionFactory(jedisConfig, jedisPool);
        factory.afterPropertiesSet();
        return factory;
    }

    @Configuration
    public static class JedisConf {
        @Value("${spring.redis.cluster.nodes:127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002}")
        private String nodes;
        @Value("${spring.redis.cluster.max-redirects:3}")
        private Integer maxRedirects;
        @Value("${spring.redis.password:}")
        private String password;
        @Value("${spring.redis.database:0}")
        private Integer database;

        @Value("${spring.redis.jedis.pool.max-active:8}")
        private Integer maxActive;
        @Value("${spring.redis.jedis.pool.max-idle:8}")
        private Integer maxIdle;
        @Value("${spring.redis.jedis.pool.max-wait:-1}")
        private Long maxWait;
        @Value("${spring.redis.jedis.pool.min-idle:0}")
        private Integer minIdle;

        @Bean
        public JedisPoolConfig jedisPool() {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxWaitMillis(maxWait);
            jedisPoolConfig.setMaxTotal(maxActive);
            jedisPoolConfig.setMinIdle(minIdle);
            return jedisPoolConfig;
        }

        @Bean
        public RedisClusterConfiguration jedisConfig() {
            RedisClusterConfiguration config = new RedisClusterConfiguration();

            String[] sub = nodes.split(",");
            List<RedisNode> nodeList = new ArrayList<>(sub.length);
            String[] tmp;
            for (String s : sub) {
                tmp = s.split(":");
                // fixme 先不考虑异常配置的case
                nodeList.add(new RedisNode(tmp[0], Integer.valueOf(tmp[1])));
            }

            config.setClusterNodes(nodeList);
            config.setMaxRedirects(maxRedirects);
            config.setPassword(RedisPassword.of(password));
            return config;
        }
    }
}

然后其他的依旧,此时RedisTemplate的底层连接就变成了Jedis

III. 其他

0. 项目&相关博文

关联博文

9 - 11.管道Pipelined使用姿势

redis管道技术,可以在服务端未响应时,客户端可以继续向服务端发送请求,并最终一次性读取所有服务端的响应,这种技术可以很方便的支持我们的批量请求,下面简单介绍下如何使用RedisTemplate来使用管道

I. 基本使用

1. 配置

我们使用SpringBoot 2.2.1.RELEASE来搭建项目环境,直接在pom.xml中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

如果我们的redis是默认配置,则可以不额外添加任何配置;也可以直接在application.yml配置中,如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

2. 使用姿势

这里我们主要借助org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.RedisCallback<?>),如下

@Component
public class PipelineBean {

    private RedisTemplate<String, String> redisTemplate;

    public PipelineBean(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }


    public void counter(String prefix, String key, String target) {
        // 请注意,返回的结果与内部的redis操作顺序是匹配的
        List<Object> res = redisTemplate.executePipelined(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection redisConnection) throws DataAccessException {
                String mapKey = prefix + "_mp_" + key;
                String cntKey = prefix + "_cnt_" + target;
    
                redisConnection.openPipeline();
                redisConnection.incr(mapKey.getBytes());
                redisConnection.incr(cntKey.getBytes());
                return null;
            }
        });
        System.out.println(res);
    }
}

上面的使用中,有几个注意事项

  • redisConnection.openPipeline(); 开启管道
  • 返回结果为列表,内部第一个redis操作,对应的返回结果塞在列表的下标0;依次…

II. 其他

0. 项目

10 - 12.高级特性Bitmap使用姿势及应用场景介绍

前面介绍过redis的五种基本数据结构,如String,List, Set, ZSet, Hash,这些属于相对常见了;在这些基本结果之上,redis还提供了一些更高级的功能,如geo, bitmap, hyperloglog,pub/sub,本文将主要介绍Bitmap的使用姿势以及其适用场景,主要知识点包括

  • bitmap 基本使用
  • 日活统计应用场景中bitmap使用姿势
  • 点赞去重应用场景中bitmap使用姿势
  • 布隆过滤器bloomfilter基本原理及体验case

I. 基本使用

1. 配置

我们使用SpringBoot 2.2.1.RELEASE来搭建项目环境,直接在pom.xml中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

如果我们的redis是默认配置,则可以不额外添加任何配置;也可以直接在application.yml配置中,如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

2. 使用姿势

bitmap主要就三个操作命令,setbitgetbit以及 bitcount

a. 设置标记

setbit,主要是指将某个索引,设置为1(设置0表示抹去标记),基本语法如下

# 请注意这个index必须是数字,后面的value必须是0/1
setbit key index 0/1

对应的SpringBoot中,借助RestTemplate可以比较容易的实现,通常有两种写法,都可以

@Autowired
private StringRedisTemplate redisTemplate;

/**
 * 设置标记位
 *
 * @param key
 * @param offset
 * @param tag
 * @return
 */
public Boolean mark(String key, long offset, boolean tag) {
    return redisTemplate.opsForValue().setBit(key, offset, tag);
}

public Boolean mark2(String key, long offset, boolean tag) {
    return redisTemplate.execute(new RedisCallback<Boolean>() {
        @Override
        public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
            return connection.setBit(key.getBytes(), offset, tag);
        }
    });
}

上面两种写法的核心区别,就是key的序列化问题,第一种写法使用默认的jdk字符串序列化,和后面的getBytes()会有一些区别,关于这个,有兴趣的小伙伴可以看一下我之前的博文: RedisTemplate配置与使用#序列化问题

b. 判断存在与否

getbit key index,如果返回1,表示存在否则不存在

/**
 * 判断是否标记过
 *
 * @param key
 * @param offest
 * @return
 */
public Boolean container(String key, long offest) {
    return redisTemplate.opsForValue().getBit(key, offest);
}

c. 计数

bitcount key,统计和

/**
 * 统计计数
 *
 * @param key
 * @return
 */
public long bitCount(String key) {
    return redisTemplate.execute(new RedisCallback<Long>() {
        @Override
        public Long doInRedis(RedisConnection redisConnection) throws DataAccessException {
            return redisConnection.bitCount(key.getBytes());
        }
    });
}

3. 应用场景

前面的基本使用比较简单,在介绍String数据结构的时候也提过,我们重点需要关注的是bitmap的使用场景,它可以干嘛用,什么场景下使用它会有显著的优势

  • 日活统计
  • 点赞
  • bloomfilter

上面三个场景虽有相似之处,但实际的应用场景还是些许区别,接下来我们逐一进行说明

a. 日活统计

统计应用或网站的日活,这个属于比较常见的case了,如果是用redis来做这个事情,首先我们最容易想到的是Hash结构,一般逻辑如下

  • 根据日期,设置key,如今天为 2020/10/13, 那么key可以为 app_20_10_13
  • 其次当用户访问时,设置field为userId, value设置为true
  • 判断日活则是统计map的个数hlen app_20_10_13

上面这个逻辑有毛病么?当然没有问题,但是想一想,当我们的应用做的很nb的时候,每天的日活都是百万,千万级时,这个内存开销就有点吓人了

接下来我们看一下bitmap可以怎么做

  • 同样根据日期设置key
  • 当用户访问时,index设置为userId,setbit app_20_10_13 uesrId 1
  • 日活统计 bitcount app_20_10_13

简单对比一下上面两种方案

当数据量小时,且userid分布不均匀,小的为个位数,大的几千万,上亿这种,使用bitmap就有点亏了,因为userId作为index,那么bitmap的长度就需要能容纳最大的userId,但是实际日活又很小,说明bitmap中间有大量的空白数据

反之当数据量很大时,比如百万/千万,userId是连续递增的场景下,bitmap的优势有两点:1.存储开销小, 2.统计总数快

c. 点赞

点赞的业务,最主要的一点是一个用户点赞过之后,就不能继续点赞了(当然某些业务场景除外),所以我们需要知道是否可以继续点赞

上面这个hash当然也可以实现,我们这里则主要讨论一下bitmap的实现逻辑

  • 比如我们希望对一个文章进行点赞统计,那么我们根据文章articleId来生成redisKey=like_1121,将userId作为index
  • 首先是通过getbit like_1121 userId 来判断是否点赞过,从而限制用户是否可以操作

Hash以及bitmap的选择和上面的考量范围差不多

d. 布隆过滤器bloomfilter

布隆过滤器可谓是大名鼎鼎了,我们这里简单的介绍一下这东西是啥玩意

  • 底层存储为一个bitmap
  • 当来一个数据时,经过n个hash函数,得到n个数值
  • 将hash得到的n个数值,映射到bitmap,标记对应的位置为1

如果来一个数据,通过hash计算之后,若这个n个值,对应的bitmap都是1,那么表示这个数据可能存在;如果有一个不为1,则表示这个数据一定不存在

请注意:不存在时,是一定不存在;存在时,则不一定

从上面的描述也知道,bloomfilter的底层数据结构就是bitmap,当然它的关键点在hash算法;根据它未命中时一定不存在的特性,非常适用于缓存击穿的问题解决

体验说明

Redis的布隆过滤器主要针对>=4.0,通过插件的形式提供,项目源码地址为: https://github.com/RedisBloom/RedisBloom,下面根据readme的说明,简单的体验一下redis中bloomfilter的使用姿势

# docker 方式安装
docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest

# 通过redis-cli方式访问
docker exec -it redis-redisbloom bash

# 开始使用
# redis-cli
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> bf.add newFilter hello
(integer) 1
127.0.0.1:6379> bf.exists newFilter hello
(integer) 1
127.0.0.1:6379> bf.exists newFilter hell
(integer) 0

bloomfilter的使用比较简单,主要是两个命令bf.add添加元素,bf.exists判断是否存在,请注意它没有删除哦

4. 小结

bitmap位图属于一个比较精巧的数据结构,通常在数据量大的场景下,会有出现的表现效果;redis本身基于String数据结构来实现bitmap的功能支持,使用方式比较简单,基本上就下面三个命令

  • setbit key index 1/0: 设置
  • getbit key index: 判断是否存在
  • bitcount key: 计数统计

本文也给出了bitmap的三个常见的应用场景

  • 日活统计:主要借助bitcount来获取总数(后面会介绍,在日活十万百万以上时,使用hyperLogLog更优雅)
  • 点赞: 主要借助setbit/getbit来判断用户是否赞过,从而实现去重
  • bloomfilter: 基于bitmap实现的布隆过滤器,广泛用于去重的业务场景中(如缓存穿透,爬虫url去重等)

总的来讲,bitmap属于易用,巧用的数据结构,用得好即能节省内存也可以提高效率,用得不好貌似也不会带来太大的问题

II. 其他

0. 项目

系列博文

工程源码

11 - 13.高级特性发布订阅

通常来讲,当我们业务存在消息的业务逻辑时更多的是直接使用成熟的rabbitmq,rocketmq,但是一些简单的业务场景中,真的有必要额外的引入一个mq么?本文将介绍一下redis的发布订阅方式,来实现简易的消息系统逻辑

I. 基本使用

1. 配置

我们使用SpringBoot 2.2.1.RELEASE来搭建项目环境,直接在pom.xml中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

如果我们的redis是默认配置,则可以不额外添加任何配置;也可以直接在application.yml配置中,如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

2. 使用姿势

redis的发布/订阅,主要就是利用两个命令publish/subscribe; 在SpringBoot中使用发布订阅模式比较简单,借助RedisTemplate可以很方便的实现

a. 消息发布

@Service
public class PubSubBean {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void publish(String key, String value) {
        redisTemplate.execute(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                redisConnection.publish(key.getBytes(), value.getBytes());
                return null;
            }
        });
    }
}

b. 订阅消息

消息订阅这里,需要注意我们借助org.springframework.data.redis.connection.MessageListener来实现消费逻辑

public void subscribe(MessageListener messageListener, String key) {
    redisTemplate.execute(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
            redisConnection.subscribe(messageListener, key.getBytes());
            return null;
        }
    });
}

c. 测试case

写一个简单的测试case,来验证一下上面的发布订阅,顺带理解一下这个MessageListener的使用姿势;我们创建一个简单的WEB工程,提供两个rest接口

@RestController
@RequestMapping(path = "rest")
public class DemoRest {
    @Autowired
    private PubSubBean pubSubBean;
    
    // 发布消息
    @GetMapping(path = "pub")
    public String pubTest(String key, String value) {
        pubSubBean.publish(key, value);
        return "over";
    }
    
    // 新增消费者
    @GetMapping(path = "sub")
    public String subscribe(String key, String uuid) {
        pubSubBean.subscribe(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] bytes) {
                System.out.println(uuid + " ==> msg:" + message);
            }
        }, key);
        return "over";
    }
}

下面通过一个动图来演示一下case

a.gif

我们先创建了两个消费者,然后发送消息时,两个都收到;再新增一个消费者,发送消息时,三个都能收到

3. 使用说明与应用场景

redis的发布订阅,只适用于比较简单的场景,从上面的使用说明也能看出,它就是一个简单的发布订阅模型,支持1对N,而且发送的消息,只有在线的消费者才能get到(至于不在线的,那就只能说遗憾了)而且对于redis而言,消息推出去之后就完事了,至于消费者能不能正常消费,那就不care了

划重点:

  • 只有在线的消费者能接收到消息
  • 对于消费者一个消息只能拿到一次

接下来的问题就来了,什么样的场景下可以使用redis的发布订阅呢?

基于内存的缓存失效

利用reids + 内存做二级缓存,可以说是比较常见的方式了,借助基于内存的缓存,可以有效的提高系统的负载,但是问题也很明显,内存中缓存数据失效是个问题,特别是当一个应用部署多台服务器时,如果我希望同时失效所有服务器的某个内存缓存,使用redis的发布/订阅就是一个比较好的选择

SpringCloud Config配置刷新

使用SpringCloud Config做配置中心的小伙伴可能会经常遇到这个问题,配置修改之后的动态刷新是个问题(当然官方是支持通过mq走bus总线来同步,也可以通过spring boot admin来强刷)

借助redis发布/订阅,实现配置动态刷新也是一个不错的备选方案(后面给出一个具体的实现demo,如有兴趣请持续关注一灰灰Blog)

redis key失效订阅

我们在使用redis做缓存时,通常会设置一个过期时间,redis提供了一个过期的事件,当然默认是不开启的;我们也是可以通过subscribe来订阅缓存失效的事件

修改配置,开启key失效事件

notify-keyspace-events Ex

重启redis之后,订阅失效事件即可

subscribe __keyevent@0__:expired

II. 其他

0. 项目

系列博文

工程源码

12 - 14.高级特性HyperLoglog

hyperloglog算法,利用非常少的空间,实现比较大的数据量级统计;比如我们前面在介绍bitmap的过程中,说到了日活的统计,当数据量达到百万时,最佳的存储方式是hyperloglog,本文将介绍一下hyperloglog的基本原理,以及redis中的使用姿势

I. 基本使用

1. 配置

我们使用SpringBoot 2.2.1.RELEASE来搭建项目环境,直接在pom.xml中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

如果我们的redis是默认配置,则可以不额外添加任何配置;也可以直接在application.yml配置中,如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

2. 使用姿势

我们下来看使用姿势,原理放在后面说明

redis中,hyperlolog使用非常简单,一般就两个操作命令,添加pfadd + 计数pfcount;另外还有一个不常用的merge

a. add

添加一条记录

public boolean add(String key, String obj) {
    // pfadd key obj
    return stringRedisTemplate.opsForHyperLogLog().add(key, obj) > 0;
}

b. pfcount

非精准的计数统计

public long count(String key) {
    // pfcount 非精准统计 key的计数
    return stringRedisTemplate.opsForHyperLogLog().size(key);
}

a. merge

将多个hyperloglog合并成一个新的hyperloglog;感觉用的场景并不会特别多

public boolean merge(String out, String... key) {
    // pfmerge out key1 key2  ---> 将key1 key2 合并成一个新的hyperloglog out
    return stringRedisTemplate.opsForHyperLogLog().union(out, key) > 0;
}

3. 原理说明

关于HyperLogLog的原理我这里也不进行详细赘述,说实话那一套算法以及调和平均公式我自己也没太整明白;下面大致说一下我个人的朴素理解

Redis中的HyperLogLog一共分了2^14=16384个桶,每个桶占6个bit

一个数据,塞入HyperLogLog之前,先hash一下,得到一个64位的二进制数据

  • 取低14位,用来定位桶的index
  • 高50位,从低到高数,找到第一个为1出现的位置n
    • 若桶中值 > n,则丢掉
    • 反之,则设置桶中的值为n

那么怎么进行计数统计呢?

  • 拿所有桶中的值,代入下面的公式进行计算

上面这个公式怎么得出的?

之前看到一篇文章,感觉不错,有兴趣了解原理的,可以移步: https://www.jianshu.com/p/55defda6dcd2

4. 应用场景

hyperloglog通常是用来非精确的计数统计,前面介绍了日活统计的case,当时使用的是bitmap来作为数据统计,然而当userId分散不均匀,小的特别小,大的特别大的时候,并不适用

在数据量级很大的情况下,hyperloglog的优势非常大,它所占用的存储空间是固定的2^14 下图引用博文《用户日活月活怎么统计》

使用HyperLogLog进行日活统计的设计思路比较简单

  • 每日生成一个key
  • 某个用户访问之后,执行 pfadd key userId
  • 统计总数: pfcount key

II. 其他

0. 项目

系列博文

工程源码

13 - 15.高级特性之GEO

GEO用于存储地理信息,最直观的就是我们日常使用的地图app中,如果我想查询我所在地的周边餐饮,就可以利用geo中的以(x,y)为圆心,以n为半径,扫描坐标在这个圈内的所有餐饮店,这个case借助redis的geo可以很方便的实现

I. 基本使用

1. 配置

我们使用SpringBoot 2.2.1.RELEASE来搭建项目环境,直接在pom.xml中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

如果我们的redis是默认配置,则可以不额外添加任何配置;也可以直接在application.yml配置中,如下

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

2. 使用姿势

geo有6个常见的命令,下面逐一进行解释说明

a. geoadd 添加

存储指定的地理空间位置,一般需要三个基本的参数,经度 + 维度 + 位置名

private final StringRedisTemplate redisTemplate;

public GeoBean(StringRedisTemplate stringRedisTemplate) {
    this.redisTemplate = stringRedisTemplate;
}

/**
 * 添加geo信息
 *
 * @param key       缓存key
 * @param longitude 经度
 * @param latitude  纬度
 * @param member    位置名
 */
public void add(String key, double longitude, double latitude, String member) {
    // geoadd xhh_pos 114.31 30.52 武汉 116.46 39.92 北京
    redisTemplate.opsForGeo().add(key, new Point(longitude, latitude), member);
}

b. geopos 获取坐标

上面添加一组坐标 + 地理位置到redis中,如果我们想知道某个位置的坐标,则可以借助geopos来获取

/**
 * 获取某个地方的坐标
 *
 * @param key
 * @param member
 * @return
 */
public List<Point> get(String key, String... member) {
    // geopos xhh_pos 武汉
    List<Point> list = redisTemplate.opsForGeo().position(key, member);
    return list;
}

c. geodist 获取距离

计算两个位置之间的距离,比如我已经写入了武汉、北京的经纬度,这个时候希望知道他们两的距离,直接geodist即可

/**
 * 判断两个地点的距离
 *
 * @param key
 * @param source
 * @param dest
 * @return
 */
public Distance distance(String key, String source, String dest) {
    // 可以指定距离单位,默认是米, ft->英尺, mi->英里
    // geodist xhh_pos 武汉 北京 km
    return redisTemplate.opsForGeo().distance(key, source, dest);
}

d. georadius 获取临近元素

georadius 以给定的经纬度为中心, 返回与中心的距离不超过给定最大距离的所有位置元素。

 public void near(String key, double longitude, double latitude) {
    // georadius xhh_pos 114.31 30.52 5km
    Circle circle = new Circle(longitude, latitude, 5 * Metrics.KILOMETERS.getMultiplier());
    RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
            .includeDistance()
            .includeCoordinates()
            .sortAscending().limit(5);
    GeoResults<RedisGeoCommands.GeoLocation<String>> results = redisTemplate.opsForGeo()
            .radius(key, circle, args);
    System.out.println(results);
}

e. georadiusbymember 获取临近元素

和上面的作用差不多,区别在于上面参数是经纬度,这里是位置

public void nearByPlace(String key, String member) {
    // georadiusbymember xhh_pos 武汉 1100 km
    Distance distance = new Distance(5, Metrics.KILOMETERS);
    RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
            .includeDistance()
            .includeCoordinates()
            .sortAscending()
            .limit(5);
    GeoResults<RedisGeoCommands.GeoLocation<String>> results = redisTemplate.opsForGeo()
            .radius(key, member, distance, args);
    System.out.println(results);
}

f. geohash

GeoHash将二维的经纬度转换成字符串,将二维的经纬度转换为一维的字符串,可以方便业务优化;geohash有自己的一套算法,这里不详细展开,有兴趣的小伙伴可以搜索一下

public void geoHash(String key) {
    // geohash xhh_pos 武汉
    List<String> results = redisTemplate.opsForGeo()
            .hash(key, "北京", "上海", "深圳");
    System.out.println(results);
}

3. 小结

geo更适用于地图这种业务场景中,关于这块的业务没怎么接触过,也不太好确定诸如百度地图、高德地图这种是否有在真实业务中采用;如果我们把目标缩小一点,改成一个地下车库的导航,统计所在位置周边的空余车位,位置导航,停车位记录,感觉有点靠谱

注意上面的六个操作命令,没有删除,但如果我们错误的写入了一个数据,难道没法删除么?

  • 使用 zrem key member 执行删除操作,如上面的case中,删除北京的坐标,可以: zrem xhh_pos 北京

为什么可以这么操作?

  • geo的底层存储借助ZSET来实现的,因此zset的操作符都是支持的,geo添加的元素,会通过算法得出一个score,如上面case中的北京,武汉添加之后,zset值为

II. 其他

0. 项目

系列博文

工程源码

14 - 16.redisson分布式锁使用及注意事项

redis使用分布式锁,除了我们自己借助setnx来实现之外,更为推荐的是借助redisson来完成,借助redisson,可以非常方便的使用redis分布锁,但是一个使用姿势不对,将可能导致锁无法释放问题

本文将介绍一下SpringBoot中redisson分布式锁的使用姿势,以及使用不当导致锁无法释放的演示

I. 项目环境

1. pom依赖

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA + redis进行开发

下面是核心的pom.xml(源码可以再文末获取)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.15.0</version>
    </dependency>
</dependencies>

2. 配置文件

redis的配置,我们这里采用默认的配置,本机启动一个redis实例

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

II. 分布式锁

1. 使用姿势

核心类就是获取一个RedissonClient实例,然后借助它来获取锁

@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password:}")
    private String password;

    @Bean
    public RedissonClient redissonClient() {

        Config config = new Config();
        //单节点
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        if (StringUtils.isEmpty(password)) {
            config.useSingleServer().setPassword(null);
        } else {
            config.useSingleServer().setPassword(password);
        }

        //添加主从配置
        // config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});

        // 集群模式配置 setScanInterval()扫描间隔时间,单位是毫秒, //可以用"rediss://"来启用SSL连接
        // config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002");
        return Redisson.create(config);
    }
}

一种非阻塞的使用方式形如

lock.tryLock();
try {
  // ...
} finally {
  lock.unlock();
}

这里没有显示设置锁的失效时间,默认持有锁30s,且由watch dog(看门狗)每隔10s续期一波,这里的自动续期,请重点关注,后面会说明因为它导致的锁无法释放

手动设置失效时间: tryLock(time, TimeUnit)

  • 当指定失效时间时,将没有看门狗的自动续期逻辑

一个具体的分布式锁使用姿势如下

private void lockReIn(int id) {
    RLock rLock = redissonClient.getLock("lock_prefix_" + id);
    if (rLock.tryLock()) {
        try {
            System.out.println("------- 执行业务逻辑 --------" + Thread.currentThread());
            Thread.sleep(100);
            System.out.println("------- 执行完毕 ----------" + Thread.currentThread());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
        }
    } else {
        System.out.println("get lock failed for " + Thread.currentThread());
    }
}

如果希望阻塞方式获取分布式锁时,使用RLock#lock()来替换RLock#tryLock()

2. 锁无法释放场景

重点关注下上面的自动续期方式,当我们使用姿势不对的时候,可能导致锁无法释放,那么什么样的场景会导致这个问题呢?

  • 主动释放锁异常失败
  • watch dog 一直存活,不断的续期

我们借助线程池来演示这个场景

// 固定线程池,且线程不会被回收,导致时而能重入获取锁,时而不行
ExecutorService executorService = Executors.newFixedThreadPool(2, new NamedThreadFactory("fixed-"));

// 普通线程池,空闲线程会被回收,这样就会导致将不会有其他业务方能获取到锁
ExecutorService customExecutorService = new ThreadPoolExecutor(0, 1,
        1L, TimeUnit.MICROSECONDS,
        new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("custom-"));

private void unLockFailed(int id) {
    RLock rLock = redissonClient.getLock("lock_prefix_" + id);
    if (rLock.tryLock()) {
        try {
            System.out.println("------- 执行业务逻辑 " + id + " --------" + Thread.currentThread());
        } finally {
            // 模拟释放锁失败
            System.out.println(1 / 0);
            rLock.unlock();
        }
    } else {
        System.out.println("get lock failed for " + Thread.currentThread());
    }
}

public void testLock() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("threadId fix : " + Thread.currentThread().getId());
                unLockFailed(2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    customExecutorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("threadId custom : " + Thread.currentThread().getId());
                unLockFailed(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}

在使用锁的业务逻辑中,释放锁时模拟了释放失败的case

两个线程池

  • 一个固定大小的线程池(线程不会被回收):再次访问时,之前持有锁的线程依然可以获取锁;另外一个不行
  • 一个普通的线程池(线程会被回收):将没有线程能持有锁

上面这种case最主要的问题在于redissonClient作为单实例,这个实例不回收,看门狗的续期任务也不会取消;因此即便持有锁的业务逻辑走完了,抛异常了,但是续期任务没有感知,依然在默默的执行,从而导致分布式锁一直无法释放,直到redissonClient实例销毁

小结

  • RedissonClient公用时,主动释放锁失败,但是注意看门狗的任务不注销,分布式锁一直续期,从而导致分布式锁无法有效释放

II. 其他

0. 项目

15 - 17.缓存注解@Cacheable @CacheEvit @CachePut使用姿势介绍

Spring在3.1版本,就提供了一条基于注解的缓存策略,实际使用起来还是很丝滑的,本文将针对几个常用的注解进行简单的介绍说明,有需要的小伙伴可以尝试一下

本文主要知识点:

  • @Cacheable: 缓存存在,则使用缓存;不存在,则执行方法,并将结果塞入缓存
  • @CacheEvit: 失效缓存
  • @CachePut: 更新缓存

I. 项目环境

1. 项目依赖

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA + redis5.0进行开发

开一个web服务用于测试

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

全程使用默认配置,redis本机,端口6379,无密码

II. 缓存注解介绍

1. @Cacheable

这个注解用于修饰方法or类,当我们访问它修饰的方法时,优先从缓存中获取,若缓存中存在,则直接获取缓存的值;缓存不存在时,执行方法,并将结果写入缓存

这个注解,有两个比较核心的设置

	/**
	 * 与 cacheNames 效果等价
	 */
	@AliasFor("cacheNames")
	String[] value() default {};

	
	@AliasFor("value")
	String[] cacheNames() default {};

	/**
	 * 缓存key
	 */
	String key() default "";

cacheNames可以理解为缓存key的前缀,可以为组件缓存的key变量;当key不设置时,使用方法参数来初始化,注意key为SpEL表达式,因此如果要写字符串时,用单引号括起来

一个简单的使用姿势

/**
 * 首先从缓存中查,查到之后,直接返回缓存数据;否则执行方法,并将结果缓存
 * <p>
 * redisKey: cacheNames + key 组合而成 --> 支持SpEL
 * redisValue: 返回结果
 *
 * @param name
 * @return
 */
@Cacheable(cacheNames = "say", key = "'p_'+ #name")
public String sayHello(String name) {
    return "hello+" + name + "-->" + UUID.randomUUID().toString();
}

如我们传参为 yihuihui, 那么缓存key为 say::p_yihuihui

除了上面三个配置值之外,查看@Cacheable注解源码的童鞋可以看到还有condition设置,这个表示当它设置的条件达成时,才写入缓存

/**
 * 满足condition条件的才写入缓存
 *
 * @param age
 * @return
 */
@Cacheable(cacheNames = "condition", key = "#age", condition = "#age % 2 == 0")
public String setByCondition(int age) {
    return "condition:" + age + "-->" + UUID.randomUUID().toString();
}

上面这个case中,age为偶数的时候,才走缓存;否则不写缓存

接下来是unless参数,从名字上可以看出它表示不满足条件时才写入缓存

/**
 * unless, 不满足条件才写入缓存
 *
 * @param age
 * @return
 */
@Cacheable(cacheNames = "unless", key = "#age", unless = "#age % 2 == 0")
public String setUnless(int age) {
    return "unless:" + age + "-->" + UUID.randomUUID().toString();
}

2. @CachePut

不管缓存有没有,都将方法的返回结果写入缓存;适用于缓存更新

/**
 * 不管缓存有没有,都写入缓存
 *
 * @param age
 * @return
 */
@CachePut(cacheNames = "t4", key = "#age")
public String cachePut(int age) {
    return "t4:" + age + "-->" + UUID.randomUUID().toString();
}

3. @CacheEvict

这个就是我们理解的删除缓存

/**
 * 失效缓存
 *
 * @param name
 * @return
 */
@CacheEvict(cacheNames = "say", key = "'p_'+ #name")
public String evict(String name) {
    return "evict+" + name + "-->" + UUID.randomUUID().toString();
}

4. @Caching

在实际的工作中,经常会遇到一个数据变动,更新多个缓存的场景,对于这个场景,可以通过@Caching来实现

/**
 * caching实现组合,添加缓存,并失效其他的缓存
 *
 * @param age
 * @return
 */
@Caching(cacheable = @Cacheable(cacheNames = "caching", key = "#age"), evict = @CacheEvict(cacheNames = "t4", key = "#age"))
public String caching(int age) {
    return "caching: " + age + "-->" + UUID.randomUUID().toString();
}

上面这个就是组合操作

  • caching::age缓存取数据,不存在时执行方法并写入缓存;
  • 失效缓存 t4::age

5. 异常时,缓存会怎样?

上面的几个case,都是正常的场景,当方法抛出异常时,这个缓存表现会怎样?

/**
 * 用于测试异常时,是否会写入缓存
 *
 * @param age
 * @return
 */
@Cacheable(cacheNames = "exception", key = "#age")
@Cacheable(cacheNames = "say", key = "'p_yihuihui'")
public int exception(int age) {
    return 10 / age;
}

根据实测结果,当age==0时,上面两个缓存都不会成功

6. 测试用例

接下来验证下缓存注解与上面描述的是否一致

@RestController
public class IndexRest {
    @Autowired
    private BasicDemo helloService;

    @GetMapping(path = {"", "/"})
    public String hello(String name) {
        return helloService.sayHello(name);
    }
}

上面这个主要是验证@Cacheable注解,若缓存不命中,每次返回的结果应该都不一样,然而实际访问时,会发现返回的都是相同的

curl http://localhost:8080/?name=yihuihui

失效缓存

@GetMapping(path = "evict")
public String evict(String name) {
    return helloService.evict(String.valueOf(name));
}

失效缓存,需要和上面的case配合起来使用

curl http://localhost:8080/evict?name=yihuihui
curl http://localhost:8080/?name=yihuihui

剩下其他的相关测试类就比较好理解了,一并贴出对应的代码

@GetMapping(path = "condition")
public String t1(int age) {
    return helloService.setByCondition(age);
}

@GetMapping(path = "unless")
public String t2(int age) {
    return helloService.setUnless(age);
}

@GetMapping(path = "exception")
public String exception(int age) {
    try {
        return String.valueOf(helloService.exception(age));
    } catch (Exception e) {
        return e.getMessage();
    }
}

@GetMapping(path = "cachePut")
public String cachePut(int age) {
    return helloService.cachePut(age);
}

7. 小结

最后管理小结一下Spring提供的几个缓存注解

  • @Cacheable: 缓存存在,则从缓存取;否则执行方法,并将返回结果写入缓存
  • @CacheEvit: 失效缓存
  • @CachePut: 更新缓存
  • @Caching: 都注解组合

上面虽说可以满足常见的缓存使用场景,但是有一个非常重要的点没有说明,缓存失效时间应该怎么设置???

如何给每个缓存设置不同的缓存失效时间,咱么下篇博文见,我是一灰灰,欢迎关注长草的公众号一灰灰blog

III. 不能错过的源码和相关知识点

0. 项目

16 - 18.缓存注解@Cacheable之自定义key策略及缓存失效时间指定

上一篇博文介绍了Spring中缓存注解@Cacheable @CacheEvit @CachePut的基本使用,接下来我们将看一下更高级一点的知识点

  • key生成策略
  • 超时时间指定

I. 项目环境

1. 项目依赖

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA + redis5.0进行开发

开一个web服务用于测试

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

II. 扩展知识点

1. key生成策略

对于@Cacheable注解,有两个参数用于组装缓存的key

  • cacheNames/value: 类似于缓存前缀
  • key: SpEL表达式,通常根据传参来生成最终的缓存key

默认的redisKey = cacheNames::key (注意中间的两个冒号)

/**
 * 没有指定key时,采用默认策略 {@link org.springframework.cache.interceptor.SimpleKeyGenerator } 生成key
 * <p>
 * 对应的key为: k1::id
 * value --> 等同于 cacheNames
 * @param id
 * @return
 */
@Cacheable(value = "k1")
public String key1(int id) {
    return "defaultKey:" + id;
}

缓存key默认采用SimpleKeyGenerator来生成,比如上面的调用,如果id=1, 那么对应的缓存key为 k1::1

如果没有参数,或者多个参数呢?

/**
 * redis_key :  k2::SimpleKey[]
 *
 * @return
 */
@Cacheable(value = "k0")
public String key0() {
    return "key0";
}

/**
 * redis_key :  k2::SimpleKey[id,id2]
 *
 * @param id
 * @param id2
 * @return
 */
@Cacheable(value = "k2")
public String key2(Integer id, Integer id2) {
    return "key1" + id + "_" + id2;
}


@Cacheable(value = "k3")
public String key3(Map map) {
    return "key3" + map;
}

然后写一个测试case

@RestController
@RequestMapping(path = "extend")
public class ExtendRest {
    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private ExtendDemo extendDemo;

    @GetMapping(path = "default")
    public Map<String, Object> key(int id) {
        Map<String, Object> res = new HashMap<>();
        res.put("key0", extendDemo.key0());
        res.put("key1", extendDemo.key1(id));
        res.put("key2", extendDemo.key2(id, id));
        res.put("key3", extendDemo.key3(res));

        // 这里将缓存key都捞出来
        Set<String> keys = (Set<String>) redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
            Set<byte[]> sets = connection.keys("k*".getBytes());
            Set<String> ans = new HashSet<>();
            for (byte[] b : sets) {
                ans.add(new String(b));
            }
            return ans;
        });

        res.put("keys", keys);
        return res;
    }
}

访问之后,输出结果如下

{
    "key1": "defaultKey:1",
    "key2": "key11_1",
    "key0": "key0",
    "key3": "key3{key1=defaultKey:1, key2=key11_1, key0=key0}",
    "keys": [
        "k2::SimpleKey [1,1]",
        "k1::1",
        "k3::{key1=defaultKey:1, key2=key11_1, key0=key0}",
        "k0::SimpleKey []"
    ]
}

小结一下

  • 单参数:cacheNames::arg
  • 无参数: cacheNames::SimpleKey [], 后面使用 SimpleKey []来补齐
  • 多参数: cacheNames::SimpleKey [arg1, arg2...]
  • 非基础对象:cacheNames::obj.toString()

2. 自定义key生成策略

如果希望使用自定义的key生成策略,只需继承KeyGenerator,并声明为一个bean

@Component("selfKeyGenerate")
public static class SelfKeyGenerate implements KeyGenerator {
    @Override
    public Object generate(Object target, Method method, Object... params) {
        return target.getClass().getSimpleName() + "#" + method.getName() + "(" + JSON.toJSONString(params) + ")";
    }
}

然后在使用的地方,利用注解中的keyGenerator来指定key生成策略

/**
 * 对应的redisKey 为: get  vv::ExtendDemo#selfKey([id])
 *
 * @param id
 * @return
 */
@Cacheable(value = "vv", keyGenerator = "selfKeyGenerate")
public String selfKey(int id) {
    return "selfKey:" + id + " --> " + UUID.randomUUID().toString();
}

测试用例

@GetMapping(path = "self")
public Map<String, Object> self(int id) {
    Map<String, Object> res = new HashMap<>();
    res.put("self", extendDemo.selfKey(id));
    Set<String> keys = (Set<String>) redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
        Set<byte[]> sets = connection.keys("vv*".getBytes());
        Set<String> ans = new HashSet<>();
        for (byte[] b : sets) {
            ans.add(new String(b));
        }
        return ans;
    });
    res.put("keys", keys);
    return res;
}

缓存key放在了返回结果的keys中,输出如下,和预期的一致

{
    "keys": [
        "vv::ExtendDemo#selfKey([1])"
    ],
    "self": "selfKey:1 --> f5f8aa2a-0823-42ee-99ec-2c40fb0b9338"
}

3. 缓存失效时间

以上所有的缓存都没有设置失效时间,实际的业务场景中,不设置失效时间的场景有;但更多的都需要设置一个ttl,对于Spring的缓存注解,原生没有额外提供一个指定ttl的配置,如果我们希望指定ttl,可以通过RedisCacheManager来完成

private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer seconds) {
    // 设置 json 序列化
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    jackson2JsonRedisSerializer.setObjectMapper(om);

    RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();
    redisCacheConfiguration = redisCacheConfiguration.serializeValuesWith(
            RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)).
            // 设置过期时间
            entryTtl(Duration.ofSeconds(seconds));

    return redisCacheConfiguration;
}

上面是一个设置RedisCacheConfiguration的方法,其中有两个点

  • 序列化方式:采用json对缓存内容进行序列化
  • 失效时间:根据传参来设置失效时间

如果希望针对特定的key进行定制化的配置的话,可以如下操作

private Map<String, RedisCacheConfiguration> getRedisCacheConfigurationMap() {
    Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>(8);
    // 自定义设置缓存时间
    // 这个k0 表示的是缓存注解中的 cacheNames/value
    redisCacheConfigurationMap.put("k0", this.getRedisCacheConfigurationWithTtl(60 * 60));
    return redisCacheConfigurationMap;
}

最后就是定义我们需要的RedisCacheManager

@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
    return new RedisCacheManager(
            RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory),
            // 默认策略,未配置的 key 会使用这个
            this.getRedisCacheConfigurationWithTtl(60),
            // 指定 key 策略
            this.getRedisCacheConfigurationMap()
    );
}

在前面的测试case基础上,添加返回ttl的信息

private Object getTtl(String key) {
    return redisTemplate.execute(new RedisCallback() {
        @Override
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
            return connection.ttl(key.getBytes());
        }
    });
}

@GetMapping(path = "default")
public Map<String, Object> key(int id) {
    Map<String, Object> res = new HashMap<>();
    res.put("key0", extendDemo.key0());
    res.put("key1", extendDemo.key1(id));
    res.put("key2", extendDemo.key2(id, id));
    res.put("key3", extendDemo.key3(res));

    Set<String> keys = (Set<String>) redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
        Set<byte[]> sets = connection.keys("k*".getBytes());
        Set<String> ans = new HashSet<>();
        for (byte[] b : sets) {
            ans.add(new String(b));
        }
        return ans;
    });

    res.put("keys", keys);

    Map<String, Object> ttl = new HashMap<>(8);
    for (String key : keys) {
        ttl.put(key, getTtl(key));
    }
    res.put("ttl", ttl);
    return res;
}

返回结果如下,注意返回的ttl失效时间

4. 自定义失效时间扩展

虽然上面可以实现失效时间指定,但是用起来依然不是很爽,要么是全局设置为统一的失效时间;要么就是在代码里面硬编码指定,失效时间与缓存定义的地方隔离,这就很不直观了

接下来介绍一种,直接在注解中,设置失效时间的case

如下面的使用case

/**
 * 通过自定义的RedisCacheManager, 对value进行解析,=后面的表示失效时间
 * @param key
 * @return
 */
@Cacheable(value = "ttl=30")
public String ttl(String key) {
    return "k_" + key;
}

自定义的策略如下:

  • value中,等号左边的为cacheName, 等号右边的为失效时间

要实现这个逻辑,可以扩展一个自定义的RedisCacheManager,如

public class TtlRedisCacheManager extends RedisCacheManager {
    public TtlRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
        super(cacheWriter, defaultCacheConfiguration);
    }

    @Override
    protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {
        String[] cells = StringUtils.delimitedListToStringArray(name, "=");
        name = cells[0];
        if (cells.length > 1) {
            long ttl = Long.parseLong(cells[1]);
            // 根据传参设置缓存失效时间
            cacheConfig = cacheConfig.entryTtl(Duration.ofSeconds(ttl));
        }
        return super.createRedisCache(name, cacheConfig);
    }
}

重写createRedisCache逻辑, 根据name解析出失效时间;

注册使用方式与上面一致,声明为Spring的bean对象

@Primary
@Bean
public RedisCacheManager ttlCacheManager(RedisConnectionFactory redisConnectionFactory) {
    return new TtlRedisCacheManager(RedisCacheWriter.lockingRedisCacheWriter(redisConnectionFactory),
            // 默认缓存配置
            this.getRedisCacheConfigurationWithTtl(60));
}

测试case如下

@GetMapping(path = "ttl")
public Map ttl(String k) {
    Map<String, Object> res = new HashMap<>();
    res.put("execute", extendDemo.ttl(k));
    res.put("ttl", getTtl("ttl::" + k));
    return res;
}

验证结果如下

5. 小结

到此基本上将Spring中缓存注解的常用姿势都介绍了一下,无论是几个注解的使用case,还是自定义的key策略,失效时间指定,单纯从使用的角度来看,基本能满足我们的日常需求场景

下面是针对缓存注解的一个知识点抽象

缓存注解

  • @Cacheable: 缓存存在,则从缓存取;否则执行方法,并将返回结果写入缓存
  • @CacheEvit: 失效缓存
  • @CachePut: 更新缓存
  • @Caching: 都注解组合

配置参数

  • cacheNames/value: 可以理解为缓存前缀
  • key: 可以理解为缓存key的变量,支持SpEL表达式
  • keyGenerator: key组装策略
  • condition/unless: 缓存是否可用的条件

默认缓存ke策略y

下面的cacheNames为注解中定义的缓存前缀,两个分号固定

  • 单参数:cacheNames::arg
  • 无参数: cacheNames::SimpleKey [], 后面使用 SimpleKey []来补齐
  • 多参数: cacheNames::SimpleKey [arg1, arg2...]
  • 非基础对象:cacheNames::obj.toString()

缓存失效时间

失效时间,本文介绍了两种方式,一个是集中式的配置,通过设置RedisCacheConfiguration来指定ttl时间

另外一个是扩展RedisCacheManager类,实现自定义的cacheNames扩展解析

Spring缓存注解知识点到此告一段落,我是一灰灰,欢迎关注长草的公众号一灰灰blog

III. 不能错过的源码和相关知识点

0. 项目

系列博文

源码