Redis


这篇Redis笔记是基于 狂神说视频的总结,仅供个人学习和复习用。

前提(重要)

开启安全组端口

1.如果是阿里云服务器需要开启安全组端口,你连接redis的端口。

2.服务器防火墙记得开启并设置ip访问端口。参考https://zskyz233333.github.io/ServerBuilding.html

安全

如果是使用服务器安装Redis千万要记住先做好安全措施避免被入侵挖矿!这是一个惨痛的教训

记住在安装之前先阅读这篇文章Redis的安全配置,将有助于使你的Redis更加安全。

Redis 安装

如果是Fedora系统的请参考这篇安装https://developer.fedoraproject.org/tech/database/redis/about.html

下载

Redis官方推荐在Linux上安装,此次教程也是在服务器中安装。

下载地址:https://redis.io/

将安装包拖拽进服务器个人目录

解压

首先将程序移动到 /usr/local/目录然后解压安装

cd home/zsky233333/
ls
mv redis-6.0.6.tar.gz  /usr/local/
cd /usr/local/
ls
tar -zxvf redis-6.0.6.tar.gz 

基本环境安装

1.安装gcc环境 redis6.0版本以上有点小问题可参考文章: https://blog.csdn.net/weixin_44519874/article/details/112285537

[root@zsky233333 redis-6.0.6]# gcc -v   # 查看gcc版本
[root@zsky233333 redis-6.0.6]# yum -y install centos-release-scl  # 升级到9.1版本
[root@zsky233333 redis-6.0.6]# yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils
[root@zsky233333 redis-6.0.6]# scl enable devtoolset-9 bash

2.执行make命令配置文件

[root@zsky233333 redis-6.0.6]# make install 

成功如下

可以进入/usr/local/bin目录查看

3.将redis.cofig文件复制到bin目录下

[root@zsky233333 local]# cd bin
[root@zsky233333 bin]# mkdir redisconfig
[root@zsky233333 bin]# cp /usr/local/redis-6.0.6/redis.conf  redisconfig

修改配置文件

redis默认不是后台启动的,需要修改配置文件!

[root@zsky233333 redisconfig]# vim redis.conf 

绑定 Redis 监听的网络接口,如果服务器有多个 IP,可限定 Redis server 监听的 IP;通过 Redis 配置项 bind,可同时绑定多个 IP。

bind 127.0.0.1 192.168.13.12

程序运行尽量避免使用熟知端口,降低被初级扫描的风险。

port 6666

开启 Redis 密码认证,并设置高复杂度密码,找到注释#requirepass 换行输入:

requirepass 你的密码

启动Redis服务

切换到 /usr/local/bin目录下

[root@zsky233333 bin]# redis-server redisconfig/redis.conf 
[root@zsky233333 bin]# redis-cli -p 6379

查看redis的进程

[root@zsky233333 bin]# ps -ef|grep redis

关闭Redis服务

shutdown
exit

五大数据类型(常用)

命令查找地址

redis命令中文文档:http://www.redis.cn/commands.html#string

Redis-Key

exists key:判断键是否存在
del key:删除键值对
move key db:将键值对移动到指定数据库
expire key second:设置键值对的过期时间
type key:查看value的数据类型
127.0.0.1:6379> ping #测试连接redis
PONG  #表示连接成功
127.0.0.1:6379> FLUSHALL  #清空所有数据库(默认16个数据库0-15)
OK
127.0.0.1:6379> keys * #查看所有的key
(empty array)
127.0.0.1:6379> set name zskyz # 设置key
OK
127.0.0.1:6379> set age 18
OK
127.0.0.1:6379> keys *
1) "name"
2) "age"
127.0.0.1:6379> EXISTS name #判断key是否存在
(integer) 1
127.0.0.1:6379> EXISTS sex 
(integer) 0 # 1表示存在 0:表示不存在
127.0.0.1:6379> MOVE age 1 #将age移到数据库1
(integer) 1
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> select 1 #转到数据库1
OK
127.0.0.1:6379[1]> keys *
1) "age"
127.0.0.1:6379[1]> select 0
OK
127.0.0.1:6379> set age 1
OK
127.0.0.1:6379> keys *
1) "name"
2) "age"
127.0.0.1:6379> get name # 获取key的值
"zskyz"
127.0.0.1:6379> get age
"1"
127.0.0.1:6379> EXPIRE name 10 #设置10秒后删除key
(integer) 1
127.0.0.1:6379> ttl name #查看key剩余时间
(integer) 1
127.0.0.1:6379> ttl name
(integer) -2 #-2:表示已经删除
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> del age #删除key
(integer) 1
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "age"
127.0.0.1:6379[1]> flushdb #清空当前数据库
OK
127.0.0.1:6379[1]> keys *
(empty array)
127.0.0.1:6379[1]>clear #与linux命令相同

