赏花季。ps:不是很会拍照,狗头一波

img

img

01 Redis 是如何执行的

Redis 是如何执行的?

命令执行流程

一条命令的执行过程有很多细节,但大体可分为:客户端先将用户输入的命令,转化为 Redis 相关的通讯协议,再用 socket 连接的方式将内容发送给服务器端,服务器端在接收到相关内容之后,先将内容转化为具体的执行命令,再判断用户授权信息和其他相关信息,当验证通过之后会执行最终命令,命令执行完之后,会进行相关的信息记录和数据统计,然后再把执行结果发送给客户端,这样一条命令的执行流程就结束了。如果是集群模式的话,主节点还会将命令同步至子节点,下面我们一起来看更加具体的执行流程。

image-20240604094617256

步骤一:用户输入一条命令

步骤二:客户端先将命令转换成 Redis 协议,然后再通过 socket 连接发送给服务器端

客户端和服务器端是基于 socket 通信的,服务器端在初始化时会创建了一个 socket 监听,用于监测链接客户端的 socket 链接,源码如下:

1
2
3
4
5
6
7
8
void initServer(void) {
//......
// 开启 Socket 事件监听
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
//......
}

socket 小知识:每个 socket 被创建后,会分配两个缓冲区,输入缓冲区和输出缓冲区。 写入函数并不会立即向网络中传输数据,而是先将数据写入缓冲区中,再由 TCP 协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机器,也不管它们何时被发送到网络,这些都是 TCP 协议负责的事情。 注意:数据有可能刚被写入缓冲区就发送到网络,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络,这取决于当时的网络情况、当前线程是否空闲等诸多因素,不由程序员控制。 读取函数也是如此,它也是从输入缓冲区中读取数据,而不是直接从网络中读取。

当 socket 成功连接之后,客户端会先把命令转换成 Redis 通讯协议(RESP 协议,REdis Serialization Protocol)发送给服务器端,这个通信协议是为了保障服务器能最快速的理解命令的含义而制定的如果没有这个通讯协议,那么 Redis 服务器端要遍历所有的空格以确认此条命令的含义,这样会加大服务器的运算量,而直接发送通讯协议,相当于把服务器端的解析工作交给了每一个客户端,这样会很大程度的提高 Redis 的运行速度。例如,当我们输入 set key val 命令时,客户端会把这个命令转换为 *3\r\n$3\r\nSET\r\n$4\r\nKEY\r\n$4\r\nVAL\r\n 协议发送给服务器端。 更多通讯协议,可访问官方文档:https://redis.io/topics/protocol

扩展知识:I/O 多路复用

Redis 使用的是 I/O 多路复用功能来监听多 socket 链接的,这样就可以使用一个线程链接来处理多个请求,减少线程切换带来的开销,同时也避免了 I/O 阻塞操作,从而大大提高了 Redis 的运行效率。

综合来说,此步骤的执行流程如下:

  • 与服务器端以 socket 和 I/O 多路复用的技术建立链接;
  • 将命令转换为 Redis 通讯协议,再将这些协议发送至缓冲区。

步骤三:服务器端接收到命令

服务器会先去输入缓冲中读取数据,然后判断数据的大小是否超过了系统设置的值(默认是 1GB),如果大于此值就会返回错误信息,并关闭客户端连接。 默认大小如下图所示:

image-20240604094816486

当数据大小验证通过之后,服务器端会对输入缓冲区中的请求命令进行分析,提取命令请求中包含的命令参数,存储在 client 对象(服务器端会为每个链接创建一个 Client 对象)的属性中。

步骤四:执行前准备

① 判断是否为退出命令,如果是则直接返回;

② 非 null 判断,检查 client 对象是否为 null,如果是返回错误信息;

③ 获取执行命令,根据 client 对象存储的属性信息去 redisCommand 结构中查询执行命令;

④ 用户权限效验,未通过身份验证的客户端只能执行 AUTH(授权) 命令,未通过身份验证的客户端执行了 AUTH 之外的命令则返回错误信息;

⑤ 集群相关操作,如果是集群模式,把命令重定向到目标节点,如果是 master(主节点) 则不需要重定向;

⑥ 检查服务器端最大内存限制,如果服务器端开启了最大内存限制,会先检查内存大小,如果内存超过了最大值会对内存进行回收操作;

⑦ 持久化检测,检查服务器是否开启了持久化和持久化出错停止写入配置,如果开启了此配置并且有持久化失败的情况,禁止执行写命令;

⑧ 集群模式最少从节点(slave)验证,如果是集群模式并且配置了 replminslavestowrite(最小从节点写入),当从节点的数量少于配置项时,禁止执行写命令;

⑨ 只读从节点验证,当此服务器为只读从节点时,只接受 master 的写命令;

⑩ 客户端订阅判断,当客户端正在订阅频道时,只会执行部分命令(只会执行 SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE,其他命令都会被拒绝)。

⑪ 从节点状态效验,当服务器为 slave 并且没有连接 master 时,只会执行状态查询相关的命令,如 info 等;

⑫ 服务器初始化效验,当服务器正在启动时,只会执行 loading 标志的命令,其他的命令都会被拒绝;

⑬ lua 脚本阻塞效验,当服务器因为执行 lua 脚本阻塞时,只会执行部分命令;

⑭ 事务命令效验,如果执行的是事务命令,则开启事务把命令放入等待队列;

⑮ 监视器 (monitor) 判断,如果服务器打开了监视器功能,那么服务器也会把执行命令和相关参数发送给监视器 (监视器是用于监控服务器运行状态的)。

当服务器经过以上操作之后,就可以执行真正的操作命令了。

步骤五:执行最终命令,调用 redisCommand 中的 proc 函数执行命令。

步骤六:执行完后相关记录和统计 ① 检查慢查询是否开启,如果开启会记录慢查询日志; ② 检查统计信息是否开启,如果开启会记录一些统计信息,例如执行命令所耗费时长和计数器(calls)加1; ③ 检查持久化功能是否开启,如果开启则会记录持久化信息; ④ 如果有其它从服务器正在复制当前服务器,则会将刚刚执行的命令传播给其他从服务器。

步骤七:返回结果给客户端 命令执行完之后,服务器会通过 socket 的方式把执行结果发送给客户端,客户端再把结果展示给用户,至此一条命令的执行就结束了。

小结

当用户输入一条命令之后,客户端会以 socket 的方式把数据转换成 Redis 协议,并发送至服务器端,服务器端在接受到数据之后,会先将协议转换为真正的执行命令,在经过各种验证以保证命令能够正确并安全的执行,但验证处理完之后,会调用具体的方法执行此条命令,执行完成之后会进行相关的统计和记录,然后再把执行结果返回给客户端,整个执行流程,如下图所示:

image-20240604094830172

02 Redis 快速搭建与使用

03 Redis 持久化——RDB

Redis 的读写都是在内存中,所以它的性能较高,但在内存中的数据会随着服务器的重启而丢失,为了保证数据不丢失,我们需要将内存中的数据存储到磁盘,以便 Redis 重启时能够从磁盘中恢复原有的数据,而整个过程就叫做 Redis 持久化。

image-20240604095023010

Redis 持久化也是 Redis 和 Memcached 的主要区别之一,因为 Memcached 不具备持久化功能。

1 持久化的几种方式

Redis 持久化拥有以下三种方式:

  • 快照方式(RDB, Redis DataBase)将某一个时刻的内存数据,以二进制的方式写入磁盘;
  • 文件追加方式(AOF, Append Only File),记录所有的操作命令,并以文本的形式追加到文件中;
  • 混合持久化方式,Redis 4.0 之后新增的方式,混合持久化是结合了 RDB 和 AOF 的优点,在写入的时候,先把当前的数据以 RDB 的形式写入文件的开头,再将后续的操作命令以 AOF 的格式存入文件,这样既能保证 Redis 重启时的速度,又能减低数据丢失的风险。

因为每种持久化方案,都有特定的使用场景,让我们先从 RDB 持久化说起吧。

2 RDB简介

RDB(Redis DataBase)是将某一个时刻的内存快照(Snapshot),以二进制的方式写入磁盘的过程。

3 持久化触发

RDB 的持久化触发方式有两类:一类是手动触发,另一类是自动触发。

1)手动触发

手动触发持久化的操作有两个: savebgsave ,它们主要区别体现在:是否阻塞 Redis 主线程的执行。

① save 命令

在客户端中执行 save 命令,就会触发 Redis 的持久化,但同时也是使 Redis 处于阻塞状态,直到 RDB 持久化完成,才会响应其他客户端发来的命令,所以在生产环境一定要慎用

补充:查看redis安装路径

1
2
3
4
5
6
(base) sv@sv-NF5280M5:/home/sv/reid-strong$ ps -ef |  grep redis
redis 1419 1 0 6月03 ? 00:02:00 /usr/bin/redis-server 127.0.0.1:6379
sv 8986 3312 0 6月03 pts/0 00:00:00 redis-cli
sv 21201 21000 0 09:54 pts/11 00:00:00 grep --color=auto redis
(base) sv@sv-NF5280M5:/home/sv/reid-strong$ ls -l /proc/8986/cwd
lrwxrwxrwx 1 sv sv 0 6月 3 20:54 /proc/8986/cwd -> /home/sv

save 命令使用如下:

image-20240604095845958

从图片可以看出,当执行完 save 命令之后,持久化文件 dump.rdb 的修改时间就变了,这就表示 save 成功的触发了 RDB 持久化。 save 命令执行流程,如下图所示:

image-20240604095859770

② bgsave 命令

bgsave(background save)既后台保存的意思, 它和 save 命令最大的区别就是 bgsave 会 fork() 一个子进程来执行持久化,整个过程中只有在 fork() 子进程时有短暂的阻塞,当子进程被创建之后,Redis 的主进程就可以响应其他客户端的请求了,相对于整个流程都阻塞的 save 命令来说,显然 bgsave 命令更适合我们使用。 bgsave 命令使用,如下图所示:

1
2
127.0.0.1:6379> bgsave
Background saving started

bgsave 执行流程,如下图所示:

image-20240604100015348

2)自动触发

说完了 RDB 的手动触发方式,下面来看如何自动触发 RDB 持久化? RDB 自动持久化主要来源于以下几种情况。

① save m n

save m n 是指在 m 秒内,如果有 n 个键发生改变,则自动触发持久化。 参数 m 和 n 可以在 Redis 的配置文件中找到,例如,save 60 1 则表明在 60 秒内,至少有一个键发生改变,就会触发 RDB 持久化。 自动触发持久化,本质是 Redis 通过判断,如果满足设置的触发条件,自动执行一次 bgsave 命令。 注意:当设置多个 save m n 命令时,满足任意一个条件都会触发持久化。 例如,我们设置了以下两个 save m n 命令:

  • save 60 10
  • save 600 1

当 60s 内如果有 10 次 Redis 键值发生改变,就会触发持久化;如果 60s 内 Redis 的键值改变次数少于 10 次,那么 Redis 就会判断 600s 内,Redis 的键值是否至少被修改了一次,如果满足则会触发持久化。

② flushall

flushall 命令用于清空 Redis 数据库,在生产环境下一定慎用,当 Redis 执行了 flushall 命令之后,则会触发自动持久化,把 RDB 文件清空。 执行结果如下图所示:

1
2
3
4
5
6
127.0.0.1:6379> hget myhash k2
"v2"
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> hget myhash k2
(nil)
③ 主从同步触发

在 Redis 主从复制中,当从节点执行全量复制操作时,主节点会执行 bgsave 命令,并将 RDB 文件发送给从节点,该过程会自动触发 Redis 持久化。

4 配置说明

查看文件所在路径

1
2
(base) sv@sv-NF5280M5:/home/sv$ whereis redis.conf
redis: /etc/redis

合理的设置 RDB 的配置,可以保障 Redis 高效且稳定的运行,下面一起来看 RDB 的配置项都有哪些?

RDB 配置参数可以在 Redis 的配置文件中找见,具体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# RDB 保存的条件
save 900 1
save 300 10
save 60 10000

# bgsave 失败之后,是否停止持久化数据到磁盘,yes 表示停止持久化,no 表示忽略错误继续写文件。
stop-writes-on-bgsave-error yes

# RDB 文件压缩
rdbcompression yes

# 写入文件和读取文件时是否开启 RDB 文件检查,检查是否有无损坏,如果在启动是检查发现损坏,则停止启动。
rdbchecksum yes

# RDB 文件名
dbfilename dump.rdb

# RDB 文件目录
dir ./

其中比较重要的参数如下列表: ① save 参数 它是用来配置触发 RDB 持久化条件的参数,满足保存条件时将会把数据持久化到硬盘。 默认配置说明如下:

  • save 900 1:表示 900 秒内如果至少有 1 个 key 值变化,则把数据持久化到硬盘;
  • save 300 10:表示 300 秒内如果至少有 10 个 key 值变化,则把数据持久化到硬盘;
  • save 60 10000:表示 60 秒内如果至少有 10000 个 key 值变化,则把数据持久化到硬盘。

② rdbcompression 参数 它的默认值是 yes 表示开启 RDB 文件压缩,Redis 会采用 LZF 算法进行压缩。如果不想消耗 CPU 性能来进行文件压缩的话,可以设置为关闭此功能,这样的缺点是需要更多的磁盘空间来保存文件。 ③ rdbchecksum 参数 它的默认值为 yes 表示写入文件和读取文件时是否开启 RDB 文件检查,检查是否有无损坏,如果在启动是检查发现损坏,则停止启动。

5 配置查询

Redis 中可以使用命令查询当前配置参数。查询命令的格式为:config get xxx ,例如,想要获取 RDB 文件的存储名称设置,可以使用 config get dbfilename ,执行效果如下图所示:

1
2
3
127.0.0.1:6379> config get dbfilename
1) "dbfilename"
2) "dump.rdb"

查询 RDB 的文件目录,可使用命令 config get dir ,执行效果如下图所示:

1
2
3
127.0.0.1:6379> config get dir
1) "dir"
2) "/var/lib/redis"

6 配置设置

设置 RDB 的配置,可以通过以下两种方式:

  • 手动修改 Redis 配置文件;
  • 使用命令行设置,例如,使用 config set dir "/usr/data" 就是用于修改 RDB 的存储目录。

注意:手动修改 Redis 配置文件的方式是全局生效的,即重启 Redis 服务器设置参数也不会丢失,而使用命令修改的方式,在 Redis 重启之后就会丢失。但手动修改 Redis 配置文件,想要立即生效需要重启 Redis 服务器,而命令的方式则不需要重启 Redis 服务器。

小贴士:Redis 的配置文件位于 Redis 安装目录的根路径下,默认名称为 redis.conf。

7 RDB 文件恢复

当 Redis 服务器启动时,如果 Redis 根目录存在 RDB 文件 dump.rdb,Redis 就会自动加载 RDB 文件恢复持久化数据。 如果根目录没有 dump.rdb 文件,请先将 dump.rdb 文件移动到 Redis 的根目录。 验证 RDB 文件是否被加载 Redis 在启动时有日志信息,会显示是否加载了 RDB 文件,我们执行 Redis 启动命令:src/redis-server redis.conf ,如下图所示:

8 RDB 优缺点

1)RDB 优点

  • RDB 的内容为二进制的数据,占用内存更小,更紧凑,更适合做为备份文件;
  • RDB 对灾难恢复非常有用,它是一个紧凑的文件,可以更快的传输到远程服务器进行 Redis 服务恢复;
  • RDB 可以更大程度的提高 Redis 的运行速度,因为每次持久化时 Redis 主进程都会 fork() 一个子进程,进行数据持久化到磁盘,Redis 主进程并不会执行磁盘 I/O 等操作;
  • 与 AOF 格式的文件相比,RDB 文件可以更快的重启。

2)RDB 缺点

  • 因为 RDB 只能保存某个时间间隔的数据,如果中途 Redis 服务被意外终止了,则会丢失一段时间内的 Redis 数据;
  • RDB 需要经常 fork() 才能使用子进程将其持久化在磁盘上。如果数据集很大,fork() 可能很耗时,并且如果数据集很大且 CPU 性能不佳,则可能导致 Redis 停止为客户端服务几毫秒甚至一秒钟。

9 禁用持久化

禁用持久化可以提高 Redis 的执行效率,如果对数据丢失不敏感的情况下,可以在连接客户端的情况下,执行 config set save "" 命令即可禁用 Redis 的持久化,如下图所示:

1
2
127.0.0.1:6379> config set save ""
OK

10 小结

【RDB原理,手动触发(save,bgsave),自动触发,文件名,以及简单设置】

通过本文我们可以得知,RDB 持久化分为手动触发和自动触发两种方式,它的优点是存储文件小,Redis 启动 时恢复数据比较快,缺点是有丢失数据的风险。RDB 文件的恢复也很简单,只需要把 RDB 文件放到 Redis 的根目录,在 Redis 启动时就会自动加载并恢复数据。

04 Redis 持久化——AOF

使用 RDB 持久化有一个风险,它可能会造成最新数据丢失的风险。因为 RDB 的持久化有一定的时间间隔,在这个时间段内如果 Redis 服务意外终止的话,就会造成最新的数据全部丢失。

可能会操作 Redis 服务意外终止的条件:

  • 安装 Redis 的机器停止运行,蓝屏或者系统崩溃;
  • 安装 Redis 的机器出现电源故障,例如突然断电;
  • 使用 kill -9 Redis_PID 等。

那么如何解决以上的这些问题呢?Redis 为我们提供了另一种持久化的方案——AOF。

1 简介

AOF(Append Only File)中文是附加到文件,顾名思义 AOF 可以把 Redis 每个键值对操作都记录到文件(appendonly.aof)中。

2 持久化查询和设置

1)查询 AOF 启动状态

使用 config get appendonly 命令,如下图所示:

1
2
3
127.0.0.1:6379> config get appendonly
1) "appendonly"
2) "no"

其中,第一行为 AOF 文件的名称,而最后一行表示 AOF 启动的状态,yes 表示已启动,no 表示未启动。

2)开启 AOF 持久化

Redis 默认是关闭 AOF 持久化的,想要开启 AOF 持久化,有以下两种方式:

  • 通过命令行的方式;
  • 通过修改配置文件的方式(redis.conf)。

下面分别来看以上两种方式的实现。

① 命令行启动 AOF

命令行启动 AOF,使用 config set appendonly yes 命令,如下图所示:

1
2
127.0.0.1:6379> config set appendonly yes
OK

命令行启动 AOF 的优缺点:命令行启动优点是无需重启 Redis 服务,缺点是如果 Redis 服务重启,则之前使用命令行设置的配置就会失效。

② 配置文件启动 AOF

Redis 的配置文件在它的根路径下的 redis.conf 文件中,获取 Redis 的根目录可以使用命令 config get dir 获取,如下图所示:

1
2
3
127.0.0.1:6379> config get dir
1) "dir"
2) "/var/lib/redis"

只需要在配置文件中设置 appendonly yes 即可,默认 appendonly no 表示关闭 AOF 持久化。 配置文件启动 AOF 的优缺点修改配置文件的缺点是每次修改配置文件都要重启 Redis 服务才能生效,优点是无论重启多少次 Redis 服务,配置文件中设置的配置信息都不会失效

3 触发持久化

AOF 持久化开启之后,只要满足一定条件,就会触发 AOF 持久化。AOF 的触发条件分为两种:自动触发和手动触发。

1)自动触发

有两种情况可以自动触发 AOF 持久化,分为是:满足 AOF 设置的策略触发和**满足 AOF 重写触发。**其中,AOF 重写触发会在本文的后半部分详细介绍,这里重点来说 AOF 持久化策略都有哪些。

AOF 持久化策略,分为以下三种:

  • always:每条 Redis 操作命令都会写入磁盘,最多丢失一条数据;
  • everysec:每秒钟写入一次磁盘,最多丢失一秒的数据;
  • no:不设置写入磁盘的规则,根据当前操作系统来决定何时写入磁盘,Linux 默认 30s 写入一次数据至磁盘。

这三种配置可以在 Redis 的配置文件(redis.conf)中设置,如下代码所示:

1
2
# 开启每秒写入一次的持久化策略
appendfsync everysec

小贴士:因为每次写入磁盘都会对 Redis 的性能造成一定的影响,所以要根据用户的实际情况设置相应的策略,一般设置每秒写入一次磁盘的频率就可以满足大部分的使用场景了。

触发自动持久化的两种情况,如下图所示:

image-20240604101208002

2)手动触发

在客户端执行 bgrewriteaof 命令就可以手动触发 AOF 持久化,如下图所示:

image-20240604101247714

可以看出执行完 bgrewriteaof 命令之后,AOF 持久化就会被触发。

4 AOF 文件重写

AOF 是通过记录 Redis 的执行命令来持久化(保存)数据的,所以随着时间的流逝 AOF 文件会越来越多,这样不仅增加了服务器的存储压力,也会造成 Redis 重启速度变慢,为了解决这个问题 Redis 提供了 AOF 重写的功能。

1)什么是 AOF 重写?

【easy say:直接从内存中取出数据生成命令】

AOF 重写指的是它会直接读取 Redis 服务器当前的状态,并压缩保存为 AOF 文件。例如,我们增加了一个计数器,并对它做了 99 次修改,如果不做 AOF 重写的话,那么持久化文件中就会有 100 条记录执行命令的信息,而 AOF 重写之后,之后记录一条此计数器最终的结果信息,这样就去除了所有的无效信息。

2)AOF 重写实现

触发 AOF 文件重写,要满足两个条件,这两个条件也是配置在 Redis 配置文件中的,它们分别:

  • auto-aof-rewrite-min-size:允许 AOF 重写的最小文件容量,默认是 64mb 。
  • auto-aof-rewrite-percentage:AOF 文件重写的大小比例,默认值是 100,表示 100%,也就是只有当前 AOF 文件,比最后一次(上次)的 AOF 文件大一倍时,才会启动 AOF 文件重写。