关于TTL命令Redis的key,通过TTL命令返回key的过期时间,一般来说有3种:

  • 当前key没有设置过期时间,所以会返回-1.
  • 当前key有设置过期时间,而且key已经过期,所以会返回-2.
  • 当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.

String

APPEND key value  #追加字符串
STRLEN key #查询字符串长度
INCR key # 给对应key值加一 (可用于浏览量增加)
DECR key #使对应key值减一
SETEX key seconds value # 设置带过期时间的值
MSET key value [key value ...] # 批量设置值 Mset k1 "v1" k2 "v2"

List

LPUSH key value #将一个值插入列表头部
RPUSH key value #将一个值插入列表尾部
LRANGE key 0 -1 #获取list 全部值
LRANGE key 0 1 # 通过区间获取具体的值
LPOP key #从头部移除值
RPOP key # 从尾部移除值
LINDEX key number #通过下表获得值
LLEN key #返回列表长度
LSET key index element # 替换列表下下表的元素 前提是列表存在且下表存在
LINSERT key BEFORE|AFTER pivot value #在指定列表元素的前/后 插入value
LTRIM key start end #通过下标截取指定范围内的列表

小结

  • list实际上是一个链表,before Node after , left, right 都可以插入值
  • 如果key不存在,则创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,空链表,也代表不存在
  • 在两边插入或者改动值,效率最高!修改中间元素,效率相对较低

应用:

  • 消息排队!消息队列(Lpush Rpop),栈(Lpush Lpop)

Set

Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。

Redis中集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。

SADD key member1[member2..] #向集合中无序增加一个/多个成员
SREM key member1[member2..] #移除集合中一个/多个成员
SCARD key #获取集合的成员数
SMEMBERS key #返回集合中所有的成员
SISMEMBER key member #查询member元素是否是集合的成员,结果是无序的
SRANDMEMBER key [count] #随机返回集合中count个成员,count缺省值为1
SPOP key [count] #随机移除并返回集合中count个成员,count缺省值为1
SMOVE source destination member #将source集合的成员member移动到destination集合
SDIFF key1[key2..]  #返回所有集合的差集 key1- key2 - …
SDIFFSTORE destination key1[key2..] #在SDIFF的基础上,将结果保存到集合中覆盖。不能保存到其他类型key噢!
SINTER key1 [key2..] #返回所有集合的交集
SINTERSTORE destination key1[key2..] #在SINTER的基础上,存储结果到集合中。覆盖
SUNION key1 [key2..] #返回所有集合的并集
SUNIONSTORE destination key1 [key2..] #在SUNION的基础上,存储结果到集合中。覆盖
SSCAN KEY [MATCH pattern] [COUNT count] #在大量数据环境下,使用此命令遍历集合中元素,每次遍历部分

Hash

Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。

Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。

HSET key field value #将哈希表 key 中的字段 field 的值设为 value 。重复设置同一个field会覆盖,返回0
HMSET key field1 value1 [field2 value2..] #同时将多个 field-value (域-值)对设置到哈希表 key 中。
HSETNX key field value #只有在字段 field 不存在时,设置哈希表字段的值。
HEXISTS key field #查看哈希表 key 中,指定的字段是否存在。
HGET key field value #获取存储在哈希表中指定字段的值
HMGET key field1 [field2..] #获取所有给定字段的值
HGETALL key #获取在哈希表key 的所有字段和值
HKEYS key #获取哈希表key中所有的字段
HLEN key #获取哈希表中字段的数量
HVALS key #获取哈希表中所有值
HDEL key field1 [field2..] #删除哈希表key中一个/多个field字段
HINCRBY key field n #为哈希表 key 中的指定字段的整数值加上增量n,并返回增量后结果 一样只适用于整数型字段
HINCRBYFLOAT key field n #为哈希表 key 中的指定字段的浮点数值加上增量 n
HSCAN key cursor [MATCH pattern] [COUNT count] #迭代哈希表中的键值对

​ Hash变更的数据user name age,尤其是用户信息之类的,经常变动的信息!Hash更适合于对象的存储,Sring更加适合字符串存储!

Zset(有序集合)

不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。
score相同:按字典顺序排序

有序集合的成员是唯一的,但分数(score)却可以重复。

ZADD key score member1 [score2 member2] #向有序集合添加一个或多个成员,或者更新已存在成员的分数
ZCARD key #获取有序集合的成员数
ZCOUNT key min max #计算在有序集合中指定区间score的成员数
ZINCRBY key n member #有序集合中对指定成员的分数加上增量 n
ZSCORE key member #返回有序集中,成员的分数值
ZRANK key member #返回有序集合中指定成员的索引
ZRANGE key start end #通过索引区间返回有序集合指定区间内的成员
ZRANGEBYLEX key min max #通过字典区间返回有序集合的成员
ZLEXCOUNT key min max #在有序集合中计算指定字典区间内成员数量
ZREM key member1 [member2..] #移除有序集合中一个/多个成员
ZREMRANGEBYLEX key min max #移除有序集合中给定的字典区间的所有成员
ZREMRANGEBYRANK key start stop #移除有序集合中给定的排名区间的所有成员
ZREMRANGEBYSCORE key min max #移除有序集合中给定的分数区间的所有成员
ZREVRANGE key start end #返回有序集中指定区间内的成员,通过索引,分数从高到底
ZREVRANGEBYSCORRE key max min #返回有序集中指定分数区间内的成员,分数从高到低排序
ZREVRANGEBYLEX key max min #返回有序集中指定字典区间内的成员,按字典顺序倒序
ZREVRANK key member #返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
ZINTERSTORE destination numkeys key1 [key2 ..] #计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中,numkeys:表示参与运算的集合数,将score相加作为结果的score
ZUNIONSTORE destination numkeys key1 [key2..] #计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
ZSCAN key cursor [MATCH pattern\] [COUNT count] #迭代有序集合中的元素(包括元素成员和元素分值)

应用案例:

  • set排序 存储班级成绩表 工资表排序!
  • 普通消息,1.重要消息 2.带权重进行判断
  • 排行榜应用实现,取Top N测试

geospatial

  • 有效的经度从-180度到180度。
  • 有效的纬度从-85.05112878度到85.05112878度。
GEOADD key longitude latitude member [longitude latitude member ...] #将指定的地理空间位置(纬度、经度、名称)添加到指定的key中
GEODIST key member1 member2 [unit] #返回两个给定位置之间的距离 指定单位的参数 unit 必须是以下单位的其中一个:m 表示单位为米。km 表示单位为千米。mi 表示单位为英里。ft 表示单位为英尺。
GEOPOS key member [member ...] #从key里返回所有给定位置元素的位置(经度和纬度)。
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] #以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素
GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] #这个命令和 GEORADIUS 命令一样, 都可以找出位于指定范围内的元素, 但是 GEORADIUSBYMEMBER 的中心点是由给定的位置元素决定的, 而不是像 GEORADIUS 那样, 使用输入的经度和纬度来决定中心点

hyperloglog

基数统计的算法 :可用与计算网站访问量,计算非重复数据

PFADD key element [element ...] #添加元素
PFCOUNT key [key ...] # 查看个数
PFMERGE destkey sourcekey [sourcekey ...] #将多个 HyperLogLog 合并(merge)为一个 HyperLogLog  不会有重复数据