查询 auto-aof-rewrite-min-size 和 auto-aof-rewrite-percentage 的值,可使用 config get xxx 命令,如下图所示:

1
2
3
127.0.0.1:6379> config get auto-aof-rewrite-min-size
1) "auto-aof-rewrite-min-size"
2) "67108864"

小贴士:只有同时满足 auto-aof-rewrite-min-size 和 auto-aof-rewrite-percentage 设置的条件,才会触发 AOF 文件重写。

注意:使用 bgrewriteaof 命令,可以自动触发 AOF 文件重写。

3)AOF 重写流程

AOF 文件重写是生成一个全新的文件,并把当前数据的最少操作命令保存到新文件上,当把所有的数据都保存至新文件之后,Redis 会交换两个文件,并把最新的持久化操作命令追加到新文件上。

详细说:在执行BGREWRITEAOF命令时,Redis服务器会维护一个AOF重写缓冲区,该缓冲区会在子进程创建新AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作后,服务器就会重写缓冲区中的所有内容追加到新的AOF文件的末尾,使得新旧两个AOF文件所保存的数据库状态一致。最后,服务器用新的AOF文件替换旧的AOF文件,以此来完成AOF文件重写操作。

5 配置说明

合理的设置 AOF 的配置,可以保障 Redis 高效且稳定的运行,以下是 AOF 的全部配置信息和说明。

AOF 的配置参数在 Redis 的配置文件中,也就是 Redis 根路径下的 redis.conf 文件中,配置参数和说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 是否开启 AOF,yes 为开启,默认是关闭
appendonly no

# AOF 默认文件名
appendfilename "appendonly.aof"

# AOF 持久化策略配置
# appendfsync always
appendfsync everysec
# appendfsync no

# AOF 文件重写的大小比例,默认值是 100,表示 100%,也就是只有当前 AOF 文件,比最后一次的 AOF 文件大一倍时,才会启动 AOF 文件重写。
auto-aof-rewrite-percentage 100

# 允许 AOF 重写的最小文件容量
auto-aof-rewrite-min-size 64mb

# 是否开启启动时加载 AOF 文件效验,默认值是 yes,表示尽可能的加载 AOF 文件,忽略错误部分信息,并启动 Redis 服务。
# 如果值为 no,则表示,停止启动 Redis,用户必须手动修复 AOF 文件才能正常启动 Redis 服务。
aof-load-truncated yes

其中比较重要的是 appendfsync 参数,用它来设置 AOF 的持久化策略,可以选择按时间间隔或者操作次数来存储 AOF 文件,这个参数的三个值在文章开头有说明,这里就不再复述了。

fsync 的英文全称是 File Synchronization,意为文件同步。

6 数据恢复

1)正常数据恢复

正常情况下,只要开启了 AOF 持久化,并且提供了正常的 appendonly.aof 文件,在 Redis 启动时就会自定加载 AOF 文件并启动,执行如下图所示:

image-20240604102739331

其中 DB loaded from append only file...... 表示 Redis 服务器在启动时,先去加载了 AOF 持久化文件。

小贴士:默认情况下 appendonly.aof 文件保存在 Redis 的根目录下。

持久化文件加载规则

  • 如果只开启了 AOF 持久化,Redis 启动时只会加载 AOF 文件(appendonly.aof),进行数据恢复;
  • 如果只开启了 RDB 持久化,Redis 启动时只会加载 RDB 文件(dump.rdb),进行数据恢复;
  • 如果同时开启了 RDB 和 AOF 持久化,Redis 启动时只会加载 AOF 文件(appendonly.aof),进行数据恢复。

在 AOF 开启的情况下,即使 AOF 文件不存在,只有 RDB 文件,也不会加载 RDB 文件。 AOF 和 RDB 的加载流程如下图所示:

image-20240604102750574

2)简单异常数据恢复

在 AOF 写入文件时如果服务器崩溃,或者是 AOF 存储已满的情况下,AOF 的最后一条命令可能被截断,这就是异常的 AOF 文件。

在 AOF 文件异常的情况下,如果为修改 Redis 的配置文件,也就是使用 aof-load-truncated 等于 yes 的配置,Redis 在启动时会忽略最后一条命令,并顺利启动 Redis,执行结果如下:

1
2
3
4
5
* Reading RDB preamble from AOF file...
* Reading the remaining AOF tail...
# !!! Warning: short read while loading the AOF file !!!
# !!! Truncating the AOF at offset 439 !!!
# AOF loaded anyway because aof-load-truncated is enabled

3)复杂异常数据恢复

AOF 文件可能出现更糟糕的情况,当 AOF 文件不仅被截断,而且中间的命令也被破坏,这个时候再启动 Redis 会提示错误信息并中止运行,错误信息如下:

1
2
* Reading the remaining AOF tail...
# Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>

出现此类问题的解决方案如下:

  1. 首先使用 AOF 修复工具,检测出现的问题,在命令行中输入 redis-check-aof 命令,它会跳转到出现问题的命令行,这个时候可以尝试手动修复此文件;
  2. 如果无法手动修复,我们可以使用 redis-check-aof --fix 自动修复 AOF 异常文件,不过执行此命令,可能会导致异常部分至文件末尾的数据全部被丢弃。

7 优缺点

AOF 优点

  • AOF 持久化保存的数据更加完整,AOF 提供了三种保存策略:每次操作保存、每秒钟保存一次、跟随系统的持久化策略保存,其中每秒保存一次,从数据的安全性和性能两方面考虑是一个不错的选择,也是 AOF 默认的策略,即使发生了意外情况,最多只会丢失 1s 钟的数据;
  • AOF 采用的是命令追加的写入方式,所以不会出现文件损坏的问题,即使由于某些意外原因,导致了最后操作的持久化数据写入了一半,也可以通过 redis-check-aof 工具轻松的修复;
  • AOF 持久化文件,非常容易理解和解析,它是把所有 Redis 键值操作命令,以文件的方式存入了磁盘。即使不小心使用 flushall 命令删除了所有键值信息,只要使用 AOF 文件,删除最后的 flushall 命令,重启 Redis 即可恢复之前误删的数据。

AOF 缺点

  • 对于相同的数据集来说,AOF 文件要大于 RDB 文件;
  • 在 Redis 负载比较高的情况下,RDB 比 AOF 性能更好;
  • RDB 使用快照的形式来持久化整个 Redis 数据,而 AOF 只是将每次执行的命令追加到 AOF 文件中,因此从理论上说,RDB 比 AOF 更健壮。

8 小结

AOF 保存数据更加完整,它可以记录每次 Redis 的键值变化,或者是选择每秒保存一次数据。AOF 的持久化文件更加易读,但相比与二进制的 RDB 来说,所占的存储空间也越大,为了解决这个问题,AOF 提供自动化重写机制,最大程度的减少了 AOF 占用空间大的问题。同时 AOF 也提供了很方便的异常文件恢复命令: redis-check-aof --fix ,为使用 AOF 提供了很好的保障。

【AOF,开启方式,触发持久化,AOF重写,config文件配置文件名】

总结下:两种情况可以自动触发 AOF 持久化

满足 AOF 设置的策略触发满足 AOF 重写触发

满足策略触发(不会fork子进程):

  1. 每次有写操作就同步(always):这种策略下,每次执行写命令后,Redis会立即将该命令写入硬盘。这种模式下,数据的安全性最高,但性能影响也最大。这种情况下,Redis主进程直接进行写操作,不会新启动子进程。
  2. 每秒同步一次(everysec):这种策略下,Redis会每秒将缓冲区中的写命令同步到硬盘。这种模式下,如果Redis意外宕机,最多可能丢失一秒内的写命令。这种模式下,性能和数据安全性比较平衡。这种情况下,Redis主进程直接进行写操作,不会新启动子进程。
  3. 完全由操作系统控制(no):这种策略下,Redis不主动进行同步操作,完全交给操作系统来决定何时同步数据到硬盘。这种模式下,性能最好,但数据安全性最差。这种情况下,Redis主进程直接进行写操作,不会新启动子进程。

满足重写触发

  • 在执行BGREWRITEAOF命令时,Redis服务器会维护一个AOF重写缓冲区,该缓冲区会在子进程创建新AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作后,服务器就会重写缓冲区中的所有内容追加到新的AOF文件的末尾,使得新旧两个AOF文件所保存的数据库状态一致。最后,服务器用新的AOF文件替换旧的AOF文件,以此来完成AOF文件重写操作。

05 Redis 持久化——混合持久化

RDB 和 AOF 持久化各有利弊,RDB 可能会导致一定时间内的数据丢失,而 AOF 由于文件较大则会影响 Redis 的启动速度,为了能同时使用 RDB 和 AOF 各种的优点,Redis 4.0 之后新增了混合持久化的方式。

在开启混合持久化的情况下,AOF 重写时会把 Redis 的持久化数据,以 RDB 的格式写入到 AOF 文件的开头,之后的数据再以 AOF 的格式化追加的文件的末尾。

混合持久化的数据存储结构如下图所示:

image-20240604104522833

1 开启混合持久化

查询是否开启混合持久化可以使用 config get aof-use-rdb-preamble 命令,执行结果如下图所示:

1
2
3
127.0.0.1:6379> config get aof-use-rdb-preamble
1) "aof-use-rdb-preamble"
2) "yes"

其中 yes 表示已经开启混合持久化,no 表示关闭,Redis 5.0 默认值为 yes。 如果是其他版本的 Redis 首先需要检查一下,是否已经开启了混合持久化,如果关闭的情况下,可以通过以下两种方式开启:

  • 通过命令行开启
  • 通过修改 Redis 配置文件开启

1)通过命令行开启

使用命令 config set aof-use-rdb-preamble yes 执行结果如下图所示:

1
2
127.0.0.1:6379>  config set aof-use-rdb-preamble yes
OK

小贴士:命令行设置配置的缺点是重启 Redis 服务之后,设置的配置就会失效。

2)通过修改 Redis 配置文件开启

在 Redis 的根路径下找到 redis.conf 文件,把配置文件中的 aof-use-rdb-preamble no 改为 aof-use-rdb-preamble yes 如下图所示:

2 实例运行

当在混合持久化关闭的情况下,使用 bgrewriteaof 触发 AOF 文件重写之后,查看 appendonly.aof 文件的持久化日志,如下图所示:

image-20240604104702630

可以看出,当混合持久化关闭的情况下 AOF 持久化文件存储的为标准的 AOF 格式的文件。 当混合持久化开启的模式下,使用 bgrewriteaof 命令触发 AOF 文件重写,得到 appendonly.aof 的文件内容如下图所示:

3 数据恢复和源码解析

混合持久化的数据恢复和 AOF 持久化过程是一样的,只需要把 appendonly.aof 放到 Redis 的根目录,在 Redis 启动时,只要开启了 AOF 持久化,Redis 就会自动加载并恢复数据。 Redis 启动信息如下图所示:

image-20240604104723137

可以看出 Redis 在服务器初始化的时候加载了 AOF 文件的内容。

1)混合持久化的加载流程

混合持久化的加载流程如下:

  1. 判断是否开启 AOF 持久化,开启继续执行后续流程,未开启执行加载 RDB 文件的流程;
  2. 判断 appendonly.aof 文件是否存在,文件存在则执行后续流程;
  3. 判断 AOF 文件开头是 RDB 的格式, 先加载 RDB 内容再加载剩余的 AOF 内容;
  4. 判断 AOF 文件开头不是 RDB 的格式,直接以 AOF 格式加载整个文件。

AOF 加载流程图如下图所示:

image-20240604104735691

2)源码解析

Redis 判断 AOF 文件的开头是否是 RDB 格式的,是通过关键字 REDIS 判断的,RDB 文件的开头一定是 REDIS 关键字开头的,判断源码在 Redis 的 src/aof.c 中,核心代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
// AOF 文件开头非 RDB 格式,非混合持久化文件
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
/* RDB preamble. Pass loading the RDB functions. */
rio rdb;

serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
// AOF 文件开头是 RDB 格式,先加载 RDB 再加载 AOF
if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
}
}
// 加载 AOF 格式的数据

可以看出 Redis 是通过判断 AOF 文件的开头是否是 REDIS 关键字,来确定此文件是否为混合持久化文件的。

小贴士:AOF 格式的开头是 *,而 RDB 格式的开头是 REDIS。

4 优缺点

混合持久化优点:

  • 混合持久化结合了 RDB 和 AOF 持久化的优点,开头为 RDB 的格式,使得 Redis 可以更快的启动,同时结合 AOF 的优点,有减低了大量数据丢失的风险。

混合持久化缺点:

  • AOF 文件中添加了 RDB 格式的内容,使得 AOF 文件的可读性变得很差;
  • 兼容性差,如果开启混合持久化,那么此混合持久化 AOF 文件,就不能用在 Redis 4.0 之前版本了。

5 持久化最佳实践

持久化虽然保证了数据不丢失,但同时拖慢了 Redis 的运行速度,那怎么更合理的使用 Redis 的持久化功能呢? Redis 持久化的最佳实践可从以下几个方面考虑。

1)控制持久化开关

使用者可根据实际的业务情况考虑,如果对数据的丢失不敏感的情况下,可考虑关闭 Redis 的持久化,这样所以的键值操作都在内存中,就可以保证最高效率的运行 Redis 了。 持久化关闭操作:

  • 关闭 RDB 持久化,使用命令: config set save ""
  • 关闭 AOF 和 混合持久化,使用命令: config set appendonly no

2)主从部署

使用主从部署,一台用于响应主业务,一台用于数据持久化,这样就可能让 Redis 更加高效的运行。

3)使用混合持久化

混合持久化结合了 RDB 和 AOF 的优点,Redis 5.0 默认是开启的。

4)使用配置更高的机器

Redis 对 CPU 的要求并不高,反而是对内存和磁盘的要求很高,因为 Redis 大部分时候都在做读写操作,使用更多的内存和更快的磁盘,对 Redis 性能的提高非常有帮助。

06 字符串使用与内部实现原理

Redis 发展到现在已经有 9 种数据类型了,其中最基础、最常用的数据类型有 5 种,它们分别是:字符串类型、列表类型、哈希表类型、集合类型、有序集合类型,而在这 5 种数据类型中最常用的是字符串类型,所以本文我们先从字符串的使用开始说起。

字符串类型的全称是 Simple Dynamic Strings 简称 SDS,中文意思是:简单动态字符串。它是以键值对 key-value 的形式进行存储的,根据 key 来存储和获取 value 值,它的使用相对来说比较简单,但在实际项目中应用非常广泛。

1 字符串类型能做什么?

字符串类型的使用场景有很多,但从功能的角度来区分,大致可分为以下两种:

  • 字符串存储和操作;
  • 整数类型和浮点类型的存储和计算。

字符串最常用的业务场景有以下几个。

1)页面数据缓存

我们知道,一个系统最宝贵的资源就是数据库资源,随着公司业务的发展壮大,数据库的存储量也会越来越大,并且要处理的请求也越来越多,当数据量和并发量到达一定级别之后,数据库就变成了拖慢系统运行的“罪魁祸首”,为了避免这种情况的发生,我们可以把查询结果放入缓存(Redis)中,让下次同样的查询直接去缓存系统取结果,而非查询数据库,这样既减少了数据库的压力,同时也提高了程序的运行速度。

介于以上这个思路,我们可以把文章详情页的数据放入缓存系统。具体的做法是先将文章详情页序列化为字符串存入缓存,再从缓存中读取到字符串,反序列化成对象,然后再赋值到页面进行显示 (当然也可以用哈希类型进行存储,这会在下一篇文章中讲到),这样我们就实现了文章详情页的缓存功能,架构流程对比图如下所示。

原始系统运行流程图:

image-20240604104932243

引入缓存系统后的流程图:

image-20240604104943641

2)数字计算与统计

Redis 可以用来存储整数和浮点类型的数据,并且可以通过命令直接累加并存储整数信息,这样就省去了每次先要取数据、转换数据、拼加数据、再存入数据的麻烦,只需要使用一个命令就可以完成此流程,具体实现过程本文下半部分会讲。这样我们就可以使用此功能来实现访问量的统计,当有人访问时访问量 +1 就可以了。

3)共享 Session 信息

通常我们在开发后台管理系统时,会使用 Session 来保存用户的会话(登录)状态,这些 Session 信息会被保存在服务器端,但这只适用于单系统应用,如果是分布式系统此模式将不再适用。

例如用户一的 Session 信息被存储在服务器一,但第二次访问时用户一被分配到服务器二,这个时候服务器并没有用户一的 Session 信息,就会出现需要重复登录的问题。分布式系统每次会把请求随机分配到不同的服务器,因此我们需要借助缓存系统对这些 Session 信息进行统一的存储和管理,这样无论请求发送到那台服务器,服务器都会去统一的缓存系统获取相关的 Session 信息,这样就解决了分布式系统下 Session 存储的问题。

分布式系统单独存储 Session 流程图:

image-20240604104957031

分布式系统使用同一的缓存系统存储 Session 流程图:

image-20240604105008408

2 字符串如何使用?

通常我们会使用两种方式来操作 Redis:第一种是使用命令行来操作,例如 redis-cli;另一种是使用代码的方式来操作,下面我们分别来看。

1)命令行操作方式

字符串的操作命令有很多,但大体可分为以下几类:

  • 单个键值对操作
  • 多个键值对操作
  • 数字统计

我们本文使用 redis-cli 来实现对 Redis 的操作,在使用命令之前,先输入 redis-cli 来链接到 Redis 服务器。

① 单个键值对操作
a.添加键值对

语法:set key value [expiration EX seconds|PX milliseconds] [NX|XX] 示例:

1
2
127.0.0.1:6379> set k1 val1
OK
b.获取键值对

语法:get key 示例:

1
2
127.0.0.1:6379> get k1
"val1"
c.给元素追加值

语法:append key value 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379> append k1 append
(integer) 5
127.0.0.1:6379> get k1
"v1append"
d.查询字符串的长度

语法:strlen key 示例:

1
2
127.0.0.1:6379> strlen k1
(integer) 5
② 多个键值对操作
a.创建一个或多个键值对

语法:mset key value [key value …] 示例:

1
2
127.0.0.1:6379> mset k2 v2 k3 v3
OK

小贴士:mset 是一个原子性(atomic)操作,所有给定 key 都会在同一时间内被设置,不会出现某些 key 被更新,而另一些 key 没被更新的情况。

b.查询一个或多个元素

语法:mget key [key …] 示例:

1
2
3
127.0.0.1:6379> mget k2 k3
1) "v2"
2) "v3"
③ 数字统计

在 Redis 中可以直接操作整型和浮点型,例如可以直接使用命令来加、减值。

a.给整数类型的值加 1

语法:incr key 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k1
"3"
127.0.0.1:6379> incr k1
(integer) 4
127.0.0.1:6379> get k1
"4"
b.给整数类型的值减 1

语法:decr key 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k1
"4"
127.0.0.1:6379> decr k1
(integer) 3
127.0.0.1:6379> get k1
"3"
c.根据 key 减去指定的值

语法:decrby key decrement 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k1
"3"
127.0.0.1:6379> decrby k1 2
(integer) 1
127.0.0.1:6379> get k1
"1"

如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行减法操作:

1
2
3
4
5
6
127.0.0.1:6379> get k2
(nil)
127.0.0.1:6379> decrby k2 3
(integer) -3
127.0.0.1:6379> get k2
"-3"
d.根据 key 加指定的整数值

语法:incrby key increment 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k1
"1"
127.0.0.1:6379> incrby k1 2
(integer) 3
127.0.0.1:6379> get k1
"3"

如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行加整数值的操作:

1
2
3
4
5
6
127.0.0.1:6379> get k3
(nil)
127.0.0.1:6379> incrby k3 5
(integer) 5
127.0.0.1:6379> get k3
"5"
e.根据 key 加上指定的浮点数

语法:incrbyfloat key increment 示例:

1
2
3
4
5
6
127.0.0.1:6379> get k3
"5"
127.0.0.1:6379> incrbyfloat k3 4.9
"9.9"
127.0.0.1:6379> get k3
"9.9"

如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行加浮点数的操作:

1
2
3
4
5
6
127.0.0.1:6379> get k4
(nil)
127.0.0.1:6379> incrbyfloat k4 4.4
"4.4"
127.0.0.1:6379> get k4
"4.4"

更多使用命令,详见附录部分。

2)代码操作方式

本文我们使用 Java 语言来实现对 Redis 的操作,首先我们在项目中添加对 Jedis 框架的引用,如果是 Maven 项目,我们会在 pom.xml 文件中添加如下信息:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version}</version>
</dependency>

Jedis 是 Redis 官方推荐的 Java 客户端开发包,用于实现快速简单的操作 Redis。添加完 Jedis 之后,我们来写具体的操作代码,操作函数与命令方式的调用比较相似,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import redis.clients.jedis.Jedis;
import java.util.List;