bitmap

SETBIT key offset value #设置或者清空key的value(字符串)在offset处的bit值。
GETBIT key offset #返回key对应的string在offset处的bit值
BITCOUNT key [start end] #统计字符串被设置为1的bit数.可通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。

统计疫情感染人数:0 1 0 1,统计用户信息,活跃,不活跃,打开,365打卡,两个状态的,都可以使用bitmaps来存储。

事务

  • redis没有原子性
  • Redis单条命令保证原子性,但是事务不保证原子性
  • Redis事务本质:一组命令的集合,一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行
  • 一次性,顺序性,排他性!执行一系列的命令
127.0.0.1:6379> MULTI #开启事务
OK
# 命令入队
127.0.0.1:6379> set key1 1
QUEUED
127.0.0.1:6379> set key2 2
QUEUED
127.0.0.1:6379> get key1 
QUEUED
127.0.0.1:6379> get key2
QUEUED
127.0.0.1:6379> EXEC #执行事务 
1) OK
2) OK
3) "1"
4) "2"
# DISCARD 是取消事务 只能在事务中执行

watch乐观锁

在事务执行前可以添加watch充当redis乐观锁操作。

watch 用来对修改目标参数进行监控,假如在事物正在操作的过程中,最后一次操作完成事物提交的时候,是先将消息放入执行操作队列中,使用watch操作监控所需要修改的参数值是否与提交时候的值发生改变,如果发生改变,则代表该次事物全部提交失败,返回失败信息,所有事物中的操作将需要重新进行!

Jedis

Jedis是Redis官方推荐的java连接工具!使用java操作Redis中间件!

1.导入依赖

<dependencies>
    <!--导入jedis包-->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.3.0</version>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

2.测试连接

前提打开阿里云服务器安全组端口,将redis.config 文件中默认端口改为0.0.0.0

public class TestPing {
    public static void main(String[] args) {
        //1.new jedis对象
        Jedis jedis = new Jedis("******",6379);

        System.out.println(jedis.ping());
    }
}

事务

public class T {
    public static void main(String[] args) {
        //1.new jedis对象
        Jedis jedis = new Jedis("******",6379);
        Transaction multi = jedis.multi();
        try {
            multi.set("name","zsky");
            multi.exec();//执行事务
        }catch (Exception e){
            multi.discard();//放弃事务
            e.printStackTrace();
        }finally {
            System.out.println(jedis.get("name"));
            jedis.close();//关闭连接
        }

    }
}

SpringBoot 整合

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yaml

# 如需要操作数据库连接池 使用lettuce
spring:
  redis:
    host: "*******"
    port: 6379
    password: 你的密码

测试连接

@SpringBootTest
class RedisSpringbootApplicationTests {
    @Autowired
    private RedisTemplate redisTemplate;
    @Test
    void contextLoads() {
        redisTemplate.opsForValue().set("name","zsky");
        redisTemplate.opsForValue().set("name2","你好,redis!");
        System.out.println(redisTemplate.opsForValue().get("name"));
        System.out.println(redisTemplate.opsForValue().get("name2"));
    }

}

RedisConfig

编写一个自己的RedisTemplate

package com.zsky.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
    //编写自己的redisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //开发一般使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        //配置具体Json序列化方式
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash的key采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }
}

测试(前提准备好一个User实体类)

@SpringBootTest
class RedisSpringbootApplicationTests {
    @Autowired
    @Qualifier("redisTemplate")
    private RedisTemplate redisTemplate;
    @Test
    void contextLoads() {
        User user = new User("小明", "123456");
        redisTemplate.opsForValue().set("name","zsky");
        redisTemplate.opsForValue().set("name2","你好,redis!");
        redisTemplate.opsForValue().set("user",user);
        System.out.println(redisTemplate.opsForValue().get("name"));
        System.out.println(redisTemplate.opsForValue().get("name2"));
        System.out.println(redisTemplate.opsForValue().get("user"));
    }
}

RedisUtil

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(String.valueOf(CollectionUtils.arrayToList(key)));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }

    }

}

配置文件分析

bind 127.0.0.1
#绑定的ip
protected-mode yes
#保护模式
port 6379
#端口
#这些配置之后可能会经常使用

daemonize yes 
#以守护线程的方式开启

#日志
debug、verbose、notice、warning
#设置日志等级
loglevel notice

logfile
#设置日志文件位置

database 16
#16个数据库

always-show-logo yes 
#永远显示logo

snapshotting#快照
	三个方法,在规定时间内,执行了多少次操作,则会持久化到文件  .rdb  .aof
	redis是内存数据库,没有持久化,数据就会丢失
	save 900 1  #900秒内,至少有一个key进行了修改,就进行持久化操作
	save 300 10  #。。。。。
	save 60 10000  #同理
	
	stop-writes-on-bgseve-error yes
	#持久化错误之后是否要继续工作,默认开启
	
	rdbcompression yes
	#是否压缩rdb文件,需要消耗cpu资源
	
	rdbchecksum yes
	#保存rdb文件是否要进行错误检查校验
	
	dir 	./
	#rdb文件保存的目录
	
replication #主从复制,需要搭建多个redis


Security #安全设置
requirepass foobared
#默认没有密码
#通过命令config set requirepass 可以设置密码
#auth password   进行登录

########################################################################
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass 123
OK
127.0.0.1:6379> ping 
PONG
127.0.0.1:6379> quit
haoyun@HAOYUN ~ % redis-cli         #设置密码操作
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth 123
OK
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> 
########################################################################


maxlients 10000 
#设置能连接上redis的最大客户端数量
maxmemory <bytes>
#redis配置最大的内存数
maxmemory-policy noeviction
#内存到达上限之后的处理策略 redis.conf中的默认的过期策略是 volatile-lru
		#移除一些过期的key
		#报错、、、
		#六种机制
		volatile-lru:设置了过期时间的key进行lru移除
		allkeys-lru:删除
		volatile-random:删除即将过期的key
		allkeys-random:随机删除
		volatile-ttl:删除即将过期的
		noeviction:永远不过期,直接报错
		


Append only模式  aof模式
#持久化的两种方式之一RDB、AOF
appendonly no
#默认是不开启的,默认使用RDB持久化,大部分情况下RDB完全够用

appendfilename "appendonly.aof"
#aof持计划文件名

appendfsync always 
#每次修改都会synch 消耗性能
appendfsync everysec 
#每秒执行一次 synch,可能会丢失那1s的数据
appendfsync no
#不执行sync 这时候操作系统自己同步数据,速度是最快的,一般也不用

Redis持久化

Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能。

RDB(Redis DataBase)

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改这个配置!

rdb保存的文件是 dump.rdb

触发机制

  • save的规则满足的情况下,会自动触发rdb规则
  • 执行FLUSHALL命令,也会触发rdb规则
  • 退出Redis,与会产生rdb文件

如何恢复rdb文件

1.只需要将rdb文件放在我们Redis启动目录就可以,Redis启动的时候会自动检查dump.rdb恢复其中的数据。

2.查看需要存在的位置

127.0.0.1:6170> CONFIG GET dir

优点

1.适合大规模的数据恢复!

2.对数据的完整性要求不高!

缺点

1.需要一定的时间间隔进程操作!

2.fork进程的时候,会占用一定的内存空间!

AOF(Append Only File)

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,Redis启动之初会读取该文件重新构建数据,换言之,Redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

AOF保存的是appendonly.aof文件(默认是不开启的)

优点

1.每一次修改都同步,文件的完整性更好!

2.每秒同步一次,可能会丢失一秒的数据!

3.从不同步,效率最高!

缺点

1.相对于数据文件来说,AOF远远大于rdb,修复的速度也比rdb慢!

2.AOF运行效率也比rdb慢,所以我们Redis默认的配置就是rdb持久化!

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