public class StringExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// jedis.auth("xxx"); // 输入密码,没有密码,可以不设置
// 添加一个元素
jedis.set("mystr", "redis");
// 获取元素
String myStr = jedis.get("mystr");
System.out.println(myStr); // 输出:redis
// 添加多个元素(key,value,key2,value2)
jedis.mset("db", "redis", "lang", "java");
// 获取多个元素
List<String> mlist = jedis.mget("db", "lang");
System.out.println(mlist); // 输出:[redis, java]
// 给元素追加字符串
jedis.append("db", ",mysql");
// 打印追加的字符串
System.out.println(jedis.get("db")); // 输出:redis,mysql
// 当 key 不存在时,赋值键值
Long setnx = jedis.setnx("db", "db2");
// 因为 db 元素已经存在,所以会返回 0 条修改
System.out.println(setnx); // 输出:0
// 字符串截取
String range = jedis.getrange("db", 0, 2);
System.out.println(range); // 输出:red
// 添加键值并设置过期时间(单位:毫秒)
String setex = jedis.setex("db", 1000, "redis");
System.out.println(setex); // 输出:ok
// 查询键值的过期时间
Long ttl = jedis.ttl("db");
System.out.println(ttl); // 输出:1000
}
}

3 代码实战

本文的上半部分我们讲到了字符串的很多种使用场景,本小节就以字符串存储用户对象信息为例,我们先将用户对象信息序列化为字符串存储在 Redis,再从 Redis 中取出字符串并反序列化为对象信息为例,使用 Java 语言来实现。

首先添加 JSON 转换类,用于对象和字符串之间的序列化和反序列化,我们这里采用 Google 的 Gson 来实现,首先在 pom.xml 文件中添加如下引用:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>

添加完 Gson 引用之后,我们来写具体的业务代码,先见用户信息序列化之后存储在 Redis 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
jedis.auth("xxx");
Gson gson = new Gson();
// 构建用户数据
User user = new User();
user.setId(1);
user.setName("Redis");
user.setAge(10);
String jsonUser = gson.toJson(user);
// 打印用户信息(json)
System.out.println(jsonUser); // 输出:{"id":1,"name":"Redis","age":10}
// 把字符串存入 Redis
jedis.set("user", jsonUser);

当使用用户信息时,我们从 Redis 反序列化出来,代码如下:

1
2
3
4
String getUserData = jedis.get("user");
User userData = gson.fromJson(getUserData, User.class);
// 打印对象属性信息
System.out.println(userData.getId() + ":" + userData.getName()); // 输出结果:1:Redis

以上两个步骤就完成了用户信息存放至 Redis 中的过程,也是常用的经典使用场景之一。

4 字符串的内部实现

1)源码分析

Redis 3.2 之前 SDS 源码如下:

1
2
3
4
5
struct sds{
int len; // 已占用的字节数
int free; // 剩余可以字节数
char buf[]; // 存储字符串的数据空间
}

可以看出 Redis 3.2 之前 SDS 内部是一个带有长度信息的字节数组,存储结构如下图所示:

image-20240604105046151

为了更加有效的利用内存,Redis 3.2 优化了 SDS 的存储结构,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typedef char *sds;

struct __attribute__ ((__packed__)) sdshdr5 { // 对应的字符串长度小于 1<<5
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 { // 对应的字符串长度小于 1<<8
uint8_t len; /* 已使用长度,1 字节存储 */
uint8_t alloc; /* 总长度 */
unsigned char flags;
char buf[]; // 真正存储字符串的数据空间
};
struct __attribute__ ((__packed__)) sdshdr16 { // 对应的字符串长度小于 1<<16
uint16_t len; /* 已使用长度,2 字节存储 */
uint16_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 { // 对应的字符串长度小于 1<<32
uint32_t len; /* 已使用长度,4 字节存储 */
uint32_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 { // 对应的字符串长度小于 1<<64
uint64_t len; /* 已使用长度,8 字节存储 */
uint64_t alloc;
unsigned char flags;
char buf[];
};

这样就可以针对不同长度的字符串申请相应的存储类型,从而有效的节约了内存使用。

2)数据类型

我们可以使用 object encoding key 命令来查看对象(键值对)存储的数据类型,当我们使用此命令来查询 SDS 对象时,发现 SDS 对象竟然包含了三种不同的数据类型:int、embstr 和 raw。

① int 类型
1
2
3
4
127.0.0.1:6379> set key 666
OK
127.0.0.1:6379> object encoding key
"int"
② embstr 类型
1
2
3
4
127.0.0.1:6379> set key abc
OK
127.0.0.1:6379> object encoding key
"embstr"
③ raw 类型
1
2
3
4
127.0.0.1:6379> set key abcdefghigklmnopqrstyvwxyzabcdefghigklmnopqrs
OK
127.0.0.1:6379> object encoding key
"raw"

int 类型很好理解,整数类型对应的就是 int 类型,而字符串则对应是 embstr 类型,当字符串长度大于 44 字节时,会变为 raw 类型存储。

raw和embstr类型的区别

这个问题涉及到了Redis内存分配的细节。在Redis中,字符串类型的对象可以使用三种不同的编码方式:int、raw和embstr。

  1. int编码:当字符串可以被解析为数字时,Redis会选择int编码。
  2. raw编码:当字符串长度超过39字节时,Redis会选择raw编码。
  3. embstr编码:当字符串长度小于等于39字节时,Redis会选择embstr编码。

对于raw编码,Redis会先创建一个redisObject结构,然后再创建一个sdshdr结构来存储实际的字符串。这两个结构是分开的,所以Redis需要调用两次内存分配函数。

而对于embstr编码,Redis会一次性分配一块连续的内存,这块内存包含了redisObject结构和sdshdr结构。这样做的好处是减少了内存碎片,提高了内存使用效率。但是,这种方式只适用于较小的字符串,因为大字符串会导致内存分配的开销过大。

所以,这里的"两次"和"一次",实际上是指Redis在进行内存分配时的次数

3)为什么是 44 字节?

在 Redis 中,如果 SDS 的存储值大于 64 字节时,Redis 的内存分配器会认为此对象为大字符串,并使用 raw 类型来存储,当数据小于 64 字节时(字符串类型),会使用 embstr 类型存储。既然内存分配器的判断标准是 64 字节,那为什么 embstr 类型和 raw 类型的存储判断值是 44 字节?

这是因为 Redis 在存储对象时,会创建此对象的关联信息,redisObject 对象头和 SDS 自身属性信息,这些信息都会占用一定的存储空间,因此长度判断标准就从 64 字节变成了 44 字节。

在 Redis 中,所有的对象都会包含 redisObject 对象头。我们先来看 redisObject 对象的源码:

1
2
3
4
5
6
7
typedef struct redisObject {
unsigned type:4; // 4 bit
unsigned encoding:4; // 4 bit
unsigned lru:LRU_BITS; // 3 个字节
int refcount; // 4 个字节
void *ptr; // 8 个字节
} robj;

它的参数说明如下:

  • type:对象的数据类型,例如:string、list、hash 等,占用 4 bits 也就是半个字符的大小;
  • encoding:对象数据编码,占用 4 bits;
  • lru:记录对象的 LRU(Least Recently Used 的缩写,即最近最少使用)信息,内存回收时会用到此属性,占用 24 bits(3 字节);
  • refcount:引用计数器,占用 32 bits(4 字节);
  • *ptr:对象指针用于指向具体的内容,占用 64 bits(8 字节)。

redisObject 总共占用 0.5 bytes + 0.5 bytes + 3 bytes + 4 bytes + 8 bytes = 16 bytes(字节)。

了解了 redisObject 之后,我们再来看 SDS 自身的数据结构,从 SDS 的源码可以看出,SDS 的存储类型一共有 5 种:SDSTYPE5、SDSTYPE8、SDSTYPE16、SDSTYPE32、SDSTYPE64,在这些类型中最小的存储类型为 SDSTYPE5,但 SDSTYPE5 类型会默认转成 SDSTYPE8,以下源码可以证明

那我们直接来看 SDSTYPE8 的源码:

1
2
3
4
5
6
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; // 1 byte
uint8_t alloc; // 1 byte
unsigned char flags; // 1 byte
char buf[];
};

可以看出除了内容数组(buf)之外,其他三个属性分别占用了 1 个字节,最终分隔字符等于 64 字节,减去 redisObject 的 16 个字节,再减去 SDS 自身的 3 个字节,再减去结束符 \0 结束符占用 1 个字节,最终的结果是 44 字节(64-16-3-1=44),内存占用如下图所示:

image-20240604105115478

5 小结

本文介绍了字符串的定义及其使用,它的使用主要分为:单键值对操作、多键值对操作、数字统计、键值对过期操作、字符串操作进阶等。同时也介绍了字符串使用的三个场景,字符串类型可用作为:页面数据缓存,可以缓存一些文章详情信息等;数字计算与统计,例如计算页面的访问次数;也可以用作 Session 共享,用来记录管理员的登录信息等。同时我们深入的介绍了字符串的五种数据存储结构,以及字符串的三种内部数据类型,如下图所示

image-20240604105128355

07 附录:更多字符串操作命令

键值对过期操作

a.添加键值对并设置过期时间

语法:set key value [expiration EX seconds|PX milliseconds] [NX|XX] 示例:

1
2
127.0.0.1:6379> set k1 val1 ex 1000
OK

设置键值对 k1=val1,过期时间为 1000 秒。 查询键的过期时间可以使用 ttl key,如下代码所示:

1
2
127.0.0.1:6379> ttl k1
(integer) 997

b.赋值字符串,并设置过期时间(单位/秒)

语法:setex key seconds value 示例:

1
2
3
4
5
6
127.0.0.1:6379> setex k1 1000 v1
OK
127.0.0.1:6379> ttl k1
(integer) 999
127.0.0.1:6379> get k1
"v1"

如果 key 已经存在,setex 命令将会覆写原来的旧值。

c.赋值字符串,并设置过期时间(单位/毫秒)

与 setex 用法类似,只不过 psetex 设置的单位是毫秒。 语法:psetex key milliseconds value 示例:

1
2
3
4
5
6
127.0.0.1:6379> psetex k1 100000 v11
OK
127.0.0.1:6379> ttl k1
(integer) 97
127.0.0.1:6379> get k1
"v11"

字符串操作进阶

a.根据指定的范围截取字符串

语法:getrange key start end 示例:

1
2
3
4
5
6
7
8
127.0.0.1:6379> get hello
"hello world"
127.0.0.1:6379> getrange hello 0 4
"hello"
127.0.0.1:6379> getrange hello 0 -1
"hello world"
127.0.0.1:6379> getrange hello 0 -2
"hello worl"

负数表示从字符串最后开始计数, -1 表示最后一个字符, -2 表示倒数第二个,以此类推。

b.设置字符串新值并返回旧值

语法:getset key value 示例:

1
2
3
4
5
6
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mysql
"redis"
127.0.0.1:6379> get db
"mysql"

使用 getset 命令时,如果 key 不为字符串会报错,如下效果所示:

1
2
3
4
127.0.0.1:6379> type myset
set
127.0.0.1:6379> getset myset v1
(error) WRONGTYPE Operation against a key holding the wrong kind of value

根据 type 命令可以查询出 key 所对应的数据类型为非字符串,在使用 getset 命令就会报错。

c.赋值(创建)键值对,当 key 不存在时

如果 key 已经存在,则执行命令无效,不会修改原来的值,否则会创建新的键值对。 语法:setnx key value 示例:

1
2
3
4
5
6
7
8
127.0.0.1:6379> setnx k9 v9
(integer) 1
127.0.0.1:6379> get k9
"v9"
127.0.0.1:6379> setnx k9 v99
(integer) 0
127.0.0.1:6379> get k9
"v9"

d.设置一个或多个键值,当所有键值都不存在时

语法:msetnx key value [key value …] 示例:

1
2
3
4
5
127.0.0.1:6379> msetnx k5 v5 k6 v6
(integer) 1
127.0.0.1:6379> mget k5 k6
1) "v5"
2) "v6"

注意:msetnx 是一个原子操作,当一个操作失败时,其他操作也会失败。例如,如果有一个已经存在的值,那么全部键值都会设置失败,效果如下:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> get k1
"val1"
127.0.0.1:6379> get k8
(nil)
127.0.0.1:6379> msetnx k1 v1 k8 v8
(integer) 0
127.0.0.1:6379> get k1
"val1"
127.0.0.1:6379> get k8
(nil)

e.截取字符串并赋值

语法:setrange key offset value 示例:

1
2
3
4
5
6
127.0.0.1:6379> get hello
"hello java"
127.0.0.1:6379> setrange hello 6 redis
(integer) 11
127.0.0.1:6379> get hello
"hello redis"

如果待截取的键不存在,会当作空白字符串处理,效果如下:

1
2
3
4
127.0.0.1:6379> setrange mystr 3 mystring
(integer) 11
127.0.0.1:6379> get mystring
(nil)

以上这些命令基本涵盖了所有的字符串操作,有些不常用,但很好用,例如 setnx key value 命令,当 key 已经存在,则执行命令无效,并不会覆盖原有的值,如果没有此 key 则会新创建一个键值对。

08 字典使用与内部实现原理

字典类型 (Hash) 又被成为散列类型或者是哈希表类型,它是将一个键值 (key) 和一个特殊的“哈希表”关联起来,这个“哈希表”表包含两列数据:字段和值。例如我们使用字典类型来存储一篇文章的详情信息,存储结构如下图所示:

image-20240604105548439

同理我们也可以使用字典类型来存储用户信息,并且使用字典类型来存储此类信息,是不需要手动序列化和反序列化数据的,所以使用起来更加的方便和高效。

1.基础使用

首先我们使用命令行工具 redis-cli,来对字典类型进行相关的操作。

1)插入单个元素

语法:hset key field value 示例:【hset表示hash】

1
2
3
4
127.0.0.1:6379> hset myhash key1 value1
(integer) 1
127.0.0.1:6379> hset myhash key2 value2
(integer) 1

2)当某键不存在时,插入数据

语法:hsetnx key field value 示例:

1
2
3
4
127.0.0.1:6379> hsetnx myhash k4 v4
(integer) 1
127.0.0.1:6379> hget myhash k4
"v4"

如果尝试插入已存在的键,不会改变原来的值,示例如下:

1
2
3
4
127.0.0.1:6379> hsetnx myhash k4 val4
(integer) 0
127.0.0.1:6379> hget myhash k4
"v4"

尝试修改已经存在的 k4 赋值为 val4,但并没有生效,查询 k4 的结果依然是原来的值 v4。

3)查询单个元素

语法:hget key field 示例:

1
2
127.0.0.1:6379> hget myhash key1
"value1"

4)删除 key 中的一个或多个元素

语法:hdel myhash field [field …] 示例:

1
2
127.0.0.1:6379> hdel myhash key1 key2
(integer) 1

注意:不能使用类似于 hdel myhash 的命令删除整个 Hash 值的。

5)某个整数值累加计算

语法:hincrby key field increment 示例:

1
2
3
4
5
6
127.0.0.1:6379> hset myhash k3 3
(integer) 1
127.0.0.1:6379> hincrby myhash k3 2
(integer) 5
127.0.0.1:6379> hget myhash k3
"5"

更多操作命令,详见附录部分。

2.代码实战

接下来我们用 Java 代码实现对 Redis 的操作,同样我们先引入 Jedis 框架 ,接下来再用代码来对字典类型进行操作,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import redis.clients.jedis.Jedis;
import java.util.Map;

public class HashExample {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 把 Key 值定义为变量
final String REDISKEY = "myhash";
// 插入单个元素
jedis.hset(REDISKEY, "key1", "value1");
// 查询单个元素
Map<String, String> singleMap = jedis.hgetAll(REDISKEY);
System.out.println(singleMap.get("key1")); // 输出:value1
// 查询所有元素
Map<String, String> allMap = jedis.hgetAll(REDISKEY);
System.out.println(allMap.get("k2")); // 输出:val2
System.out.println(allMap); // 输出:{key1=value1, k1=val1, k2=val2, k3=9.2, k4=v4...}
// 删除单个元素
Long delResult = jedis.hdel(REDISKEY, "key1");
System.out.println("删除结果:" + delResult); // 输出:删除结果:1
// 查询单个元素
System.out.println(jedis.hget(REDISKEY, "key1")); // 输出:返回 null
}
}

从代码中可以看出,在 Jedis 中我们可以直接使用 Map 来接收 Redis 中读取的字典类型的数据,省去了手动转化的麻烦,还是比较方便的。

3.数据结构

字典类型本质上是由数组和链表结构组成的,来看字典类型的源码实现:

1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry { // dict.h
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; // 下一个 entry
} dictEntry;

字典类型的数据结构,如下图所示:

image-20240604105603601

通常情况下字典类型会使用数组的方式来存储相关的数据,但发生哈希冲突时才会使用链表的结构来存储数据。

4.哈希冲突

字典类型的存储流程是先将键值进行 Hash 计算,得到存储键值对应的数组索引,再根据数组索引进行数据存储,但在小概率事件下可能会出完全不相同的键值进行 Hash 计算之后,得到相同的 Hash 值,这种情况我们称之为哈希冲突

哈希冲突一般通过链表的形式解决,相同的哈希值会对应一个链表结构,每次有哈希冲突时,就把新的元素插入到链表的尾部,请参考上面数据结构的那张图。

键值查询的流程如下:

  • 通过算法 (Hash,计算和取余等) 操作获得数组的索引值,根据索引值找到对应的元素;
  • 判断元素和查找的键值是否相等,相等则成功返回数据,否则需要查看 next 指针是否还有对应其他元素,如果没有,则返回 null,如果有的话,重复此步骤。

键值查询流程,如下图所示:

image-20240604105621709

5.渐进式rehash

Redis 为了保证应用的高性能运行,提供了一个重要的机制——渐进式 rehash。 渐进式 rehash 是用来保证字典缩放效率的,也就是说在字典进行扩容或者缩容是会采取渐进式 rehash 的机制。

1)扩容

当元素数量等于数组长度时就会进行扩容操作,源码在 dict.c 文件中,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int dictExpand(dict *d, unsigned long size)
{
/* 需要的容量小于当前容量,则不需要扩容 */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n;
unsigned long realsize = _dictNextPower(size); // 重新计算扩容后的值
/* 计算新的扩容大小等于当前容量,不需要扩容 */
if (realsize == d->ht[0].size) return DICT_ERR;
/* 分配一个新的哈希表,并将所有指针初始化为NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
if (d->ht[0].table == NULL) {
// 第一次初始化
d->ht[0] = n;
return DICT_OK;
}
d->ht[1] = n; // 把增量输入放入新 ht[1] 中
d->rehashidx = 0; // 非默认值 -1,表示需要进行 rehash
return DICT_OK;
}

从以上源码可以看出,如果需要扩容则会申请一个新的内存地址赋值给 ht[1],并把字典的 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

2)缩容

当字典的使用容量不足总空间的 10% 时就会触发缩容,Redis 在进行缩容时也会把 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

3)渐进式rehash流程

在进行渐进式 rehash 时,会同时保留两个 hash 结构,新键值对加入时会直接插入到新的 hash 结构中,并会把旧 hash 结构中的元素一点一点的移动到新的 hash 结构中,当移除完最后一个元素时,清空旧 hash 结构,主要的执行流程如下:

  • 扩容或者缩容时把字典中的字段 rehashidx 标识为 0;
  • 在执行定时任务或者执行客户端的 hset、hdel 等操作指令时,判断是否需要触发 rehash 操作(通过 rehashidx 标识判断),如果需要触发 rehash 操作,也就是调用 dictRehash 函数,dictRehash 函数会把 ht[0] 中的元素依次添加到新的 Hash 表 ht[1] 中;
  • rehash 操作完成之后,清空 Hash 表 ht[0],然后对调 ht[1] 和 ht[0] 的值,把新的数据表 ht[1] 更改为 ht[0],然后把字典中的 rehashidx 标识为 -1,表示不需要执行 rehash 操作。

总结:

渐进式rehash的设计主要是为了解决一次性rehash可能带来的大量计算阻塞问题。在一次性rehash过程中,需要一次性将所有的键值对从旧的哈希表迁移到新的哈希表,如果哈希表的键值对数量非常大,那么这个过程可能会消耗大量的CPU时间,导致Redis服务器在一段时间内无法处理其他命令,这就是所说的"操作阻塞"。

而渐进式rehash采取了一种分步进行的策略,它并不是一次性完成所有键值对的迁移,而是分多次、逐步地将旧哈希表的键值对迁移到新哈希表。在每次Redis执行命令时,都会顺带做一部分的迁移工作,比如迁移10个键值对。这样,即使是在需要rehash的情况下,Redis也可以保持对外的服务,处理客户端的命令请求。

同时,为了解决哈希表的冲突问题,Redis采用了开放寻址法和链地址法等处理冲突的方式,通过扩大哈希表的容量,减少哈希冲突,从而提高哈希表操作的效率。

所以,渐进式rehash和冲突处理策略共同解决了哈希表操作变慢的问题。

补充

Redis中的字典是一种关联数组,它的结构体包含一些重要的元素:

  1. type:字典私有数据的处理方法
  2. privdata:私有数据
  3. ht[2]:包含两个哈希表的数组,通常我们使用 ht[0],在rehash时使用 ht[1]
  4. rehashidx:记录rehash进度的标志,如果为-1表示rehash未进行

具体的哈希表(ht)中包含以下元素:

  1. table:数组,数组中的每个元素都是一个链表
  2. size:哈希表table的大小
  3. sizemask:哈希表大小掩码,用于计算索引值
  4. used:哈希表已有节点的数量

这是一个基本的数据结构,可以高效地进行数据查找、插入和删除操作。

6.使用场景

哈希字典的典型使用场景如下:

  • 商品购物车,购物车非常适合用哈希字典表示,使用人员唯一编号作为字典的 key,value 值可以存储商品的 id 和数量等信息;
  • 存储用户的属性信息,使用人员唯一编号作为字典的 key,value 值为属性字段和对应的值;
  • 存储文章详情页信息等。

7.小结

本文我们学习了字典类型的操作命令和在代码中的使用,也明白了字典类型实际是由数组和链表组成的,当字典进行扩容或者缩容时会进行渐进式 rehash 操作,渐进式 rehash 是用来保证 Redis 运行效率的,它的执行流程是同时保留两个哈希表,把旧表中的元素一点一点的移动到新表中,查询的时候会先查询两个哈希表,当所有元素都移动到新的哈希表之后,就会删除旧的哈希表。

09 附录:更多字典操作命令

插入一个或多个元素

语法:hmset key field value [field value …] 示例:

1
2
3
4
5
127.0.0.1:6379> hmset myhash k1 val1 k2 val2
OK
127.0.0.1:6379> hmget myhash k1 k2
1) "val1"
2) "val2"

查询一个或多个元素

语法:hmget key field [field …] 示例:

1
2
3
127.0.0.1:6379> hmget myhash k1 k2
1) "v1"
2) "v2"

查询某个 key 的所有字段

语法:hkeys key 示例:

1
2
3
127.0.0.1:6379> hkeys myhash
1) "key1"
2) "key2"

查询某个 key 的所有值

语法:hvals key 示例:

1
2
3
127.0.0.1:6379> hvals myhash
1) "value1"
2) "value2"

查询某个 key 的所有字段和值

语法:hgetall key 示例:

1
2
3
4
5
127.0.0.1:6379> hgetall myhash
1) "k1"
2) "v1"
3) "k2"
4) "v2"

某个浮点值累加计算

语法:hincrbyfloat key field increment 示例:

1
2
127.0.0.1:6379> hincrbyfloat myhash k3 2.2
"9.2"

查询元素是否存在

语法:hexists key field 示例:

1
2
127.0.0.1:6379> hexists myhash key1
(integer) 1

查询元素个数

语法:hlen key 示例:

1
2
127.0.0.1:6379> hlen myhash
(integer) 2

10 列表使用与内部实现原理

列表类型 (List) 是一个使用链表结构存储的有序结构,它的元素插入会按照先后顺序存储到链表结构中,因此它的元素操作 (插入\删除) 时间复杂度为 O(1),所以相对来说速度还是比较快的,但它的查询时间复杂度为 O(n),因此查询可能会比较慢。

1 基础使用

列表类型的使用相对来说比较简单,对它的操作就相当操作一个没有任何 key 值的 value 集合,如下图所示:

image-20240604105813094

1)给列表添加一个或多个元素

语法:lpush key value [value …] 示例:

1
2
127.0.0.1:6379> lpush list 1 2 3
(integer) 3

2)给列表尾部添加一个或多个元素

语法:rpush key value [value …] 示例:

1
2
127.0.0.1:6379> rpush list2 1 2 3
(integer) 3

3)返回列表指定区间内的元素

语法:lrange key start stop 示例:

1
2
3
4
5
6
7
8
127.0.0.1:6379> lrange list 0 -1
"3"
"2"
"1"
127.0.0.1:6379> lrange list2 0 -1
"1"
"2"
"3"

其中 -1 代表列表中的最后一个元素。

4)获取并删除列表的第一个元素

语法:lpop key 示例:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> lrange list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
127.0.0.1:6379> lpop list
"d"
127.0.0.1:6379> lrange list 0 -1
1) "c"
2) "b"
3) "a"

5)获取并删除列表的最后一个元素

语法:rpop key 示例:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> lrange list 0 -1
1) "c"
2) "b"
3) "a"
127.0.0.1:6379> rpop list
"a"
127.0.0.1:6379> lrange list 0 -1
1) "c"
2) "b"

6)根据下标获取对应的元素

语法:lindex key index 示例:

1
2
3
4
127.0.0.1:6379> rpush list3 a b c
(integer) 3
127.0.0.1:6379> lindex list3 0
"a"

更多操作命令,详见附录部分。

2 代码实战

下面来看列表类型在 Java 中的使用,同样先添加 Jedis 框架,使用代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ListExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 声明 Redis key
final String REDISKEY = "list";
// 在头部插入一个或多个元素
Long lpushResult = jedis.lpush(REDISKEY, "Java", "Sql");
System.out.println(lpushResult); // 输出:2
// 获取第 0 个元素的值
String idValue = jedis.lindex(REDISKEY, 0);
System.out.println(idValue); // 输出:Sql
// 查询指定区间的元素
List<String> list = jedis.lrange(REDISKEY, 0, -1);
System.out.println(list); // 输出:[Sql, Java]
// 在元素 Java 前面添加 MySQL 元素
jedis.linsert(REDISKEY, ListPosition.BEFORE, "Java", "MySQL");
System.out.println(jedis.lrange(REDISKEY, 0, -1)); // 输出:[Sql, MySQL, Java]
jedis.close();
}
}

程序运行结果如下:

2 Sql [Sql, Java] [Sql, MySQL, Java]

3 内部实现

我们先用 debug encoding key 来查看列表类型的内部存储类型,如下所示:

1
2
127.0.0.1:6379> object encoding list
"quicklist"

从结果可以看出,列表类型的底层数据类型是 quicklist。

quicklist (快速列表) 是 Redis 3.2 引入的数据类型,早期的列表类型使用的是ziplist (压缩列表) 和双向链表组成的,Redis 3.2 改为用 quicklist 来存储列表元素。

我们来看下 quicklist 的实现源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
typedef struct quicklist { // src/quicklist.h
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* ziplist 的个数 */
unsigned long len; /* quicklist 的节点数 */
unsigned int compress : 16; /* LZF 压缩算法深度 */
//...
} quicklist;
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl; /* 对应的 ziplist */
unsigned int sz; /* ziplist 字节数 */
unsigned int count : 16; /* ziplist 个数 */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* 该节点先前是否被压缩 */
unsigned int attempted_compress : 1; /* 节点太小无法压缩 */
//...
} quicklistNode;
typedef struct quicklistLZF {
unsigned int sz;
char compressed[];
} quicklistLZF;

从以上源码可以看出 quicklist 是一个双向链表,链表中的每个节点实际上是一个 ziplist,它们的结构如下图所示:

image-20240604105827673

ziplist 作为 quicklist 的实际存储结构,它本质是一个字节数组,ziplist 数据结构如下图所示:

image-20240604105837234

【连锁更新问题】

其中的字段含义如下:

  • zlbytes:压缩列表字节长度,占 4 字节;
  • zltail:压缩列表尾元素相对于起始元素地址的偏移量,占 4 字节;
  • zllen:压缩列表的元素个数;
  • entryX:压缩列表存储的所有元素,可以是字节数组或者是整数;
  • zlend:压缩列表的结尾,占 1 字节。

4 源码解析

下面我们来看一下更多关于列表类型的源码实现。

1)添加功能源码分析

quicklist 添加操作对应函数是 quicklistPush,源码如下:

1
2
3
4
5
6
7
8
9
10
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
// 在列表头部添加元素
quicklistPushHead(quicklist, value, sz);
} else if (where == QUICKLIST_TAIL) {
// 在列表尾部添加元素
quicklistPushTail(quicklist, value, sz);
}
}

以 quicklistPushHead 为例,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
// 在头部节点插入元素
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
// 头部节点不能继续插入,需要新建 quicklistNode、ziplist 进行插入
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
// 将新建的 quicklistNode 插入到 quicklist 结构中
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

quicklistPushHead 函数的执行流程,先判断 quicklist 的 head 节点是否可以插入数据,如果可以插入则使用 ziplist 的接口进行插入,否则就新建 quicklistNode 节点进行插入。

函数的入参是待插入的 quicklist,还有需要插入的值 value 以及他的大小 sz。

函数的返回值为 int,0 表示没有新建 head,1 表示新建了 head。 quicklistPushHead 执行流程,如下图所示:

image-20240604105930222

2)删除功能源码分析

quicklist 元素删除分为两种情况:单一元素删除和区间元素删除,它们都位于 src/quicklist.c 文件中。

① 单一元素删除

单一元素的删除函数是 quicklistDelEntry,源码如下:

1
2
3
4
5
6
7
8
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
// 删除指定位置的元素
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
//...
}

可以看出 quicklistDelEntry 函数的底层,依赖 quicklistDelIndex 函数进行元素删除。

② 区间元素删除

区间元素删除的函数是 quicklistDelRange,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// start 表示开始删除的下标,count 表示要删除的个数
int quicklistDelRange(quicklist *quicklist, const long start,
const long count) {
if (count <= 0)
return 0;
unsigned long extent = count;
if (start >= 0 && extent > (quicklist->count - start)) {
// 删除的元素个数大于已有元素
extent = quicklist->count - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
// 删除指定的元素个数
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}
//...
// extent 为剩余需要删除的元素个数,
while (extent) {
// 保存下个 quicklistNode,因为本节点可能会被删除
quicklistNode *next = node->next;
unsigned long del;
int delete_entire_node = 0;
if (entry.offset == 0 && extent >= node->count) {
// 删除整个 quicklistNode
delete_entire_node = 1;
del = node->count;
} else if (entry.offset >= 0 && extent >= node->count) {
// 删除本节点的所有元素
del = node->count - entry.offset;
} else if (entry.offset < 0) {
// entry.offset<0 表示从后向前,相反则表示从前向后剩余的元素个数
del = -entry.offset;
if (del > extent)
del = extent;
} else {
// 删除本节点部分元素
del = extent;
}
D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
"node count: %u",
extent, del, entry.offset, delete_entire_node, node->count);
if (delete_entire_node) {
__quicklistDelNode(quicklist, node);
} else {
quicklistDecompressNodeForUse(node);
node->zl = ziplistDeleteRange(node->zl, entry.offset, del);
quicklistNodeUpdateSz(node);
node->count -= del;
quicklist->count -= del;
quicklistDeleteIfEmpty(quicklist, node);
if (node)
quicklistRecompressOnly(quicklist, node);
}
// 剩余待删除元素的个数
extent -= del;
// 下个 quicklistNode
node = next;
// 从下个 quicklistNode 起始位置开始删除
entry.offset = 0;
}
return 1;
}

从上面代码可以看出,quicklist 在区间删除时,会先找到 start 所在的 quicklistNode,计算删除的元素是否小于要删除的 count,如果不满足删除的个数,则会移动至下一个 quicklistNode 继续删除,依次循环直到删除完成为止。

quicklistDelRange 函数的返回值为 int 类型,当返回 1 时表示成功的删除了指定区间的元素,返回 0 时表示没有删除任何元素。

3)更多源码

除了上面介绍的几个常用函数之外,还有一些更多的函数,例如:

  • quicklistCreate:创建 quicklist;
  • quicklistInsertAfter:在某个元素的后面添加数据;
  • quicklistInsertBefore:在某个元素的前面添加数据;
  • quicklistPop:取出并删除列表的第一个或最后一个元素;
  • quicklistReplaceAtIndex:替换某个元素。

5 使用场景

列表的典型使用场景有以下两个:

  • 消息队列:列表类型可以使用 rpush 实现先进先出的功能,同时又可以使用 lpop 轻松的弹出(查询并删除)第一个元素,所以列表类型可以用来实现消息队列;
  • 文章列表:对于博客站点来说,当用户和文章都越来越多时,为了加快程序的响应速度,我们可以把用户自己的文章存入到 List 中,因为 List 是有序的结构,所以这样又可以完美的实现分页功能,从而加速了程序的响应速度。

6 小结

通过本文我们可以知道列表类型并不是简单的双向链表,而是采用了 quicklist 的数据结构对数据进行存取,quicklist 是 Redis 3.2 新增的数据类型,它的底层采取的是压缩列表加双向链表的存储结构,quicklist 为了存储更多的数据,会对每个 quicklistNode 节点进行压缩,这样就可以有效的存储更多的消息队列或者文章的数据了。

11 附录:更多列表操作命令

在某值之前/之后添加某个元素

语法:linsert key before|after pivot value 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> linsert list3 before b A
(integer) 4
127.0.0.1:6379> lrange list3 0 -1
"a"
"A"
"b"
"c"

根据下标修改元素

语法:lset key index value 示例*:*

1
2
3
4
5
6
127.0.0.1:6379> lindex list3 0
"a"
127.0.0.1:6379> lset list3 0 A
OK
127.0.0.1:6379> lindex list3 0
"A"

根据下标删除元素

语法:ltrim key start stop 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> lpush list a b c
(integer) 3
127.0.0.1:6379> ltrim list 0 1
OK
127.0.0.1:6379> lrange list 0 -1
1) "c"
2) "b"

查询列表的长度

语法:llen key 示例:

1
2
127.0.0.1:6379> llen list
(integer) 2

删除指定个数的元素

语法:lrem key count value 示例:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> lpush list a a b b c c
(integer) 6
127.0.0.1:6379> lrem list 2 a
(integer) 2
127.0.0.1:6379> lrem list 1 b
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "c"
2) "c"
3) "b"

12 集合使用与内部实现原理

集合类型 (Set) 是一个无序并唯一的键值集合。

之所以说集合类型是一个无序集合,是因为它的存储顺序不会按照插入的先后顺序进行存储,如下代码所示:

1
2
3
4
5
6
127.0.0.1:6379> sadd myset v2 v1 v3 #插入数据 v2、v1、v3 
(integer) 3
127.0.0.1:6379> smembers myset #查询数据
1) "v1"
2) "v3"
3) "v2"

从上面代码执行结果可以看出,myset 的存储顺序并不是以插入的先后顺序进行存储的。

集合类型和列表类型的区别如下:

  • 列表可以存储重复元素,集合只能存储非重复元素;
  • 列表是按照元素的先后顺序存储元素的,而集合则是无序方式存储元素的。

1 基础使用

集合类型的功能比列表类型丰富一些,集合类型可以用来统计多个集合的交集、错集和并集,如下代码所示。

1)添加一个或多个元素

语法:sadd key member [member …] 示例:

1
2
127.0.0.1:6379> sadd myset v1 v2 v3
(integer) 3

2)查询集合所有元素

语法:smembers key 示例:

1
2
3
4
127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"

3)查询集合的成员数量

语法:scard key 示例:

1
2
127.0.0.1:6379> scard myset
(integer) 3

4)查询集合中是否包含某个元素

语法:sismember key member 示例:

1
2
3
4
127.0.0.1:6379> sismember myset v1
(integer) 1
127.0.0.1:6379> sismember myset v4
(integer) 0

5)从一个集合中移动一个元素到另一个集合

语法:smove source destination member 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
127.0.0.1:6379> smove myset myset2 v3
(integer) 1
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
3) "v3"
127.0.0.1:6379> smembers myset
1) "v1"
2) "v2"

6)移除集合中一个或多个元素

语法:srem key member [member …] 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> smembers myset
1) "v4"
2) "v1"
3) "v3"
4) "v2"
5) "v5"
127.0.0.1:6379> srem myset v5
(integer) 1
127.0.0.1:6379> smembers myset
1) "v3"
2) "v2"
3) "v1"
4) "v4"

注意:使用 srem 指令,不存在的元素将会被忽略。 更多操作命令,详见附录部分。

2 代码实战

下面来看集合类型在 Java 中的使用,同样先添加 Jedis 框架,使用代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import redis.clients.jedis.Jedis;
import java.util.Set;

public class SetExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
jedis.auth("xxx");
// 创建集合并添加元素
jedis.sadd("set1", "java", "golang");
// 查询集合中的所有元素
Set<String> members = jedis.smembers("set1");
System.out.println(members); // 输出:[java, golang]
// 查询集合中的元素数量
System.out.println(jedis.scard("set1"));
// 移除集合中的一个元素
jedis.srem("set1", "golang");
System.out.println(jedis.smembers("set1")); // 输出:[java]
// 创建集合 set2 并添加元素
jedis.sadd("set2", "java", "golang");
// 查询两个集合中交集
Set<String> inters = jedis.sinter("set1", "set2");
System.out.println(inters); // 输出:[java]
// 查询两个集合中并集
Set<String> unions = jedis.sunion("set1", "set2");
System.out.println(unions); // 输出:[java,golang]
// 查询两个集合的错集
Set<String> diffs = jedis.sdiff("set2", "set1");
System.out.println(diffs); // 输出:[golang]
}
}

3 内部实现

集合类型是由 intset (整数集合) 或 hashtable (普通哈希表) 组成的。当集合类型以 hashtable 存储时,哈希表的 key 为要插入的元素值,而哈希表的 value 则为 Null,如下图所示:

image-20240604110227735

当集合中所有的值都为整数时,Redis 会使用 intset 结构来存储,如下代码所示:

1
2
3
4
127.0.0.1:6379> sadd myset 1 9 3 -2
(integer) 4
127.0.0.1:6379> object encoding myset
"intset"

从上面代码可以看出,当所有元素都为整数时,集合会以 intset 结构进行(数据)存储。 当发生以下两种情况时,会导致集合类型使用 hashtable 而非 intset 存储: 1)当元素的个数超过一定数量时,默认是 512 个,该值可通过命令 set-max-intset-entries xxx 来配置。 2)当元素为非整数时,集合将会使用 hashtable 来存储,如下代码所示:

1
2
3
4
127.0.0.1:6379> sadd myht "redis" "db"
(integer) 2
127.0.0.1:6379> object encoding myht
"hashtable"

从上面代码可以看出,当元素为非整数时,集合会使用 hashtable 进行存储

4 源码解析

集合源码在 t_set.c 文件中,核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* 
* 添加元素到集合
* 如果当前值已经存在,则返回 0 不作任何处理,否则就添加该元素,并返回 1。
*/
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) { // 字典类型
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
// 把 value 作为字典到 key,将 Null 作为字典到 value,将元素存入到字典
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) { // inset 数据类型
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
// 超过 inset 的最大存储数量,则使用字典类型存储
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
// 转化为整数类型失败,使用字典类型存储
setTypeConvert(subject,OBJ_ENCODING_HT);

serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
// 未知编码(类型)
serverPanic("Unknown set encoding");
}
return 0;
}

以上这些代码验证了,我们上面所说的内容,当元素都为整数并且元素的个数没有到达设置的最大值时,键值的存储使用的是 intset 的数据结构,反之到元素超过了一定的范围,又或者是存储的元素为非整数时,集合会选择使用 hashtable 的数据结构进行存储。

5 使用场景

集合类型的经典使用场景如下:

  • 微博关注我的人和我关注的人都适合用集合存储,可以保证人员不会重复;
  • 中奖人信息也适合用集合类型存储,这样可以保证一个人不会重复中奖。

6 小结

通过本文我们知道了,集合类型是由整数集合 (intset) 或者是哈希表 (hashtable) 组成的,集合类型比较适合用来数据去重和保障数据的唯一性,除此之外,集合类型还可以用来统计多个集合的交集、错集和并集 (见附录)。当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。

13 附录:更多集合操作命令

移除并返回集合中的一个随机元素

语法:spop key [count] 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> smembers myset
1) "v1"
2) "v2"
127.0.0.1:6379> spop myset 1
1) "v2"
127.0.0.1:6379> smembers myset
1) "v1"

随机返回集合中指定数量的元素列表

语法:srandmember key [count] 示例:

1
2
3
127.0.0.1:6379> srandmember myset 2
1) "v4"
2) "v2"

返回一个集合或多个集合的交集

语法:sinter key [key …] 示例:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
127.0.0.1:6379> sinter myset myset2
1) "v1"

把集合的交集复制到新的集合中

语法:sinterstore destination key [key …] 示例:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
127.0.0.1:6379> sinterstore myset3 myset myset2
(integer) 1
127.0.0.1:6379> smembers myset3
1) "v1"

命令解析:从以上代码可以看出,我们把集合 myset 和 集合 myset2 的合集元素 v1 复制到了新的集合 myset3 中,但 v1 并不会从原有集合中移除。

查询一个或多个集合的并集

语法:sunion key [key …] 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> smembers group1
1) "java"
127.0.0.1:6379> smembers group2
1) "golang"
127.0.0.1:6379> sunion group1 group2
1) "java"
2) "golang"

把一个或多个集合的并集复制到新集合中

语法:sunionstore destination key [key …] 示例:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> smembers group1
1) "java"
127.0.0.1:6379> smembers group2
1) "golang"
127.0.0.1:6379> sunionstore group3 group1 group2
(integer) 2
127.0.0.1:6379> smembers group3
1) "java"
2) "golang"

注意:只是把一个或多个集合的并集复制到新集合中,并不会在原集合中删除复制的元素。

查询一个或多个集合的错集

语法:sdiff key [key …] 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> smembers group1
1) "java"
2) "golang"
127.0.0.1:6379> smembers group2
1) "golang"
127.0.0.1:6379> sdiff group1 group2
1) "java"

注意:执行命令时集合的先后顺序会影响返回的结果,如下所示:

1
2
3
4
127.0.0.1:6379> sdiff group1 group2
1) "java"
127.0.0.1:6379> sdiff group2 group1
(empty list or set)

这是因为查询错集是以第一个集合为主的,当第二个元素包含第一个元素时,查询的错集结果就是空。

把一个或多个集合的错集复制到新集合

语法:sdiffstore destination key [key …] 示例:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> smembers group1
1) "java"
2) "golang"
127.0.0.1:6379> smembers group2
1) "golang"
127.0.0.1:6379> sdiffstore group3 group1 group2
(integer) 1
127.0.0.1:6379> smembers group3
1) "java"

14 有序集合使用与内部实现原理

有序集合类型 (Sorted Set) 相比于集合类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。有序集合的存储元素值也是不能重复的,但分值是可以重复的。

当我们把学生的成绩存储在有序集合中时,它的存储结构如下图所示:

image-20240604110443985

下面我们先从有序集合的使用开始说起。

1 基础使用

1)添加一个或多个元素

语法:zadd key [NX|XX] [CH] [INCR] score member [score member …] 示例:

1
2
3
4
127.0.0.1:6379> zadd zset1 10 java
(integer) 1
127.0.0.1:6379> zadd zset1 3 golang 4 sql 1 redis
(integer) 3