命令

  • PSUBSCRIBE pattern [pattern..] 订阅一个或多个符合给定模式的频道。
  • PUNSUBSCRIBE pattern [pattern..] 退订一个或多个符合给定模式的频道。
  • PUBSUB subcommand [argument[argument]] 查看订阅与发布系统状态。
  • PUBLISH channel message 向指定频道发布消息
  • SUBSCRIBE channel [channel..] 订阅给定的一个或多个频道。
  • UNSUBSCRIBE channel [channel..] 退订一个或多个频道

代码示例

------------订阅端----------------------
127.0.0.1:6379> SUBSCRIBE zskyz233333 # 订阅zskyz233333频道
Reading messages... (press Ctrl-C to quit) # 等待接收消息
1) "subscribe" # 订阅成功的消息
2) "zskyz233333"
3) (integer) 1
1) "message" # 接收到来自sakura频道的消息 "hello world"
2) "zskyz233333"
3) "hello world"
1) "message" # 接收到来自sakura频道的消息 "hello zskyz233333"
2) "zskyz233333"
3) "hello zskyz233333"
--------------消息发布端-------------------

127.0.0.1:6379> PUBLISH zskyz233333 "hello world" # zskyz233333

(integer) 1

127.0.0.1:6379> PUBLISH zskyz233333 "hello zskyz233333" # 发布消息

(integer) 1


-----------------查看活跃的频道------------

127.0.0.1:6379> PUBSUB channels
"zskyz233333"

原理

Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。

Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息,其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

客户端订阅,就被链接到对应频道的链表的尾部,退订则就是将客户端节点从链表中移除。

缺点

如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。

应用

消息订阅:公众号订阅,微博关注等等(起始更多是使用消息队列来进行实现)
多人在线聊天室。
稍微复杂的场景,我们就会使用消息中间件MQ处理。

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(Master/Leader),后者称为从节点(Slave/Follower), 数据的复制是单向的!只能由主节点复制到从节点(主节点以写为主、从节点以读为主)。默认情况下,每台Redis服务器都是主节点,一个主节点可以有0个或者多个从节点,但每个从节点只能由一个主节点。

作用

  • 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式。
  • 故障恢复:当主节点故障时,从节点可以暂时替代主节点提供服务,是一种服务冗余的方式
  • 负载均衡:在主从复制的基础上,配合读写分离,由主节点进行写操作,从节点进行读操作,分担服务器的负载;尤其是在多读少写的场景下,通过多个从节点分担负载,提高并发量。
  • 高可用基石:主从复制是哨兵和集群能够实施的基础

为什么需要集群

  • 单台服务器难以负载大量的请求
  • 单台服务器故障率高,系统崩坏概率大
  • 单台服务器内存容量有限。

环境配置

只配置从库,不用配置主库!

127.0.0.1:6170> info replication #查看当前库的信息
# Replication
role:master  # 角色 master
connected_slaves:0 # 从机数量
master_replid:8fd942a0c53924e8f4dbb75d75043b79e472314a
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

搭建集群(单机)

复制3个配置文件,然后修改对应的信息

1.端口

2.pid名字

3.log文件名

4.dump.rdb名字

一主二从配置

默认情况下,每台Redis服务器都是主节点;我们一般情况下只用配置从机就好了!使用SLAVEOF host port就可以为从机配置主机了(这是暂时的,永久的需要去配置文件设置)。

规则

1..从机只能读,不能写,主机可读可写但是多用于写。

2.当主机断电宕机后,默认情况下从机的角色不会发生变化 ,集群中只是失去了写操作,当主机恢复以后,又会连接上从机恢复原状。

3.当从机断电宕机后,若不是使用配置文件配置的从机,再次启动后作为主机是无法获取之前主机的数据的,若此时重新配置称为从机,又可以获取到主机的所有数据。这里就要提到一个同步原理。

4.第二条中提到,默认情况下,主机故障后,不会出现新的主机,有两种方式可以产生新的主机:

  • 从机手动执行命令slaveof no one,这样执行以后从机会独立出来成为一个主机
  • 使用哨兵模式(自动选举)

复制原理

Slave 启动成功连接到 master 后会发送一个sync同步命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行! 我们的数据一定可以在从机中看到!