可以看出有序集合的添加是 zadd 键值 分值1 元素值1 分值2 元素值2 的形式添加的。

2)查询所有元素列表

语法:zrange key start stop [WITHSCORES] 示例:

1
2
3
4
127.0.0.1:6379> zrange zset 0 -1
1) "redis"
2) "mysql"
3) "java"

其中 -1 表示最后一个元素,查询结果包含开始和结束元素。

3)删除一个或多个元素(根据元素值)

语法:zrem key member [member …] 示例:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> zrangebyscore zset1 0 -1 #查询所有元素
1) "golang"
2) "redis"
3) "sql"
4) "java"
127.0.0.1:6379> zrem zset1 redis sql #删除元素:reids、sql
(integer) 2
127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
1) "golang"
2) "java"

删除命令中如果包含了不存在的元素,并不会影响命令的正常执行,不存在的元素将会被忽略。

4)查询某元素的 score 值

语法:zscore key member 示例:

1
2
127.0.0.1:6379> zscore zset1 redis
"1"

5)查询 score 区间内元素

语法:zrangebyscore key min max [WITHSCORES] [LIMIT offset count] 示例:

1
2
3
4
5
127.0.0.1:6379> zrangebyscore zset1 3 10
1) "golang"
2) "redis"
3) "sql"
4) "java"

6)查询某元素排名

语法:zrank key member 示例:

1
2
3
4
5
6
127.0.0.1:6379> zadd zset 5 redis 10 java 8 mysql #创建有序集合
(integer) 3
127.0.0.1:6379> zrank zset java #查询元素排序
(integer) 2
127.0.0.1:6379> zrank zset redis
(integer) 0

可以看出,排名是从 0 开始的,排名可以理解为元素排序后的下标值。

更多操作命令,详见附录部分。

2 代码实战

下面来看有序集合在 Java 中的使用,同样先添加 Jedis 框架,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ZSetExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
Map<String, Double> map = new HashMap<>();
map.put("小明", 80.5d);
map.put("小红", 75d);
map.put("老王", 85d);
// 为有序集合(ZSet)添加元素
jedis.zadd("grade", map);
// 查询分数在 80 分到 100 分之间的人(包含 80 分和 100 分)
Set<String> gradeSet = jedis.zrangeByScore("grade", 80, 100);
System.out.println(gradeSet); // 输出:[小明, 老王]
// 查询小红的排名(排名从 0 开始)
System.out.println(jedis.zrank("grade", "小明")); // 输出:1
// 从集合中移除老王
jedis.zrem("grade", "老王");
// 查询有序集合中的所有元素(根据排名从小到大)
Set<String> range = jedis.zrange("grade", 0, -1);
System.out.println(range); // 输出:[小红, 小明]
// 查询有序集合中的所有元素(根据 score 从小到大)
Set<String> rangeByScore = jedis.zrangeByScore("grade", 0, 100);
System.out.println(rangeByScore);
}
}

3 内部实现

有序集合是由 ziplist (压缩列表) 或 skiplist (跳跃表) 组成的。

1)ziplist

当数据比较少时,有序集合使用的是 ziplist 存储的,如下代码所示:

1
2
3
4
127.0.0.1:6379> zadd myzset 1 db 2 redis 3 mysql
(integer) 3
127.0.0.1:6379> object encoding myzset
"ziplist"

从结果可以看出,有序集合把 myset 键值对存储在 ziplist 结构中了。 有序集合使用 ziplist 格式存储必须满足以下两个条件:

  • 有序集合保存的元素个数要小于 128 个;
  • 有序集合保存的所有元素成员的长度都必须小于 64 字节。

如果不能满足以上两个条件中的任意一个,有序集合将会使用 skiplist 结构进行存储。 接下来我们来测试以下,当有序集合中某个元素长度大于 64 字节时会发生什么情况? 代码如下:

1
2
3
4
5
6
7
8
127.0.0.1:6379> zadd zmaxleng 1.0 redis
(integer) 1
127.0.0.1:6379> object encoding zmaxleng
"ziplist"
127.0.0.1:6379> zadd zmaxleng 2.0 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
(integer) 1
127.0.0.1:6379> object encoding zmaxleng
"skiplist"

通过以上代码可以看出,当有序集合保存的所有元素成员的长度大于 64 字节时,有序集合就会从 ziplist 转换成为 skiplist。

小贴士:可以通过配置文件中的 zset-max-ziplist-entries(默认 128)和 zset-max-ziplist-value(默认 64)来设置有序集合使用 ziplist 存储的临界值。

2)skiplist

skiplist 数据编码底层是使用 zset 结构实现的,而 zset 结构中包含了一个字典和一个跳跃表,源码如下:

1
2
3
4
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

更多关于跳跃表的源码实现,会在后面的章节详细介绍。

① 跳跃表实现原理

跳跃表的结构如下图所示:

image-20240604110525193

根据以上图片展示,当我们在跳跃表中查询值 32 时,执行流程如下:

  • 从最上层开始找,1 比 32 小,在当前层移动到下一个节点进行比较;
  • 7 比 32 小,当前层移动下一个节点比较,由于下一个节点指向 Null,所以以 7 为目标,移动到下一层继续向后比较;
  • 18 小于 32,继续向后移动查找,对比 77 大于 32,以 18 为目标,移动到下一层继续向后比较;
  • 对比 32 等于 32,值被顺利找到。

从上面的流程可以看出,跳跃表会想从最上层开始找起,依次向后查找,如果本层的节点大于要找的值,或者本层的节点为 Null 时,以上一个节点为目标,往下移一层继续向后查找并循环此流程,直到找到该节点并返回,如果对比到最后一个元素仍未找到,则返回 Null。

② 为什么是跳跃表?而非红黑树?

因为跳跃表的性能和红黑树基本相近,但却比红黑树更好实现,所有 Redis 的有序集合会选用跳跃表来实现存储。

4 使用场景

有序集合的经典使用场景如下:

  • 学生成绩排名
  • 粉丝列表,根据关注的先后时间排序

5 小结

通过本文的学习我们了解到,有序集合具有唯一性和排序的功能,排序功能是借助分值字段 score 实现的,score 字段不仅可以实现排序功能,还可以实现数据的赛选与过滤的功能。我们还了解到了有序集合是由 压缩列表 (ziplist) 或跳跃列表 (skiplist) 来存储的,当元素个数小于 128 个,并且所有元素的值都小于 64 字节时,有序集合会采取 ziplist 来存储,反之则会用 skiplist 来存储,其中 skiplist 是从上往下、从前往后进行元素查找的,相比于传统的普通列表,可能会快很多,因为普通列表只能从前往后依次查找。

15 附录:更多有序集合操作命令

查询有序集合的总个数

语法:zcard key 示例:

1
2
127.0.0.1:6379> zcard zset1
(integer) 4

查询 score 区间内的元素个数

语法:zcount key min max 示例:

1
2
127.0.0.1:6379> zcount zset1 0 10
(integer) 4

累加元素的 score 值

语法:zincrby key increment member 示例:

1
2
3
4
5
6
127.0.0.1:6379> zscore zset1 redis #查询 zset1 的 score 值
"1"
127.0.0.1:6379> zincrby zset1 2 redis #累加 score 值
"3"
127.0.0.1:6379> zscore zset1 redis
"3"

查询某元素倒序排名

语法:zrevrank key member 示例:

1
2
3
4
5
6
7
127.0.0.1:6379> zrevrank zset1 python #倒序查询
(integer) 0
127.0.0.1:6379> zrange zset1 0 -1 #正序列表
1) "redis"
2) "java"
3) "golang"
4) "python"

根据排名删除元素

语法:zremrangebyrank key start stop 示例:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
1) "redis"
2) "java"
3) "golang"
4) "python"
127.0.0.1:6379> zremrangebyrank zset1 0 2 #删除元素
(integer) 3
127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
1) "python"

删除 score 区间内的元素

语法:zremrangebyscore key min max 示例:

1
2
3
4
5
6
127.0.0.1:6379> zscore zset1 python
"4"
127.0.0.1:6379> zremrangebyscore zset1 4 5
(integer) 1
127.0.0.1:6379> zscore zset1 python
(nil)

复制交集元素到新集合

语法:zinterstore destination numkeys key [key …] [WEIGHTS weight] [AGGREGATE SUM|MIN|MA 参数 numkeys 表示需要几个集合参与查询。 示例:

1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> zrange zset1 0 -1
1) "redis"
2) "java"
3) "golang"
4) "python"
127.0.0.1:6379> zrange zset2 0 -1
1) "redis"
2) "db"
127.0.0.1:6379> zinterstore zset3 2 zset1 zset2
(integer) 1
127.0.0.1:6379> zrange zset3 0 -1
1) "redis"

复制并集元素到新集合

语法:zunionstore destination numkeys key [key …] [WEIGHTS weight] [AGGREGATE SUM|MIN|MA 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> zrange zset1 0 -1
1) "redis"
2) "java"
3) "golang"
4) "python"
127.0.0.1:6379> zrange zset2 0 -1
1) "redis"
2) "db"
127.0.0.1:6379> zunionstore zset3 2 zset1 zset2
(integer) 5
127.0.0.1:6379> zrange zset3 0 -1
1) "java"
2) "golang"
3) "redis"
4) "python"
5) "db"

【有序集合:字典+跳表

集合对象的编码可以是 intset或者 hashtable 。

哈希对象的编码可以是 ziplist或者 hashtable

列表:双向链表/压缩列表

字符串对象:字符串对象

16 Redis 事务深入解析

作为关系型数据库中一项非常重要的基础功能——事务,在 Redis 中是如何处理并使用的?

前言

事务指的是提供一种将多个命令打包,一次性按顺序地执行的机制,并且保证服务器只有在执行完事务中的所有命令后,才会继续处理此客户端的其他命令。

事务也是其他关系型数据库所必备的基础功能,以支付的场景为例,正常情况下只有正常消费完成之后,才会减去账户余额。但如果没有事务的保障,可能会发生消费失败了,但依旧会把账户的余额给扣减了,我想这种情况应该任何人都无法接受吧?所以事务是数据库中一项非常重要的基础功能。

事务基本使用

事务在其他语言中,一般分为以下三个阶段:

  • 开启事务——Begin Transaction
  • 执行业务代码,提交事务——Commit Transaction
  • 业务处理中出现异常,回滚事务——Rollback Transaction

以 Java 中的事务执行为例:

1
2
3
4
5
6
7
8
9
10
// 开启事务
begin();
try {
//......
// 提交事务
commit();
} catch(Exception e) {
// 回滚事务
rollback();
}

Redis 中的事务从开始到结束也是要经历三个阶段:

  • 开启事务
  • 命令入列
  • 执行事务/放弃事务

其中,开启事务使用 multi 命令,事务执行使用 exec 命令,放弃事务使用 discard 命令。

开启事务

multi 命令用于开启事务,实现代码如下:

1
2
> multi
OK

multi 命令可以让客户端从非事务模式状态,变为事务模式状态。

注意[※]multi 命令不能嵌套使用,如果已经开启了事务的情况下,再执行 multi 命令,会提示如下错误:

1
(error) ERR MULTI calls can not be nested

执行效果,如下代码所示:

1
2
3
4
127.0.0.1:6379> multi
OK
127.0.0.1:6379> multi
(error) ERR MULTI calls can not be nested

当客户端是非事务状态时,使用 multi 命令,客户端会返回结果 OK,如果客户端已经是事务状态,再执行 multi 命令会 multi 命令不能嵌套的错误,但不会终止客户端为事务的状态,如下图所示:

image-20240604163012445

命令入列

客户端进入事务状态之后,执行的所有常规 Redis 操作命令(非触发事务执行或放弃和导致入列异常的命令)会依次入列,命令入列成功后会返回 QUEUED,如下代码所示:

1
2
3
4
5
6
> multi
OK
> set k v
QUEUED
> get k
QUEUED

执行流程如下图所示:

image-20240604163052756

注意:命令会按照先进先出(FIFO)的顺序出入列,也就是说事务会按照命令的入列顺序,从前往后依次执行。

执行事务/放弃事务

执行事务的命令是 exec,放弃事务的命令是 discard。

执行事务示例代码如下:

1
2
3
4
5
6
7
8
> multi
OK
> set k v2
QUEUED
> exec
1) OK
> get k
"v2"

放弃事务示例代码如下:

1
2
3
4
5
6
7
8
> multi
OK
> set k v3
QUEUED
> discard
OK
> get k
"v2"

执行流程如下图所示:

image-20240604163121196

事务错误&回滚

【※】事务执行中的错误分为以下三类:

  • 执行时才会出现的错误(简称:执行时错误);
  • 入列时错误,不会终止整个事务;
  • 入列时错误,会终止整个事务。

执行时错误

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
> get k
"v"
> multi
OK
> set k v2
QUEUED
> expire k 10s
QUEUED
> exec
1) OK
2) (error) ERR value is not an integer or out of range
> get k
"v2"

执行命令解释如下图所示:

image-20240604163213007

从以上结果可以看出,即使事务队列中某个命令在执行期间发生了错误,事务也会继续执行,直到事务队列中所有命令执行完成。

入列错误不会导致事务结束

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
> get k
"v"
> multi
OK
> set k v2
QUEUED
> multi
(error) ERR MULTI calls can not be nested
> exec
1) OK
> get k
"v2"

执行命令解释如下图所示:

image-20240604163235276

可以看出,重复执行 multi 会导致入列错误,但不会终止事务,最终查询的结果是事务执行成功了。除了重复执行 multi 命令,还有在事务状态下执行 watch 也是同样的效果,下文会详细讲解关于 watch 的内容。

入列错误导致事务结束

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
> get k
"v2"
> multi
OK
> set k v3
QUEUED
> set k
(error) ERR wrong number of arguments for 'set' command
> exec
(error) EXECABORT Transaction discarded because of previous errors.
> get k
"v2"

执行命令解释如下图所示:

image-20240604163251187

为什么不支持事务回滚?

Redis 官方文档的解释如下:

If you have a relational databases background, the fact that Redis commands can fail during a transaction, but still Redis will execute the rest of the transaction instead of rolling back, may look odd to you.

However there are good opinions for this behavior:

  • Redis commands can fail only if called with a wrong syntax (and the problem is not detectable during the command queueing), or against keys holding the wrong data type: this means that in practical terms a failing command is the result of a programming errors, and a kind of error that is very likely to be detected during development, and not in production.
  • Redis is internally simplified and faster because it does not need the ability to roll back.

An argument against Redis point of view is that bugs happen, however it should be noted that in general the roll back does not save you from programming errors. For instance if a query increments a key by 2 instead of 1, or increments the wrong key, there is no way for a rollback mechanism to help. Given that no one can save the programmer from his or her errors, and that the kind of errors required for a Redis command to fail are unlikely to enter in production, we selected the simpler and faster approach of not supporting roll backs on errors.

大概的意思是,作者不支持事务回滚的原因有以下两个:

  • 他认为 Redis 事务的执行时,错误通常都是编程错误造成的,这种错误通常只会出现在开发环境中,而很少会在实际的生产环境中出现,所以他认为没有必要为 Redis 开发事务回滚功能;
  • 不支持事务回滚是因为这种复杂的功能和 Redis 追求的简单高效的设计主旨不符合。

这里不支持事务回滚,指的是不支持运行时错误的事务回滚。

【✳。redis只有事务结束,没有事务回滚!】

监控

watch 命令用于客户端并发情况下,为事务提供一个乐观锁(CAS,Check And Set),也就是可以用 watch 命令来监控一个或多个变量,如果在事务的过程中,某个监控项被修改了,那么整个事务就会终止执行

watch 基本语法如下:

1
watch key [key ...]

watch 示例代码如下:

1
2
3
4
5
6
7
8
9
10
> watch k
OK
> multi
OK
> set k v2
QUEUED
> exec
(nil)
> get k
"v"

注意:以上事务在执行期间,也就是开启事务(multi)之后,执行事务(exec)之前,模拟多客户端并发操作了变量 k 的值,这个时候再去执行事务,才会出现如上结果,exec 执行的结果为 nil。

可以看出,当执行 exec 返回的结果是 nil 时,表示 watch 监控的对象在事务执行的过程中被修改了。从 get k 的结果也可以印证,因为事务中设置的值 set k v2 并未正常执行。

执行流程如下图所示:

image-20240604170547497

注意: watch 命令只能在客户端开启事务之前执行,在事务中执行 watch 命令会引发错误,但不会造成整个事务失败,如下代码所示:

1
2
3
4
5
6
7
8
9
10
> multi
OK
> set k v3
QUEUED
> watch k
(error) ERR WATCH inside MULTI is not allowed
> exec
1) OK
> get k
"v3"

执行命令解释如下图所示:

image-20240604170600546

unwatch 命令用于清除所有之前监控的所有对象(键值对)。

unwatch 示例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
> set k v
OK
> watch k
OK
> multi
OK
> unwatch
QUEUED
> set k v2
QUEUED
> exec
1) OK
2) OK
> get k
"v2"

可以看出,即使在事务的执行过程中,k 值被修改了,因为调用了 unwatch 命令,整个事务依然会顺利执行。

代码实战

以下是事务在 Java 中的使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class TransactionExample {
public static void main(String[] args) {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码
jedis.auth("xxx");
// 设置键值
jedis.set("k", "v");
// 开启监视 watch
jedis.watch("k");
// 开始事务
Transaction tx = jedis.multi();
// 命令入列
tx.set("k", "v2");
// 执行事务
tx.exec();
System.out.println(jedis.get("k"));
jedis.close();
}
}

知识点练习

以下两个客户端交替执行的结果是?

客户端一,执行如下命令:

1
2
3
4
5
6
7
8
> set k v
OK
> watch k
OK
> multi
OK
> set k v2
QUEUED

客户端二,执行如下命令:

1
2
> set k v
OK

客户端一,再执行如下命令:

1
> exec

此时 k 的值为多少?

答: k 的值为 v,而非 v2。

题目解析:本题考查的是 watch 命令监控时,即使把原对象的值重新赋值给了原对象,这个时候 watch 命令也会认为监控对象还是被修改了。

小结

事务为多个命令提供一次性按顺序执行的机制,与 Redis 事务相关的命令有以下五个:

  • multi:开启事务
  • exec:执行事务
  • discard:丢弃事务
  • watch:为事务提供乐观锁实现
  • unwatch:取消监控(取消事务中的乐观锁)

正常情况下 Redis 事务分为三个阶段:开启事务、命令入列、执行事务。Redis 事务并不支持运行时错误的事务回滚,但在某些入列错误,如 set key 或者是 watch 监控项被修改时,提供整个事务回滚的功能。

17 Redis 键值过期操作

过期设置

Redis 中设置过期时间主要通过以下四种方式:

  • expire key seconds:设置 key 在 n 秒后过期;
  • pexpire key milliseconds:设置 key 在 n 毫秒后过期;
  • expireat key timestamp:设置 key 在某个时间戳(精确到秒)之后过期;
  • pexpireat key millisecondsTimestamp:设置 key 在某个时间戳(精确到毫秒)之后过期;

下面分别来看以上这些命令的具体实现。

expire:N 秒后过期

1
2
3
4
5
6
127.0.0.1:6379> set key value
OK
127.0.0.1:6379> expire key 100
(integer) 1
127.0.0.1:6379> ttl key
(integer) 97

其中命令 ttl 的全称是 Time To Live,表示此键值在 n 秒后过期。例如,上面的结果 97 表示 key 在 97s 后过期。

pexpire:N 毫秒后过期

1
2
3
4
5
6
127.0.0.1:6379> set key2 value2
OK
127.0.0.1:6379> pexpire key2 100000
(integer) 1
127.0.0.1:6379> pttl key2
(integer) 94524

其中 pexpire key2 100000 表示设置 key2 在 100000 毫秒(100 秒)后过期。

expireat:过期时间戳精确到秒

1
2
3
4
5
6
127.0.0.1:6379> set key3 value3
OK
127.0.0.1:6379> expireat key3 1573472683
(integer) 1
127.0.0.1:6379> ttl key3
(integer) 67

其中 expireat key3 1573472683 表示 key3 在时间戳 1573472683 后过期(精确到秒),使用 ttl 查询可以发现在 67s 后 key3 会过期。

小贴士:在 Redis 可以使用 time 命令查询当前时间的时间戳(精确到秒),示例如下:

127.0.0.1:6379> time

  1. “1573472563”
  2. “248426”

pexpireat:过期时间戳精确到毫秒

1
2
3
4
5
6
127.0.0.1:6379> set key4 value4
OK
127.0.0.1:6379> pexpireat key4 1573472683000
(integer) 1
127.0.0.1:6379> pttl key4
(integer) 3522

其中 pexpireat key4 1573472683000 表示 key4 在时间戳 1573472683000 后过期(精确到毫秒),使用 ttl 查询可以发现在 3522ms 后 key4 会过期。

字符串中的过期操作

字符串中几个直接操作过期时间的方法,如下列表:

  • set key value ex seconds:设置键值对的同时指定过期时间(精确到秒);
  • set key value px milliseconds:设置键值对的同时指定过期时间(精确到毫秒);
  • setex key seconds valule:设置键值对的同时指定过期时间(精确到秒)。

实现示例如下。

1. set key value ex seconds

1
2
3
4
127.0.0.1:6379> set k v ex 100
OK
127.0.0.1:6379> ttl k
(integer) 97

2. set key value ex milliseconds

1
2
3
4
127.0.0.1:6379> set k2 v2 px 100000
OK
127.0.0.1:6379> pttl k2
(integer) 92483

3. setex key seconds valule

1
2
3
4
127.0.0.1:6379> setex k3 100 v3
OK
127.0.0.1:6379> ttl k3
(integer) 91

移除过期时间

使用命令: persist key 可以移除键值的过期时间,如下代码所示。

1
2
3
4
5
6
127.0.0.1:6379> ttl k3
(integer) 97
127.0.0.1:6379> persist k3
(integer) 1
127.0.0.1:6379> ttl k3
(integer) -1

可以看出第一次使用 ttl 查询 k3 会在 97s 后过期,当使用了 persist 命令之后,在查询 k3 的存活时间发现结果是 -1,它表示 k3 永不过期。

Java实现过期操作

本文将使用 Jedis 框架来实现对 Redis 过期时间的操作,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TTLTest {
public static void main(String[] args) throws InterruptedException {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码(如果没有密码,此行可省略)
jedis.auth("xxx");
// 存储键值对(默认情况下永不过期)
jedis.set("k", "v");
// 查询 TTL(过期时间)
Long ttl = jedis.ttl("k");
// 打印过期日志
System.out.println("过期时间:" + ttl);
// 设置 100s 后过期
jedis.expire("k", 100);
// 等待 1s 后执行
Thread.sleep(1000);
// 打印过期日志
System.out.println("执行 expire 后的 TTL=" + jedis.ttl("k"));
}
}

程序的执行结果为:

1
2
过期时间:-1
执行 expire 后的 TTL=99

可以看出使用 Jedis 来操作 Redis 的过期时间还是很方便的,可直接使用 jedis.ttl("k") 查询键值的生存时间,使用 jedis.expire("k",seconds) 方法设置过期时间(精确到秒)。

小贴士:使用 Jedis 之前,先要把 Jedis 引入到程序中,如果使用的是 Maven 项目的,直接在 pom.xml 文件中添加以下引用:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>version</version>
</dependency>

更多过期操作方法,如下列表:

  • pexpire(String key, long milliseconds):设置 n 毫秒后过期;
  • expireAt(String key, long unixTime):设置某个时间戳后过期(精确到秒);
  • pexpireAt(String key, long millisecondsTimestamp):设置某个时间戳后过期(精确到毫秒);
  • persist(String key):移除过期时间。

完整示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TTLTest {
public static void main(String[] args) throws InterruptedException {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码(如果没有密码,此行可省略)
jedis.auth("xxx");
// 存储键值对(默认情况下永不过期)
jedis.set("k", "v");
// 查询 TTL(过期时间)
Long ttl = jedis.ttl("k");
// 打印过期日志
System.out.println("过期时间:" + ttl);
// 设置 100s 后过期
jedis.expire("k", 100);
// 等待 1s 后执行
Thread.sleep(1000);
// 打印过期日志
System.out.println("执行 expire 后的 TTL=" + jedis.ttl("k"));
// 设置 n 毫秒后过期
jedis.pexpire("k", 100000);
// 设置某个时间戳后过期(精确到秒)
jedis.expireAt("k", 1573468990);
// 设置某个时间戳后过期(精确到毫秒)
jedis.pexpireAt("k", 1573468990000L);
// 移除过期时间
jedis.persist("k");
}
}

持久化中的过期键

上面我们讲了过期键在 Redis 正常运行中一些使用案例,接下来,我们来看 Redis 在持久化的过程中是如何处理过期键的。

Redis 持久化文件有两种格式:RDB(Redis Database)和 AOF(Append Only File),下面我们分别来看过期键在这两种格式中的呈现状态。

RDB 中的过期键

RDB 文件分为两个阶段,RDB 文件生成阶段和加载阶段。

1. RDB 文件生成

从内存状态持久化成 RDB(文件)的时候,会对 key 进行过期检查,过期的键不会被保存到新的 RDB 文件中,因此 Redis 中的过期键不会对生成新 RDB 文件产生任何影响。

2. RDB 文件加载

RDB 加载分为以下两种情况:

  • 如果 Redis 是主服务器运行模式的话,在载入 RDB 文件时,程序会对文件中保存的键进行检查,过期键不会被载入到数据库中。所以过期键不会对载入 RDB 文件的主服务器造成影响;
  • 如果 Redis 是从服务器运行模式的话,在载入 RDB 文件时,不论键是否过期都会被载入到数据库中。但由于主从服务器在进行数据同步时,从服务器的数据会被清空。所以一般来说,过期键对载入 RDB 文件的从服务器也不会造成影响。

RDB 文件加载的源码可以在 rdb.c 文件的 rdbLoad() 函数中找到,源码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave.
*
* 如果服务器为主节点的话,
* 那么在键已经过期的时候,不再将它们关联到数据库中去
*/
if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
// 跳过
continue;
}

AOF 中的过期键

1. AOF 文件写入

当 Redis 以 AOF 模式持久化时,如果数据库某个过期键还没被删除,那么 AOF 文件会保留此过期键,当此过期键被删除后,Redis 会向 AOF 文件追加一条 DEL 命令来显式地删除该键值。

2. AOF 重写

执行 AOF 重写时,会对 Redis 中的键值对进行检查已过期的键不会被保存到重写后的 AOF 文件中,因此不会对 AOF 重写造成任何影响。

主从库的过期键

当 Redis 运行在主从模式下时,从库不会进行过期扫描,从库对过期的处理是被动的。也就是即使从库中的 key 过期了,如果有客户端访问从库时,依然可以得到 key 对应的值,像未过期的键值对一样返回。

从库的过期键处理依靠主服务器控制,主库在 key 到期时,会在 AOF 文件里增加一条 del 指令,同步到所有的从库,从库通过执行这条 del 指令来删除过期的 key。

小结

本文我们知道了 Redis 中的四种设置过期时间的方式:expire、pexpire、expireat、pexpireat,其中比较常用的是 expire 设置键值 n 秒后过期。

字符串中可以在添加键值的同时设置过期时间,并可以使用 persist 命令移除过期时间。同时我们也知道了过期键在 RDB 写入和 AOF 重写时都不会被记录。

过期键在主从模式下,从库对过期键的处理要完全依靠主库,主库删除过期键之后会发送 del 命令给所有的从库。

image-20240604171410049

18 Redis 过期策略与源码分析

在 Redis 中我们可以给一些元素设置过期时间,那当它过期之后 Redis 是如何处理这些过期键呢?

过期键执行流程

Redis 之所以能知道那些键值过期,是因为在 Redis 中维护了一个字典,存储了所有设置了过期时间的键值,我们称之为过期字典。

过期键判断流程如下图所示:

image-20240604171941410

过期键源码分析

过期键存储在 redisDb 结构中,源代码在 src/server.h 文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* 数据库键空间,存放着所有的键值对 */
dict *expires; /* 键的过期时间 */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

小贴士:本文的所有源码都是基于 Redis 5。

过期键数据结构如下图所示:

image-20240604171958365

过期策略

Redis 会删除已过期的键值,以此来减少 Redis 的空间占用,但因为 Redis 本身是单线的,如果因为删除操作而影响主业务的执行就得不偿失了,为此 Redis 需要制定多个(过期)删除策略来保证糟糕的事情不会发生。

常见的过期策略有以下三种:

  • 定时删除
  • 惰性删除
  • 定期删除

下面分别来看每种策略有何不同。

定时删除

在设置键值过期时间时,创建一个定时事件,当过期时间到达时,由事件处理器自动执行键的删除操作。

  • **优点:**保证内存可以被尽快地释放。
  • **缺点:**在 Redis 高负载的情况下或有大量过期键需要同时处理时,会造成 Redis 服务器卡顿,影响主业务执行。

惰性删除

不主动删除过期键,每次从数据库获取键值时判断是否过期,如果过期则删除键值,并返回 null。

  • **优点:**因为每次访问时,才会判断过期键,所以此策略只会使用很少的系统资源。
  • **缺点:**系统占用空间删除不及时,导致空间利用率降低,造成了一定的空间浪费。

源码解析

惰性删除的源码位于 src/db.c 文件的 expireIfNeeded 方法中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int expireIfNeeded(redisDb *db, robj *key) {
// 判断键是否过期
if (!keyIsExpired(db,key)) return 0;
if (server.masterhost != NULL) return 1;
/* 删除过期键 */
// 增加过期键个数
server.stat_expiredkeys++;
// 传播键过期的消息
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
// server.lazyfree_lazy_expire 为 1 表示异步删除(懒空间释放),反之同步删除
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
// 判断键是否过期
int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
return now > when;
}
// 获取键的过期时间
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
return dictGetSignedIntegerVal(de);
}

所有对数据库的读写命令在执行之前,都会调用 expireIfNeeded 方法判断键值是否过期,过期则会从数据库中删除,反之则不做任何处理。

惰性删除执行流程,如下图所示:

image-20240604172032518

定期删除

每隔一段时间检查一次数据库,随机删除一些过期键。

Redis 默认每秒进行 10 次过期扫描,此配置可通过 Redis 的配置文件 redis.conf 进行配置,配置键为 hz 它的默认值是 hz 10

需要注意的是:Redis 每次扫描并不是遍历过期字典中的所有键,而是采用随机抽取判断并删除过期键的形式执行的。

定期删除流程

  1. 从过期字典中随机取出 20 个键;
  2. 删除这 20 个键中过期的键;
  3. 如果过期 key 的比例超过 25%,重复步骤 1。

同时为了保证过期扫描不会出现循环过度,导致线程卡死现象,算法还增加了扫描时间的上限,默认不会超过 25ms。

定期删除执行流程,如下图所示:

image-20240604172045333

  • **优点:**通过限制删除操作的时长和频率,来减少删除操作对 Redis 主业务的影响,同时也能删除一部分过期的数据减少了过期键对空间的无效占用。
  • **缺点:**内存清理方面没有定时删除效果好,同时没有惰性删除使用的系统资源少。

源码解析

定期删除的核心源码在 src/expire.c 文件下的 activeExpireCycle 方法中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
void activeExpireCycle(int type) {
static unsigned int current_db = 0; /* 上次定期删除遍历到的数据库ID */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* 上一次执行快速定期删除的时间点 */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL; // 每次定期删除,遍历的数据库的数量
long long start = ustime(), timelimit, elapsed;
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
if (!timelimit_exit) return;
// ACTIVE_EXPIRE_CYCLE_FAST_DURATION 是快速定期删除的执行时长
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
}
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
// 慢速定期删除的执行时长
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* 删除操作的执行时长 */
long total_sampled = 0;
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);
current_db++;
do {
// .......
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
// 每个数据库中检查的键的数量
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
// 从数据库中随机选取 num 个键进行检查
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedInteger
// 过期检查,并对过期键进行删除
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* 每次检查只删除 ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4 个过期键 */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
// .......
}

activeExpireCycle 方法在规定的时间,分多次遍历各个数据库,从过期字典中随机检查一部分过期键的过期时间,删除其中的过期键。

这个函数有两种执行模式,一个是快速模式一个是慢速模式,体现是代码中的 timelimit 变量,这个变量是用来约束此函数的运行时间的。快速模式下 timelimit 的值是固定的,等于预定义常量 ACTIVE_EXPIRE_CYCLE_FAST_DURATION,慢速模式下,这个变量的值是通过 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100 计算的。

Redis 使用的过期策略

Redis 使用的是惰性删除加定期删除的过期策略。

小结

通过本文可知 Redis 是通过设置过期字典的形式来判断过期键的,Redis 采用的是惰性删除和定期删除的形式删除过期键的,Redis 的定期删除策略并不会遍历删除每个过期键,而是采用随机抽取的方式删除过期键,同时为了保证过期扫描不影响 Redis 主业务,Redis 的定期删除策略中还提供了最大执行时间,以保证 Redis 正常并高效地运行。

19 Redis 管道技术——Pipeline

管道技术(Pipeline)是客户端提供的一种批处理技术,用于一次处理多个 Redis 命令,从而提高整个交互的性能。

通常情况下 Redis 是单行执行的,客户端先向服务器发送请求,服务端接收并处理请求后再把结果返回给客户端,这种处理模式在非频繁请求时不会有任何问题。

但如果出现集中大批量的请求时,因为每个请求都要经历先请求再响应的过程,这就会造成网络资源浪费,此时就需要管道技术来把所有的命令整合一次发给服务端,再一次响应给客户端,这样就能大大的提升了 Redis 的响应速度。

普通命令模式,如下图所示:

image-20240604172200176

管道模式,如下图所示:

image-20240604172808877

小贴士:管道中命令越多,管道技术的作用就更大,相比于普通模式来说执行效率就越高。

管道技术解决了什么问题?

管道技术解决了多个命令集中请求时造成网络资源浪费的问题,加快了 Redis 的响应速度,让 Redis 拥有更高的运行速度。但要注意的一点是,管道技术本质上是客户端提供的功能,而非 Redis 服务器端的功能。

管道技术使用

本文我们使用 Jedis 客户端提供的 Pipeline 对象来实现管道技术。首先先获取 Pipeline 对象,再为 Pipeline 对象设置需要执行的命令,最后再使用 sync() 方法或 syncAndReturnAll() 方法来统一执行这些命令,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 记录执行开始时间
long beginTime = System.currentTimeMillis();
// 获取 Pipeline 对象
Pipeline pipe = jedis.pipelined();
// 设置多个 Redis 命令
for (int i = 0; i < 100; i++) {
pipe.set("key" + i, "val" + i);
pipe.del("key"+i);
}
// 执行命令
pipe.sync();
// 记录执行结束时间
long endTime = System.currentTimeMillis();
System.out.println("执行耗时:" + (endTime - beginTime) + "毫秒");
}
}

以上程序执行结果如下:

1
执行耗时:297毫秒

如果要接收管道所有命令的执行结果,可使用 syncAndReturnAll() 方法,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 获取 Pipeline 对象
Pipeline pipe = jedis.pipelined();
// 设置多个 Redis 命令
for (int i = 0; i < 100; i++) {
pipe.set("key" + i, "val" + i);
}
// 执行命令并返回结果
List<Object> res = pipe.syncAndReturnAll();
for (Object obj : res) {
// 打印结果
System.out.println(obj);
}
}
}

管道技术 VS 普通命令

上面使用管道技术执行一个 for 循环所用的时间为 297 毫秒,接下来我们用普通的命令执行此循环,看下程序的执行时间,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 记录执行开始时间
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
jedis.set("key" + i, "val" + i);
jedis.del("key"+i);
}
// 记录执行结束时间
long endTime = System.currentTimeMillis();
System.out.println("执行耗时:" + (endTime - beginTime) + "毫秒");
}
}

以上程序执行结果如下:

1
执行耗时:17276毫秒

结论

从上面的结果可以看出,管道的执行时间是 297 毫秒,而普通命令执行时间是 17276 毫秒,管道技术要比普通的执行快了 58 倍。

管道技术需要注意的事项

管道技术虽然有它的优势,但在使用时还需注意以下几个细节:

  • 发送的命令数量不会被限制,但输入缓存区也就是命令的最大存储体积为 1GB,当发送的命令超过此限制时,命令不会被执行,并且会被 Redis 服务器端断开此链接;
  • 如果管道的数据过多可能会导致客户端的等待时间过长,导致网络阻塞;
  • 部分客户端自己本身也有缓存区大小的设置,如果管道命令没有没执行或者是执行不完整,可以排查此情况或较少管道内的命令重新尝试执行。

小结

使用管道技术可以解决多个命令执行时的网络等待,它是把多个命令整合到一起发送给服务器端处理之后统一返回给客户端,这样就免去了每条命令执行后都要等待的情况,从而有效地提高了程序的执行效率,但使用管道技术也要注意避免发送的命令过大,或管道内的数据太多而导致的网络阻塞。

【Redis管道的工作原理是这样的:在同一个TCP连接上,客户端可以连续发送多个命令而无需等待服务器的回复,同时服务器也可以连续接收并处理这些命令。当所有命令都处理完毕后,服务器将所有命令的结果一次性发送回客户端。】

20 查询附近的人——GEO

受过高等教育的我们都知道,我们所处的任何位置都可以用经度和纬度来标识,经度的范围 -180 到 180,纬度的范围为 -90 到 90。纬度以赤道为界,赤道以南为负数,赤道以北为正数;经度以本初子午线(英国格林尼治天文台)为界,东边为正数,西边为负数。

Redis 在 3.2 版本中增加了 GEO 类型用于存储和查询地理位置,关于 GEO 的命令不多,主要包含以下 6 个:

  1. geoadd:添加地理位置
  2. geopos:查询位置信息
  3. geodist:距离统计
  4. georadius:查询某位置内的其他成员信息
  5. geohash:查询位置的哈希值
  6. zrem:删除地理位置

下面我们分别来看这些命令的使用。

基础使用

添加地理位置

我们先用百度地图提供的经纬度查询工具,地址:

http://api.map.baidu.com/lbsapi/getpoint/index.html

如下图所示:

image-20240604172934996

找了以下 4 个地点,添加到 Redis 中:

  1. 天安门:116.404269,39.913164
  2. 月坛公园:116.36,39.922461
  3. 北京欢乐谷:116.499705,39.874635
  4. 香山公园:116.193275,39.996348

代码如下:

1
2
3
4
5
6
7
8
127.0.0.1:6379> geoadd site 116.404269 39.913164 tianan
(integer) 1
127.0.0.1:6379> geoadd site 116.36 39.922461 yuetan
(integer) 1
127.0.0.1:6379> geoadd site 116.499705 39.874635 huanle
(integer) 1
127.0.0.1:6379> geoadd site 116.193275 39.996348 xiangshan
(integer) 1

相关语法:

1
geoadd key longitude latitude member [longitude latitude member ...]

重点参数说明如下:

  • longitude 表示经度
  • latitude 表示纬度
  • member 是为此经纬度起的名字

此命令支持一次添加一个或多个位置信息。

查询位置信息

1
2
3
127.0.0.1:6379> geopos site tianan
1) 1) "116.40541702508926392"
2) "39.91316289865137179"

相关语法:

1
geopos key member [member ...]

此命令支持查看一个或多个位置信息。

距离统计

1
2
127.0.0.1:6379> geodist site tianan yuetan km
"3.9153"

可以看出天安门距离月坛公园的直线距离大概是 3.9 km,我们打开地图使用工具测试一下咱们的统计结果是否准确,如下图所示:

image-20240604172949414

可以看出 Redis 的统计和使用地图工具统计的距离是完全吻合的。

注意:此命令统计的距离为两个位置的直线距离。

相关语法:

1
geodist key member1 member2 [unit]

unit 参数表示统计单位,它可以设置以下值:

  • m:以米为单位,默认单位;
  • km:以千米为单位;
  • mi:以英里为单位;
  • ft:以英尺为单位。

查询某位置内的其他成员信息

1
2
3
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km
1) "tianan"
2) "yuetan"

此命令的意思是查询天安门(116.405419,39.913164)附近 5 公里范围内的成员列表。

相关语法:

1
georadius key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC]

可选参数说明如下。

1. WITHCOORD

说明:返回满足条件位置的经纬度信息。

示例代码:

1
2
3
4
5
6
7
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withcoord
1) 1) "tianan"
2) 1) "116.40426903963088989"
2) "39.91316289865137179"
2) 1) "yuetan"
2) 1) "116.36000186204910278"
2) "39.92246025586381819"

2. WITHDIST

说明:返回满足条件位置与查询位置的直线距离。

示例代码:

1
2
3
4
5
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withdist
1) 1) "tianan"
2) "0.0981"
2) 1) "yuetan"
2) "4.0100"

3. WITHHASH

说明:返回满足条件位置的哈希信息。

示例代码:

1
2
3
4
5
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withhash
1) 1) "tianan"
2) (integer) 4069885552230465
2) 1) "yuetan"
2) (integer) 4069879797297521

4. COUNT count

说明:指定返回满足条件位置的个数。

例如,指定返回一条满足条件的信息,代码如下:

1
2
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km count 1
1) "tianan"

5. ASC|DESC

说明:从近到远|从远到近排序返回。

示例代码:

1
2
3
4
5
6
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km desc
1) "yuetan"
2) "tianan"
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km asc
1) "tianan"
2) "yuetan"

当然以上这些可选参数也可以一起使用,例如以下代码:

1
2
3
4
5
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withdist desc
1) 1) "yuetan"
2) "4.0100"
2) 1) "tianan"
2) "0.0981"

5. 查询哈希值

1
2
127.0.0.1:6379> geohash site tianan
1) "wx4g0cgp000"

相关语法:

1
geohash key member [member ...]

此命令支持查询一个或多个地址的哈希值。

6. 删除地理位置

1
2
127.0.0.1:6379> zrem site xiaoming
(integer) 1

相关语法:

1
zrem key member [member ...]

此命令支持删除一个或多个位置信息。

代码实战

下面我们用 Java 代码,来实现查询附近的人,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.GeoUnit;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GeoHashExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
Map<String, GeoCoordinate> map = new HashMap<>();
// 添加小明的位置
map.put("xiaoming", new GeoCoordinate(116.404269, 39.913164));
// 添加小红的位置
map.put("xiaohong", new GeoCoordinate(116.36, 39.922461));
// 添加小美的位置
map.put("xiaomei", new GeoCoordinate(116.499705, 39.874635));
// 添加小二
map.put("xiaoer", new GeoCoordinate(116.193275, 39.996348));
jedis.geoadd("person", map);
// 查询小明和小红的直线距离
System.out.println("小明和小红相距:" + jedis.geodist("person", "xiaoming",
"xiaohong", GeoUnit.KM) + " KM");
// 查询小明附近 5 公里的人
List<GeoRadiusResponse> res = jedis.georadiusByMemberReadonly("person", "xiaoming",
5, GeoUnit.KM);
for (int i = 1; i < res.size(); i++) {
System.out.println("小明附近的人:" + res.get(i).getMemberByString());
}
}
}