哨兵模式

概述

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

哨兵的作用:

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。

测试

1、配置哨兵配置文件 sentinel.conf

# sentinel monitor 被监控的名称 host port 1
sentinel monitor myredis 127.0.0.1 6379 1

后面的这个数字1,代表主机挂了,slave投票看让谁接替成为主机,票数最多的,就会成为主机!

2.启动 redis-sentinel sentinel.conf

优缺点

优点:

  • 哨兵集群,基于主从复制模式,所有主从复制的优点,它都有
  • 主从可以切换,故障可以转移,系统的可用性更好
  • 哨兵模式是主从模式的升级,手动到自动,更加健壮

缺点:

  • Redis不好在线扩容,集群容量一旦达到上限,在线扩容就十分麻烦
  • 实现哨兵模式的配置其实是很麻烦的,里面有很多配置项

哨兵模式的全部配置

# Example sentinel.conf
哨兵sentinel实例运行的端口 默认26379

port 26379


哨兵sentinel的工作目录

dir /tmp


哨兵sentinel监控的redis主节点的 ip port

master-name  可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。

quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了

sentinel monitor <master-name> <ip> <redis-port> <quorum>

sentinel monitor mymaster 127.0.0.1 6379 1


当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码

设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码

sentinel auth-pass <master-name> <password>

sentinel auth-pass mymaster MySUPER--secret-0123passw0rd


指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒

sentinel down-after-milliseconds <master-name> <milliseconds>

sentinel down-after-milliseconds mymaster 30000


这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,

这个数字越小,完成failover所需的时间就越长,

但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。

可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。


sentinel parallel-syncs <master-name> <numslaves>

sentinel parallel-syncs mymaster 1


故障转移的超时时间 failover-timeout 可以用在以下这些方面:

1. 同一个sentinel对同一个master两次failover之间的间隔时间。

2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。

3.当想要取消一个正在进行的failover所需要的时间。

4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了

默认三分钟

sentinel failover-timeout <master-name> <milliseconds>

sentinel failover-timeout mymaster 180000


SCRIPTS EXECUTION

配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。

对于脚本的运行结果有以下规则:

若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10

若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。

如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。

一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。

通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,

这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,

一个是事件的类型,

一个是事件的描述。

如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。

通知脚本

sentinel notification-script <master-name> <script-path>

sentinel notification-script mymaster /var/redis/notify.sh


客户端重新配置主节点参数脚本

当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。

以下参数将会在调用脚本时传给脚本:

<master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>

目前<state>总是“failover”,

<role>是“leader”或者“observer”中的一个。

参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的

这个脚本应该是通用的,能被多次调用,不是针对性的。

sentinel client-reconfig-script <master-name> <script-path>

sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

缓存穿透与雪崩

缓存穿透(即查询不到)

概念

在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再在数据库中进行查找,数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。网络安全中也有人恶意使用这种手段进行攻击被称为洪水攻击。

解决方案

布隆过滤器

对所有可能查询的参数以Hash的形式存储,以便快速确定是否存在这个值,在控制层先进行拦截校验,校验不通过直接打回,减轻了存储系统的压力。

缓存空对象

一次请求若在缓存和数据库中都没找到,就在缓存中方一个空对象用于处理后续这个请求。

这样做有一个缺陷:存储空对象也需要空间,大量的空对象会耗费一定的空间,存储效率并不高。解决这个缺陷的方式就是设置较短过期时间,即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿(即量太大,缓存过期)

概念
相较于缓存穿透,缓存击穿的目的性更强,一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。这就是缓存被击穿,只是针对其中某个key的缓存不可用而导致击穿,但是其他的key依然可以使用缓存响应。(比如热搜排行上,一个热点新闻被同时大量访问就可能导致缓存击穿。)

解决方案

1.设置热点数据永不过期

这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。

2.加互斥锁(分布式锁)

在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。

缓存雪崩

概念

大量的key设置了相同的过期时间,导致在缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用:

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群

限流降级:

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热:

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。


文章作者: Sky
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Sky !
评论
  目录