以上程序执行的结果如下:

1
2
小明和小红相距:3.9153 KM
小明附近的人:xiaohong

应用场景

Redis 中的 GEO 经典使用场景如下:

  1. 查询附近的人、附近的地点等;
  2. 计算相关的距离信息。

小结

GEO 是 Redis 3.2 版本中新增的功能,只有升级到 3.2+ 才能使用,GEO 本质上是基于 ZSet 实现的,这点在 Redis 源码找到相关信息,我们可以 GEO 使用实现查找附近的人或者附近的地点,还可以用它来计算两个位置相隔的直线距离。

21 游标迭代器(过滤器)——Scan

一个问题引发的「血案」

曾经发生过这样一件事,我们的 Redis 服务器存储了海量的数据,其中登录用户信息是以 user_token_id 的形式存储的。运营人员想要当前所有的用户登录信息,然后悲剧就发生了:因为我们的工程师使用了 keys user_token_* 来查询对应的用户,结果导致 Redis 假死不可用,以至于影响到线上的其他业务接连发生问题,然后就收到了一堆的系统预警短信。并且这个假死的时间是和存储的数据成正比的,数据量越大假死的时间就越长,导致的故障时间也越长。

那如何避免这个问题呢?

问题的解决方案

在 Redis 2.8 之前,我们只能使用 keys 命令来查询我们想要的数据,但这个命令存在两个缺点:

  1. 此命令没有分页功能,我们只能一次性查询出所有符合条件的 key 值,如果查询结果非常巨大,那么得到的输出信息也会非常多;
  2. keys 命令是遍历查询,因此它的查询时间复杂度是 o(n),所以数据量越大查询时间就越长。

然而,比较幸运的是在 Redis 2.8 时推出了 Scan,解决了我们这些问题,下面来看 Scan 的具体使用。

Scan 命令使用

我们先来模拟海量数据,使用 Pipeline 添加 10w 条数据,Java 代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import utils.JedisUtils;

public class ScanExample {
public static void main(String[] args) {
// 添加 10w 条数据
initData();
}
public static void initData(){
Jedis jedis = JedisUtils.getJedis();
Pipeline pipe = jedis.pipelined();
for (int i = 1; i < 100001; i++) {
pipe.set("user_token_" + i, "id" + i);
}
// 执行命令
pipe.sync();
System.out.println("数据插入完成");
}
}

我们来查询用户 id 为 9999* 的数据,Scan 命令使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
127.0.0.1:6379> scan 0 match user_token_9999* count 10000
1) "127064"
2) 1) "user_token_99997"
127.0.0.1:6379> scan 127064 match user_token_9999* count 10000
1) "1740"
2) 1) "user_token_9999"
127.0.0.1:6379> scan 1740 match user_token_9999* count 10000
1) "21298"
2) 1) "user_token_99996"
127.0.0.1:6379> scan 21298 match user_token_9999* count 10000
1) "65382"
2) (empty list or set)
127.0.0.1:6379> scan 65382 match user_token_9999* count 10000
1) "78081"
2) 1) "user_token_99998"
2) "user_token_99992"
127.0.0.1:6379> scan 78081 match user_token_9999* count 10000
1) "3993"
2) 1) "user_token_99994"
2) "user_token_99993"
127.0.0.1:6379> scan 3993 match user_token_9999* count 10000
1) "13773"
2) 1) "user_token_99995"
127.0.0.1:6379> scan 13773 match user_token_9999* count 10000
1) "47923"
2) (empty list or set)
127.0.0.1:6379> scan 47923 match user_token_9999* count 10000
1) "59751"
2) 1) "user_token_99990"
2) "user_token_99991"
3) "user_token_99999"
127.0.0.1:6379> scan 59751 match user_token_9999* count 10000
1) "0"
2) (empty list or set)

从以上的执行结果,我们看出两个问题:

  1. 查询的结果为空,但游标值不为 0,表示遍历还没结束;
  2. 设置的是 count 10000,但每次返回的数量都不是 10000,且不固定,这是因为 count 只是限定服务器单次遍历的字典槽位数量(约等于),而不是规定返回结果的 count 值。

相关语法:

1
scan cursor [MATCH pattern] [COUNT count]

其中:

  • cursor:光标位置,整数值,从 0 开始,到 0 结束,查询结果是空,但游标值不为 0,表示遍历还没结束;
  • match pattern:正则匹配字段;
  • count:限定服务器单次遍历的字典槽位数量(约等于),只是对增量式迭代命令的一种提示(hint),并不是查询结果返回的最大数量,它的默认值是 10。

代码实战

本文我们使用 Java 代码来实现 Scan 的查询功能,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import utils.JedisUtils;

public class ScanExample {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 定义 match 和 count 参数
ScanParams params = new ScanParams();
params.count(10000);
params.match("user_token_9999*");
// 游标
String cursor = "0";
while (true) {
ScanResult<String> res = jedis.scan(cursor, params);
if (res.getCursor().equals("0")) {
// 表示最后一条
break;
}
cursor = res.getCursor(); // 设置游标
for (String item : res.getResult()) {
// 打印查询结果
System.out.println("查询结果:" + item);
}
}
}
}

以上程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
查询结果:user_token_99997
查询结果:user_token_9999
查询结果:user_token_99996
查询结果:user_token_99998
查询结果:user_token_99992
查询结果:user_token_99994
查询结果:user_token_99993
查询结果:user_token_99995
查询结果:user_token_99990
查询结果:user_token_99991
查询结果:user_token_99999

Scan 相关命令

Scan 是一个系列指令,除了 Scan 之外,还有以下 3 个命令:

  1. HScan 遍历字典游标迭代器
  2. SScan 遍历集合的游标迭代器
  3. ZScan 遍历有序集合的游标迭代器

来看这些命令的具体使用。

HScan 使用

1
2
3
4
127.0.0.1:6379> hscan myhash 0 match k2* count 10
1) "0"
2) 1) "k2"
2) "v2"

相关语法:

1
hscan key cursor [MATCH pattern] [COUNT count]

SScan 使用

1
2
3
127.0.0.1:6379> sscan myset 0 match v2* count 20
1) "0"
2) 1) "v2"

相关语法:

1
sscan key cursor [MATCH pattern] [COUNT count]

ZScan 使用

1
2
3
4
127.0.0.1:6379> zscan zset 0 match red* count 20
1) "0"
2) 1) "redis"
2) "10"

相关语法:

1
zscan key cursor [MATCH pattern] [COUNT count]

Scan 说明

官方对 Scan 命令的描述信息如下。

Scan guarantees

The SCAN command, and the other commands in the SCAN family, are able to provide to the user a set of guarantees associated to full iterations.

  • A full iteration always retrieves all the elements that were present in the collection from the start to the end of a full iteration. This means that if a given element is inside the collection when an iteration is started, and is still there when an iteration terminates, then at some point SCANreturned it to the user.
  • A full iteration never returns any element that was NOT present in the collection from the start to the end of a full iteration. So if an element was removed before the start of an iteration, and is never added back to the collection for all the time an iteration lasts, SCAN ensures that this element will never be returned.

However because SCAN has very little state associated (just the cursor) it has the following drawbacks:

  • A given element may be returned multiple times. It is up to the application to handle the case of duplicated elements, for example only using the returned elements in order to perform operations that are safe when re-applied multiple times.
  • Elements that were not constantly present in the collection during a full iteration, may be returned or not: it is undefined.

官方文档地址:

https://redis.io/commands/scan

翻译为中文的含义是:Scan 及它的相关命令可以保证以下查询规则。

  • 它可以完整返回开始到结束检索集合中出现的所有元素,也就是在整个查询过程中如果这些元素没有被删除,且符合检索条件,则一定会被查询出来;
  • 它可以保证不会查询出,在开始检索之前删除的那些元素。

然后,Scan 命令包含以下缺点:

  • 一个元素可能被返回多次,需要客户端来实现去重;
  • 在迭代过程中如果有元素被修改,那么修改的元素能不能被遍历到不确定。

小结

通过本文我们可以知道 Scan 包含以下四个指令:

  1. Scan:用于检索当前数据库中所有数据;
  2. HScan:用于检索哈希类型的数据;
  3. SScan:用于检索集合类型中的数据;
  4. ZScan:由于检索有序集合中的数据。

Scan 具备以下几个特点:

  1. Scan 可以实现 keys 的匹配功能;
  2. Scan 是通过游标进行查询的不会导致 Redis 假死;
  3. Scan 提供了 count 参数,可以规定遍历的数量;
  4. Scan 会把游标返回给客户端,用户客户端继续遍历查询;
  5. Scan 返回的结果可能会有重复数据,需要客户端去重;
  6. 单次返回空值且游标不为 0,说明遍历还没结束;
  7. Scan 可以保证在开始检索之前,被删除的元素一定不会被查询出来;
  8. 在迭代过程中如果有元素被修改, Scan 不保证能查询出相关的元素。

Scan命令的主要用途是允许用户在不阻塞服务器的情况下遍历数据库的键。在大规模数据集中,如果你需要遍历所有的键,使用Scan命令比使用Keys命令更加高效,因为Keys命令在处理大量数据时可能会阻塞服务器。

此外,Scan命令还支持一个可选的匹配模式参数(match),可以让用户只迭代那些与给定模式匹配的键。这个特性让Scan命令在搜索特定的键时变得非常有用。

22 优秀的基数统计算法——HyperLogLog

为什么要使用 HyperLogLog?

在我们实际开发的过程中,可能会遇到这样一个问题,当我们需要统计一个大型网站的独立访问次数时,该用什么的类型来统计?

如果我们使用 Redis 中的集合来统计,当它每天有数千万级别的访问时,将会是一个巨大的问题。因为这些访问量不能被清空,我们运营人员可能会随时查看这些信息,那么随着时间的推移,这些统计数据所占用的空间会越来越大,逐渐超出我们能承载最大空间。

例如,我们用 IP 来作为独立访问的判断依据,那么我们就要把每个独立 IP 进行存储,以 IP4 来计算,IP4 最多需要 15 个字节来存储信息,例如:110.110.110.110。当有一千万个独立 IP 时,所占用的空间就是 15 bit*10000000 约定于 143MB,但这只是一个页面的统计信息,假如我们有 1 万个这样的页面,那我们就需要 1T 以上的空间来存储这些数据,而且随着 IP6 的普及,这个存储数字会越来越大,那我们就不能用集合的方式来存储了,这个时候我们需要开发新的数据类型 HyperLogLog 来做这件事了。

HyperLogLog 介绍

HyperLogLog(下文简称为 HLL)是 Redis 2.8.9 版本添加的数据结构,它用于高性能的基数(去重)统计功能,它的缺点就是存在极低的误差率。

HLL 具有以下几个特点:

  • 能够使用极少的内存来统计巨量的数据,它只需要 12K 空间就能统计 2^64 的数据;
  • 统计存在一定的误差,误差率整体较低,标准误差为 0.81%;
  • 误差可以被设置辅助计算因子进行降低。

基础使用

HLL 的命令只有 3 个,但都非常的实用,下面分别来看。

添加元素

1
2
3
4
127.0.0.1:6379> pfadd key "redis"
(integer) 1
127.0.0.1:6379> pfadd key "java" "sql"
(integer) 1

相关语法:

1
pfadd key element [element ...]

此命令支持添加一个或多个元素至 HLL 结构中。

统计不重复的元素

1
2
3
4
5
6
7
8
127.0.0.1:6379> pfadd key "redis"
(integer) 1
127.0.0.1:6379> pfadd key "sql"
(integer) 1
127.0.0.1:6379> pfadd key "redis"
(integer) 0
127.0.0.1:6379> pfcount key
(integer) 2

从 pfcount 的结果可以看出,在 HLL 结构中键值为 key 的元素,有 2 个不重复的值:redis 和 sql,可以看出结果还是挺准的。

相关语法:

1
pfcount key [key ...]

此命令支持统计一个或多个 HLL 结构。

合并一个或多个 HLL 至新结构

新增 k 和 k2 合并至新结构 k3 中,代码如下:

1
2
3
4
5
6
7
8
127.0.0.1:6379> pfadd k "java" "sql"
(integer) 1
127.0.0.1:6379> pfadd k2 "redis" "sql"
(integer) 1
127.0.0.1:6379> pfmerge k3 k k2
OK
127.0.0.1:6379> pfcount k3
(integer) 3

相关语法:

1
pfmerge destkey sourcekey [sourcekey ...]

pfmerge 使用场景

当我们需要合并两个或多个同类页面的访问数据时,我们可以使用 pfmerge 来操作。

代码实战

接下来我们使用 Java 代码来实现 HLL 的三个基础功能,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import redis.clients.jedis.Jedis;

public class HyperLogLogExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 添加元素
jedis.pfadd("k", "redis", "sql");
jedis.pfadd("k", "redis");
// 统计元素
long count = jedis.pfcount("k");
// 打印统计元素
System.out.println("k:" + count);
// 合并 HLL
jedis.pfmerge("k2", "k");
// 打印新 HLL
System.out.println("k2:" + jedis.pfcount("k2"));
}
}

以上代码执行结果如下:

1
2
k:2
k2:2

HLL 算法原理

HyperLogLog 算法来源于论文 HyperLogLog the analysis of a near-optimal cardinality estimation algorithm,想要了解 HLL 的原理,先要从伯努利试验说起,伯努利实验说的是抛硬币的事。一次伯努利实验相当于抛硬币,不管抛多少次只要出现一个正面,就称为一次伯努利实验。

我们用 k 来表示每次抛硬币的次数,n 表示第几次抛的硬币,用 k_max 来表示抛硬币的最高次数,最终根据估算发现 n 和 k_max 存在的关系是 n=2^(k_max),但同时我们也发现了另一个问题当试验次数很小的时候,这种估算方法的误差会很大,例如我们进行以下 3 次实验:

  • 第 1 次试验:抛 3 次出现正面,此时 k=3,n=1;
  • 第 2 次试验:抛 2 次出现正面,此时 k=2,n=2;
  • 第 3 次试验:抛 6 次出现正面,此时 k=6,n=3。

对于这三组实验来说,k_max=6,n=3,但放入估算公式明显 3≠2^6。为了解决这个问题 HLL 引入了分桶算法和调和平均数来使这个算法更接近真实情况。

分桶算法是指把原来的数据平均分为 m 份,在每段中求平均数在乘以 m,以此来消减因偶然性带来的误差,提高预估的准确性,简单来说就是把一份数据分为多份,把一轮计算,分为多轮计算。

而调和平均数指的是使用平均数的优化算法,而非直接使用平均数。

例如小明的月工资是 1000 元,而小王的月工资是 100000 元,如果直接取平均数,那小明的平均工资就变成了 (1000+100000)/2=50500‬ 元,这显然是不准确的,而使用调和平均数算法计算的结果是 2/(1⁄1000+1⁄100000)≈1998 元,显然此算法更符合实际平均数。

所以综合以上情况,在 Redis 中使用 HLL 插入数据,相当于把存储的值经过 hash 之后,再将 hash 值转换为二进制,存入到不同的桶中,这样就可以用很小的空间存储很多的数据,统计时再去相应的位置进行对比很快就能得出结论,这就是 HLL 算法的基本原理,想要更深入的了解算法及其推理过程,可以看去原版的论文,链接地址在文末。

小结

当需要做大量数据统计时,普通的集合类型已经不能满足我们的需求了,这个时候我们可以借助 Redis 2.8.9 中提供的 HyperLogLog 来统计,它的优点是只需要使用 12k 的空间就能统计 2^64 的数据,但它的缺点是存在 0.81% 的误差,HyperLogLog 提供了三个操作方法 pfadd 添加元素、pfcount 统计元素和 pfmerge 合并元素。

参考文献

23 内存淘汰机制与算法

在本文开始之前,我们先要明白:在 Redis 中,过期策略和内存淘汰策略两个完全不同的概念,但很多人会把两者搞混。

首先,Redis 过期策略指的是 Redis 使用那种策略,来删除已经过期的键值对;而 Redis 内存淘汰机制指的是,当 Redis 运行内存已经超过 Redis 设置的最大内存之后,将采用什么策略来删除符合条件的键值对,以此来保障 Redis 高效的运行。

过期策略前面的文章,我们已经详细地讲过了,本文我们重点来看 Redis 的内存淘汰机制。

Redis 最大运行内存

只有在 Redis 的运行内存达到了某个阀值,才会触发内存淘汰机制,这个阀值就是我们设置的最大运行内存,此值在 Redis 的配置文件中可以找到,配置项为 maxmemory。

内存淘汰执行流程,如下图所示:

image-20240604174034161

查询最大运行内存

我们可以使用命令 config get maxmemory 来查看设置的最大运行内存,命令如下:

1
2
3
127.0.0.1:6379> config get maxmemory
1) "maxmemory"
2) "0"

我们发现此值竟然是 0,这是 64 位操作系统默认的值,当 maxmemory 为 0 时,表示没有内存大小限制。

小贴士:32 位操作系统,默认的最大内存值是 3GB。

内存淘汰策略

查看 Redis 内存淘汰策略

我们可以使用 config get maxmemory-policy 命令,来查看当前 Redis 的内存淘汰策略,命令如下:

1
2
3
127.0.0.1:6379> config get maxmemory-policy
1) "maxmemory-policy"
2) "noeviction"

可以看出此 Redis 使用的是 noeviction 类型的内存淘汰机制,它表示当运行内存超过最大设置内存时,不淘汰任何数据,但新增操作会报错。

内存淘汰策略分类

早期版本的 Redis 有以下 6 种淘汰策略:

  1. noeviction:不淘汰任何数据,当内存不足时,新增操作会报错,Redis 默认内存淘汰策略;
  2. allkeys-lru:淘汰整个键值中最久未使用的键值;
  3. allkeys-random:随机淘汰任意键值;
  4. volatile-lru:淘汰所有设置了过期时间的键值中最久未使用的键值;
  5. volatile-random:随机淘汰设置了过期时间的任意键值;
  6. volatile-ttl:优先淘汰更早过期的键值。

在 Redis 4.0 版本中又新增了 2 种淘汰策略:

  1. volatile-lfu:淘汰所有设置了过期时间的键值中,最少使用的键值;
  2. allkeys-lfu:淘汰整个键值中最少使用的键值。

其中 allkeys-xxx 表示从所有的键值中淘汰数据,而 volatile-xxx 表示从设置了过期键的键值中淘汰数据。

修改 Redis 内存淘汰策略

设置内存淘汰策略有两种方法,这两种方法各有利弊,需要使用者自己去权衡。

  • 方式一:通过“config set maxmemory-policy 策略”命令设置。它的优点是设置之后立即生效,不需要重启 Redis 服务,缺点是重启 Redis 之后,设置就会失效。
  • 方式二:通过修改 Redis 配置文件修改,设置“maxmemory-policy 策略”,它的优点是重启 Redis 服务后配置不会丢失,缺点是必须重启 Redis 服务,设置才能生效。

内存淘汰算法

从内测淘汰策略分类上,我们可以得知,除了随机删除和不删除之外,主要有两种淘汰算法:LRU 算法和 LFU 算法。

LRU 算法

LRU 全称是 Least Recently Used 译为最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。

1. LRU 算法实现

LRU 算法需要基于链表结构,链表中的元素按照操作顺序从前往后排列,最新操作的键会被移动到表头,当需要内存淘汰时,只需要删除链表尾部的元素即可。

2. 近 LRU 算法

Redis 使用的是一种近似 LRU 算法,目的是为了更好的节约内存,它的实现方式是给现有的数据结构添加一个额外的字段,用于记录此键值的最后一次访问时间,Redis 内存淘汰时,会使用随机采样的方式来淘汰数据,它是随机取 5 个值(此值可配置),然后淘汰最久没有使用的那个。

3. LRU 算法缺点

LRU 算法有一个缺点,比如说很久没有使用的一个键值,如果最近被访问了一次,那么它就不会被淘汰,即使它是使用次数最少的缓存,那它也不会被淘汰,因此在 Redis 4.0 之后引入了 LFU 算法,下面我们一起来看。

LFU 算法

LFU 全称是 Least Frequently Used 翻译为最不常用的,最不常用的算法是根据总访问次数来淘汰数据的,它的核心思想是“如果数据过去被访问多次,那么将来被访问的频率也更高”。

LFU 解决了偶尔被访问一次之后,数据就不会被淘汰的问题,相比于 LRU 算法也更合理一些。

在 Redis 中每个对象头中记录着 LFU 的信息,源码如下:

1
2
3
4
5
6
7
8
9
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;

在 Redis 中 LFU 存储分为两部分,16 bit 的 ldt(last decrement time)和 8 bit 的 logc(logistic counter)。

  1. logc 是用来存储访问频次,8 bit 能表示的最大整数值为 255,它的值越小表示使用频率越低,越容易淘汰;
  2. ldt 是用来存储上一次 logc 的更新时间。

小结

通过本文我们了解到,Redis 内存淘汰策略和过期回收策略是完全不同的概念,内存淘汰策略是解决 Redis 运行内存过大的问题的,通过与 maxmemory 比较,决定要不要淘汰数据,根据 maxmemory-policy 参数,决定使用何种淘汰策略,在 Redis 4.0 之后已经有 8 种淘汰策略了,默认的策略是 noeviction 当内存超出时不淘汰任何键值,只是新增操作会报错。

24 消息队列——发布订阅模式

在 Redis 中提供了专门的类型:Publisher(发布者)和 Subscriber(订阅者)来实现消息队列。

在文章开始之前,先来介绍消息队列中有几个基础概念,以便大家更好的理解本文的内容。

首先,发布消息的叫做发布方或发布者,也就是消息的生产者,而接收消息的叫做消息的订阅方或订阅者,也就是消费者,用来处理生产者发布的消息。

image-20240604190138707

除了发布和和订阅者,在消息队列中还有一个重要的概念:channel 意为频道或通道,可以理解为某个消息队列的名称,首先消费者先要订阅某个 channel,然后当生产者把消息发送到这个 channel 中时,消费者就可以正常接收到消息了,如下图所示:

image-20240604190159465

普通订阅与发布

消息队列有两个重要的角色,一个是发送者,另一个就是订阅者,对应的命令如下:

  • 发布消息:publish channel “message”
  • 订阅消息:subscribe channel

下面我们来看具体的命令实现。

订阅消息

1
2
3
4
5
127.0.0.1:6379> subscribe channel #订阅消息channel
Reading messages...
1) "subscribe"
2) "channel"
3) (integer) 1

相关语法:

1
subscribe channel [channel ...]

此命令支持订阅一个或多个频道的命令,也就是说一个订阅者可以订阅多个频道。例如,某个客户端订阅了两个频道 channel 和 channel2,当两个发布者分别推送消息后,订阅者的信息输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
127.0.0.1:6379> subscribe channel channel2 #订阅 channel 和 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2
1) "message"
2) "channel" # 收到 channel 消息
3) "message 1."
1) "message"
2) "channel2" # 收到 channel2 消息
3) "message 2."

可以看出此订阅者可以收到来自两个频道的消息推送。

发送消息

1
2
127.0.0.1:6379> publish channel "hello,redis." #发布消息
(integer) 1

相关语法:

1
publish channel message

最后的返回值表示成功发送给几个订阅方,1 表示成功发给了一个订阅者,这个数字可以是 0~n,这是由订阅者的数量决定的。

例如,当有两个订阅者时,推送的结果为 2,如下代码所示。

订阅者一:

1
2
3
4
5
127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

订阅者二:

1
2
3
4
5
127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

发送消息:

1
2
127.0.0.1:6379> publish channel "message"
(integer) 2

可以看出,此消息已成功发给两个订阅者,结果也变成 2 了。

主题订阅

上面介绍了普通的订阅与发布模式,但如果我要订阅某一个类型的消息就不适用了,例如我要订阅日志类的消息队列,它们的命名都是 logXXX,这个时候就需要使用 Redis 提供的另一个功能 Pattern Subscribe 主题订阅,这种方式可以使用 * 来匹配多个频道,如下图所示:

image-20240604190215916

主题模式的具体实现代码如下,订阅者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> psubscribe log_* #主题订阅 log_*
1) "psubscribe"
2) "log_*"
3) (integer) 1
1) "pmessage"
2) "log_*"
3) "log_user" #接收到频道 log_user 的消息推送
4) "user message."
1) "pmessage"
2) "log_*"
3) "log_sys" #接收到频道 log_sys 的消息推送
4) "sys message."
1) "pmessage"
2) "log_*"
3) "log_db" #接收到频道 log_db 的消息推送
4) "db message"

从上面的运行结果,可以看出使用命令 psubscribe log_* 可以接收到所有频道包含 log_XXX 的消息。

相关语法:

1
psubscribe pattern [pattern ...]

生产者的代码如下:

1
2
3
4
5
6
127.0.0.1:6379> publish log_user "user message."
(integer) 1
127.0.0.1:6379> publish log_sys "sys message."
(integer) 1
127.0.0.1:6379> publish log_db "db message"
(integer) 1

代码实战

下面我们使用 Jedis 实现普通的发布订阅模式和主题订阅的功能。

普通模式

消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 接收并处理消息
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 接收消息,业务处理
System.out.println("频道 " + channel + " 收到消息:" + message);
}
}, "channel");
}

生产者代码如下:

1
2
3
4
5
6
7
8
/**
* 生产者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.publish("channel", "Hello, channel.");
}

发布者和订阅者模式运行:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws InterruptedException {
// 创建一个新线程作为消费者
new Thread(() -> consumer()).start();
// 暂停 0.5s 等待消费者初始化
Thread.sleep(500);
// 生产者发送消息
producer();
}

以上代码运行结果如下:

1
频道 channel 收到消息:Hello, channel.

主题订阅模式

主题订阅模式的生产者的代码是一样,只有消费者的代码是不同的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 主题订阅
*/
public static void pConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 主题订阅
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收消息,业务处理
System.out.println(pattern + " 主题 | 频道 " + channel + " 收到消息:" + message);
}
}, "channel*");
}

主题模式运行代码如下:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws InterruptedException {
// 主题订阅
new Thread(() -> pConsumer()).start();
// 暂停 0.5s 等待消费者初始化
Thread.sleep(500);
// 生产者发送消息
producer();
}

以上代码运行结果如下:

1
channel* 主题 | 频道 channel 收到消息:Hello, channel.

注意事项

发布订阅模式存在以下两个缺点:

  1. 无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;
  2. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。

然而这些缺点在 Redis 5.0 添加了 Stream 类型之后会被彻底的解决。

除了以上缺点外,发布订阅模式还有另一个需要注意问题:当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

小结

本文介绍了消息队列的几个名词,生产者、消费者对应的就是消息的发送者和接收者,也介绍了发布订阅模式的三个命令:

  • subscribe channel 普通订阅
  • publish channel message 消息推送
  • psubscribe pattern 主题订阅

使用它们之后就可以完成单个频道和多个频道的消息收发,但发送与订阅模式也有一些缺点,比如“发后既忘”和不能持久化等问题,然而这些问题会等到 Stream 类型的出现而得到解决,关于更多 Stream 的内容后面文章会详细介绍。

25 消息队列的其他实现方式

在 Redis 5.0 之前消息队列的实现方式有很多种,比较常见的除了我们上文介绍的发布订阅模式,还有两种:List 和 ZSet 的实现方式。

List 和 ZSet 的方式解决了发布订阅模式不能持久化的问题,但这两种方式也有自己的缺点,接下来我们一起来了解一下,先从 List 实现消息队列的方式说起。

List 版消息队列

List 方式是实现消息队列最简单和最直接的方式,它主要是通过 lpush 和 rpop 存入和读取实现消息队列的,如下图所示:

image-20240604190256339

List 使用命令的方式实现消息队列:

1
2
3
4
5
6
7
8
127.0.0.1:6379> lpush mq "hello" #推送消息 hello
(integer) 1
127.0.0.1:6379> lpush mq "msg" #推送消息 msg
(integer) 2
127.0.0.1:6379> rpop mq #接收到消息 hello
"hello"
127.0.0.1:6379> rpop mq #接收到消息 msg
"mq"

其中,mq 就相当于频道名称 channel,而 lpush 用于生产消息, rpop 拉取消息。

代码实现

接下来我们用 Java 代码的方式来实现 List 形式的消息队列,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import redis.clients.jedis.Jedis;

public class ListMQExample {
public static void main(String[] args){
// 消费者
new Thread(() -> consumer()).start();
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
}
/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消费消息
while (true) {
// 获取消息
String msg = jedis.rpop("mq");
if (msg != null) {
// 接收到了消息
System.out.println("接收到消息:" + msg);
}
}
}
}

以上程序的运行结果是:

1
接收到消息:Hello, List.

我们使用无限循环来获取队列中的数据,这样就可以实时地获取相关信息了,但这样会带来另一个新的问题,当队列中如果没有数据的情况下,无限循环会一直消耗系统的资源,这时候我们可以使用 brpop 替代 rpop 来完美解决这个问题。

b 是 blocking 的缩写,表示阻塞读,也就是当队列没有数据时,它会进入休眠状态,当有数据进入队列之后,它才会“苏醒”过来执行读取任务,这样就可以解决 while 循环一直执行消耗系统资源的问题了,改良版代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import redis.clients.jedis.Jedis;

public class ListMQExample {
public static void main(String[] args) throws InterruptedException {
// 消费者 改良版
new Thread(() -> bConsumer()).start();
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
Thread.sleep(1000);
jedis.lpush("mq", "message 2.");
Thread.sleep(2000);
jedis.lpush("mq", "message 3.");
}
/**
* 消费者(阻塞版)
*/
public static void bConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
while (true) {
// 阻塞读
for (String item : jedis.brpop(0,"mq")) {
// 读取到相关数据,进行业务处理
System.out.println(item);
}
}
}
}

其中,brpop() 方法的第一个参数是设置超时时间的,设置 0 表示一直阻塞。

优缺点分析

List 优点:

  • 消息可以被持久化,借助 Redis 本身的持久化(AOF、RDB 或者是混合持久化),可以有效的保存数据;
  • 消费者可以积压消息,不会因为客户端的消息过多而被强行断开。

List 缺点:

  • 消息不能被重复消费,一个消息消费完就会被删除;
  • 没有主题订阅的功能。

ZSet 版消息队列

ZSet 版消息队列相比于之前的两种方式,List 和发布订阅方式在实现上要复杂一些,但 ZSet 因为多了一个 score(分值)属性,从而使它具备更多的功能,例如我们可以用它来存储时间戳,以此来实现延迟消息队列等。

它的实现思路和 List 相同也是利用 zadd 和 zrangebyscore 来实现存入和读取,这里就不重复叙述了,读者可以根据 List 的实现方式来实践一下,看能不能实现相应的功能,如果写不出来也没关系,本课程的后面章节,介绍延迟队列的时候会用 ZSet 来实现。

优缺点分析

ZSet 优点:

  • 支持消息持久化;
  • 相比于 List 查询更方便,ZSet 可以利用 score 属性很方便的完成检索,而 List 则需要遍历整个元素才能检索到某个值。

ZSet 缺点:

  • ZSet 不能存储相同元素的值,也就是如果有消息是重复的,那么只能插入一条信息在有序集合中;
  • ZSet 是根据 score 值排序的,不能像 List 一样,按照插入顺序来排序;
  • ZSet 没有向 List 的 brpop 那样的阻塞弹出的功能。

小结

本文介绍了消息队列的另外两种实现方式 List 和 ZSet,它们都是利用自身方法,先把数据放到队列里,在使用无限循环读取队列中的消息,以实现消息队列的功能,相比发布订阅模式本文介绍的这两种方式的优势是支持持久化,当然它们各自都存在一些问题,所以期待下一课时 Stream 的出现能够解决这些问题。

26 消息队列终极解决方案——Stream(上)

基础使用

Stream 既然是一个数据类型,那么和其他数据类型相似,它也有一些自己的操作方法,例如:

  • xadd 添加消息;
  • xlen 查询消息长度;
  • xdel 根据消息 ID 删除消息;
  • del 删除整个 Stream;
  • xrange 读取区间消息
  • xread 读取某个消息之后的消息。

具体使用如下所述。

添加消息

1
2
127.0.0.1:6379> xadd key * name redis age 10
"1580880750844-0" #结果返回的是消息 id

其中 * 表示使用 Redis 的规则:时间戳 + 序号的方式自动生成 ID,用户也可以自己指定 ID。

相关语法:

1
xadd key ID field string [field string ...]

查询消息的长度

1
2
127.0.0.1:6379> xlen key
(integer) 1

相关语法:

1
xlen key

删除消息

1
2
3
4
5
6
7
8
127.0.0.1:6379> xadd key * name redis
"1580881585129-0" #消息 ID
127.0.0.1:6379> xlen key
(integer) 1
127.0.0.1:6379> xdel key 1580881585129-0 #删除消息,根据 ID
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

相关语法:

1
xdel key ID [ID ...]

此命令支持删除一条或多条消息,根据消息 ID。

删除整个 Stream

1
2
3
4
127.0.0.1:6379> del key #删除整个 Stream
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

相关语法:

1
del key [key ...]

此命令支持删除一个或多个 Stream。

查询区间消息

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> xrange mq - +
1) 1) "1580882060464-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"
2) 1) "1580882071524-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

其中:- 表示第一条消息,+ 表示最后一条消息。

相关语法:

1
xrange key start end [COUNT count]

查询某个消息之后的消息

1
2
3
4
5
6
7
127.0.0.1:6379> xread count 1 streams mq 1580882060464-0
1) 1) "mq"
2) 1) 1) "1580882071524-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

在名称为 mq 的 Stream 中,从消息 ID 为 1580882060464-0 的,往后查询一条消息。

相关语法:

1
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

此命令提供了阻塞读的参数 block,我们可以使用它读取从当前数据以后新增数据,命令如下:

1
127.0.0.1:6379> xread count 1 block 0 streams mq $

其中 block 0 表示一直阻塞,$ 表示从最后开始读取,这个时候新开一个命令行插入一条数据,此命令展示的结果如下:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> xadd mq * name sql age 20 #新窗口添加数据
"1580890737890-0"
#阻塞读取到的新数据
127.0.0.1:6379> xread count 1 block 0 streams mq $
1) 1) "mq"
2) 1) 1) "1580890737890-0"
2) 1) "name"
2) "sql"
3) "age"
4) "20"
(36.37s)

基础版消息队列

使用 Stream 消费分组实现消息队列的功能和列表方式的消息队列比较相似,使用 xadd 命令和 xread 循环读取就可以实现基础版的消息队列,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamExample {
public static void main(String[] args) throws InterruptedException {
// 消费者
new Thread(() -> consumer()).start();
Thread.sleep(1000);
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
Map<String, String> map = new HashMap<>();
map.put("name", "redis");
map.put("age", "10");
// 添加消息
StreamEntryID id = jedis.xadd("mq", null, map);
System.out.println("消息添加成功 ID:" + id);
}
/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消费消息
while (true) {
// 获取消息,new StreamEntryID().LAST_ENTRY 标识获取当前时间以后的新增消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>("mq",
new StreamEntryID().LAST_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xread(1, 120 * 1000, entry);
if (list.size() == 1) {
// 读取到消息
System.out.println("读取到消息 ID:" + list.get(0).getValue().get(0).getID());
// 使用 Gson 来打印 JSON 格式的消息内容
System.out.println("内容:" + new Gson().toJson(list.get(0).getValue().get(0).getFields()));
}
}
}
}

以上代码运行结果如下:

1
2
3
消息添加成功 ID:1580895735148-0
读取到消息 ID:1580895735148-0
内容:{"name":"redis","age":"10"}

以上代码需要特殊说明的是,我们使用 new StreamEntryID().LAST_ENTRY 来实现读取当前时间以后新增的消息,如果要从头读取历史消息把这行代码中的 .LAST_ENTRY 去掉即可。

还有一点需要注意,在 Jedis 框架中如果使用 jedis.xread() 方法来阻塞读取消息队列,第二个参数 long block 必须设置大于 0,如果设置小于 0,此阻塞条件就无效了,我查看了 jedis 的源码发现,它只有判断在大于 0 的时候才会设置阻塞属性,源码如下:

1
2
3
4
if (block > 0L) {
params[streamsIndex++] = Keyword.BLOCK.raw;
params[streamsIndex++] = Protocol.toByteArray(block);
}

所以 block 属性我们可以设置一个比较大的值来阻塞读取消息。

所谓的阻塞读取消息指的是当队列中没有数据时会进入休眠模式,等有数据之后才会唤醒继续执行。

小结

本文介绍了 Stream 的基础方法,并使用 xadd 存入消息和 xread 循环阻塞读取消息的方式实现了简易版的消息队列,交互流程如下图所示:

image-20240604192242356

27 消息队列终极解决方案——Stream(下)

在开始使用消息分组之前,我们必须手动创建分组才行,以下是几个和 Stream 分组有关的命令,我们先来学习一下它的使用。

消息分组命令

创建消费者群组

1
2
127.0.0.1:6379> xgroup create mq group1 0-0 
OK

相关语法:

1
xgroup create stream-key group-key ID

其中:

  • mq 为 Stream 的 key;
  • group1 为分组的名称;
  • 0-0 表示从第一条消息开始读取。

如果要从当前最后一条消息向后读取,使用 $ 即可,命令如下:

1
2
127.0.0.1:6379> xgroup create mq group2 $
OK

读取消息

1
2
3
4
5
6
7
127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq >
1) 1) "mq"
2) 1) 1) "1580959593553-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"

相关语法:

1
xreadgroup group group-key consumer-key streams stream-key

其中:

  • > 表示读取下一条消息;
  • group1 表示分组名称;
  • c1 表示 consumer(消费者)名称。

xreadgroup 命令和 xread 使用类似,也可以设置阻塞读取,命令如下:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
1) 1) "mq"
2) 1) 1) "1580959606181-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
(nil) #队列中的消息已经被读取完
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > #阻塞读取

此时打开另一个命令行创建使用 xadd 添加一条消息,阻塞命令执行结果如下:

1
2
3
4
5
6
7
8
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq >
1) 1) "mq"
2) 1) 1) "1580961475368-0"
2) 1) "name"
2) "sql"
3) "age"
4) "20"
(86.14s)

消息消费确认

接收到消息之后,我们要手动确认一下(ack),命令如下:

1
2
127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

相关语法:

1
xack key group-key ID [ID ...]

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:

image-20240604192818740

查询未确认的消费队列

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> xpending mq group1
1) (integer) 1 #未确认(ack)的消息数量为 1 条
2) "1580994063971-0"
3) "1580994063971-0"
4) 1) 1) "c1"
2) "1"
127.0.0.1:6379> xack mq group1 1580994063971-0 #消费确认
(integer) 1
127.0.0.1:6379> xpending mq group1
1) (integer) 0 #没有未确认的消息
2) (nil)
3) (nil)
4) (nil)

xinfo 查询相关命令

1. 查询流信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
127.0.0.1:6379> xinfo stream mq
1) "length"
2) (integer) 2 #队列中有两个消息
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1 #一个消费分组
9) "last-generated-id"
10) "1580959606181-0"
11) "first-entry"
12) 1) "1580959593553-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"
13) "last-entry"
14) 1) "1580959606181-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

相关语法:

1
xinfo stream stream-key

2. 查询消费组消息

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
2) "group1" #消息分组名称
3) "consumers"
4) (integer) 1 #一个消费者客户端
5) "pending"
6) (integer) 1 #一个未确认消息
7) "last-delivered-id"
8) "1580959593553-0" #读取的最后一条消息 ID

相关语法:

1
xinfo groups stream-key

3. 查看消费者组成员信息

1
2
3
4
5
6
7
127.0.0.1:6379> xinfo consumers mq group1
1) 1) "name"
2) "c1" #消费者名称
3) "pending"
4) (integer) 0 #未确认消息
5) "idle"
6) (integer) 481855

相关语法:

1
xinfo consumers stream group-key

删除消费者

1
2
127.0.0.1:6379> xgroup delconsumer mq group1 c1
(integer) 1

相关语法:

1
xgroup delconsumer stream-key group-key consumer-key

删除消费组

1
2
127.0.0.1:6379> xgroup destroy mq group1
(integer) 1

相关语法:

1
xgroup destroy stream-key group-key

代码实战

接下来我们使用 Jedis 来实现 Stream 分组消息队列,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamGroupExample {
private static final String _STREAM_KEY = "mq"; // 流 key
private static final String _GROUP_NAME = "g1"; // 分组名称
private static final String _CONSUMER_NAME = "c1"; // 消费者 1 的名称
private static final String _CONSUMER2_NAME = "c2"; // 消费者 2 的名称
public static void main(String[] args) {
// 生产者
producer();
// 创建消费组
createGroup(_STREAM_KEY, _GROUP_NAME);
// 消费者 1
new Thread(() -> consumer()).start();
// 消费者 2
new Thread(() -> consumer2()).start();
}
/**
* 创建消费分组
* @param stream 流 key
* @param groupName 分组名称
*/
public static void createGroup(String stream, String groupName) {
Jedis jedis = JedisUtils.getJedis();
jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
}
/**
* 生产者
*/
public static void producer() {
Jedis jedis = JedisUtils.getJedis();
// 添加消息 1
Map<String, String> map = new HashMap<>();
map.put("data", "redis");
StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
System.out.println("消息添加成功 ID:" + id);
// 添加消息 2
Map<String, String> map2 = new HashMap<>();
map2.put("data", "java");
StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
System.out.println("消息添加成功 ID:" + id2);
}
/**
* 消费者 1
*/
public static void consumer() {
Jedis jedis = JedisUtils.getJedis();
// 消费消息
while (true) {
// 读取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 读取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
System.out.println("Consumer 1 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 内容:" + new Gson().toJson(content));
}
}
}
/**
* 消费者 2
*/
public static void consumer2() {
Jedis jedis = JedisUtils.getJedis();
// 消费消息
while (true) {
// 读取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 读取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
System.out.println("Consumer 2 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 内容:" + new Gson().toJson(content));
}
}
}
}

以上代码运行结果如下:

1
2
3
4
消息添加成功 ID:1580971482344-0
消息添加成功 ID:1580971482415-0
Consumer 1 读取到消息 ID:1580971482344-0 内容:{"data":"redis"}
Consumer 2 读取到消息 ID:1580971482415-0 内容:{"data":"java"}

其中,jedis.xreadGroup() 方法的第五个参数 noAck 表示是否自动确认消息,如果设置 true 收到消息会自动确认(ack)消息,否则则需要手动确认。

注意:Jedis 框架要使用最新版,低版本 block 设置大于 0 时,会有 bug 抛连接超时异常。

可以看出,同一个分组内的多个 consumer 会读取到不同消息,不同的 consumer 不会读取到分组内的同一条消息。

小结

本文我们介绍了 Stream 分组的相关知识,使用 Jedis 的 xreadGroup() 方法实现了消息的阻塞读取,并且使用此方法自带 noAck 参数,实现了消息的自动确认,通过本文我们也知道了,一个分组内的多个 consumer 会轮询收到消息队列的消息,并且不会出现一个消息被多个 consumer 读取的情况。

Redis默认的数据逐出策略是什么?