在开发项目中经常使用限流算法,这里将常用限流算法进行总结。

计数器限流

在一段时间间隔内,处理请求的最大数量固定,超过部分不做处理。

其原理是:通过维护一个单位时间内的计数值,每当一个请求通过时,就将计数值加1,当计数值超过预先设定的阈值时,就拒绝单位时间内的其他请求。如果单位时间已经结束,则将计数器清零,开启下一轮的计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.Random;

public class Counter {

//时间窗口
private final int interval = 1000;

//时间窗口内的阈值
private final int limit = 5;

private long lastTime = System.currentTimeMillis();

private int counter = 0;

public boolean tryAcquire() {

if (System.currentTimeMillis() < lastTime + interval) {
// 在时间窗口内
counter++;
} else {
//超过时间窗口充值重置counter
lastTime = System.currentTimeMillis();
counter = 1;
}
return counter <= limit;
}


public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
while (true) {
if (counter.tryAcquire()) {
System.out.println("进行请求");
} else {
System.out.println("限流了。。。。");
}
Thread.sleep(100 * new Random().nextInt(5));
}

}
}

但是,计数器存在临界值的问题:

假设系统的限流规则设定为“1秒内最多允许100个请求”,如果用户在第一个时间窗口的最后几毫秒发送了 99 个请求,然后紧接着在下一个时间窗口的开始发送了 99 个请求,那么用户在短时间内(接近 1 秒)发送了 198 个请求,但由于这些请求跨越了两个不同的时间窗口,限流系统并不会阻止这种情况。

滑动时间窗口算法

滑动时间窗口算法就是为了解决上述固定时间窗口存在的临界值问题而诞生。要解决这种临界值问题,显然只用一个窗口是解决不了问题的。假设我们仍然设定1秒内允许通过的请求是200个,但是在这里我们需要把1秒的时间分成多格,假设分成5格(格数越多,流量过渡越平滑),每格窗口的时间大小是200毫秒,每过200毫秒,就将窗口向前移动一格。为了便于理解,可以看下图

image-20240928195740966

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import java.util.LinkedList;
import java.util.Random;

public class MovingWindow {

//时间窗口/ms
private final int interval = 1000;

//时间窗口内的阈值
private final int limit = 5;

//分割窗口个数
private int slotCount = 5;

private LinkedList<Node> slot = new LinkedList<Node>();

public MovingWindow() {
new Thread(() -> {
while (true) {
// 每过200毫秒,就将窗口向前移动一格
if (slot.size() == slotCount) {
slot.poll();
}
slot.offer(new Node(System.currentTimeMillis()));
try {
Thread.sleep(interval / slotCount);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}

public boolean tryAcquire() {
Node currWindow = getCurrWindow();
currWindow.setCount(currWindow.getCount() + 1);
return getCounter() <= limit;
}

private int getCounter() {
return slot.stream().mapToInt(Node::getCount).sum();
}

private Node getCurrWindow() {
if (slot.isEmpty()) {
while (true) {
if (slot.isEmpty()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else break;
}
}
return slot.getLast();
}


private class Node {

private int count;

private long time;

public Node(long time) {
this.time = time;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public long getTime() {
return time;
}

public void setTime(long time) {
this.time = time;
}
}


public static void main(String[] args) throws InterruptedException {
MovingWindow counter = new MovingWindow();
while (true) {
counter.slot.stream().forEach(node -> System.out.print(node.getTime() + ":" + node.getCount() + "|"));
if (counter.tryAcquire()) {
System.out.println("进行请求");
} else {
System.out.println("限流了。。。。");
}
Thread.sleep(100 * new Random().nextInt(5));
}
}
}

临界值问题

假设我们有一个 API 接口,限流规则是每分钟最多处理 100 个请求,并采用滑动时间窗口来进行限流控制。

  • 滑动时间窗口:统计过去 60 秒内的请求数,允许每分钟最多处理 100 个请求。
  • 时间窗口滑动频率:每秒滑动一次,实时监控过去 60 秒内的请求数量。

临界值问题的具体示例:

  1. 第 1 秒:用户发送了 100 个请求,这 100 个请求符合每分钟 100 个请求的限流规则。
  2. 第 59 秒:用户再次发送 100 个请求。由于滑动窗口统计的是过去 60 秒内的请求,因此这 100 个请求只会覆盖第 1 秒到第 59 秒内的请求。
  3. 第 60 秒:滑动窗口滑动,此时统计的是第 2 秒到第 60 秒的请求,因此用户可以再次发送 100 个请求。

临界点效应:

  • 用户在第 1 秒第 59 秒分别发送了 100 个请求,滑动时间窗口的总计是 200 个请求,但由于它们处于不同的统计时间窗口内,系统不会触发限流。
  • 因此,尽管限流规则是每分钟 100 个请求,但在临界点的边界时刻(窗口的开头和结尾),用户可以在短短 2 秒钟内发送200 个请求

image-20240928201150945

在滑动窗口的边界时刻(第 59 秒和第 60 秒),短时间内可以发送 200 个请求,而不会违反限流规则。

漏桶限流

漏桶大小固定,处理速度固定,但请求进入速度不固定(在突发情况请求过多时,会丢弃过多的请求)。

原理

漏桶算法以一个常量限制了出口流量速率,因此漏桶算法可以平滑突发的流量。其中漏桶作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,超出的流量会被丢弃。

下图比较形象的说明了漏桶算法的原理,其中水滴是入口流量,漏桶是流量容器,匀速流出的水是出口流量。

image-20240928201224643

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

public class Funnel {

//出口流量速率 1s 10个
private int rate = 10;

//漏桶
private ArrayBlockingQueue bucket;

public Funnel(int rate, int capacity) {
this.rate = rate;
this.bucket = new ArrayBlockingQueue(capacity);
int speed = 1000 / this.rate;
//固定速率滴水
new Thread(() -> {
while (true) {
bucket.poll();
try {
Thread.sleep(speed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

public boolean tryAcquire() {
// 漏桶里面放水
return bucket.offer(this);
}

public static void main(String[] args) throws InterruptedException {
Funnel funnel = new Funnel(10, 100);
while (true) {
if (funnel.tryAcquire()) {
System.out.println("进行请求");
} else {
System.out.println("限流了。。。。");
}
Thread.sleep(20 * new Random().nextInt(5));
}
}

}

因为漏桶算法的流出速率是固定的,所以漏桶算法不支持出现突发流出流量。但是在实际情况下,流量往往是突发的。

令牌桶限流

令牌桶的大小固定,令牌的产生速度固定,但是消耗令牌(即请求)速度不固定(可以应对一些某些时间请求过多的情况);每个请求都会从令牌桶中取出令牌,如果没有令牌则丢弃该次请求。

原理:

令牌桶算法是如何支持突发流量的呢?最开始,令牌桶是空的,我们以恒定速率往令牌桶里加入令牌,令牌桶被装满时,多余的令牌会被丢弃。当请求到来时,会先尝试从令牌桶获取令牌(相当于从令牌桶移除一个令牌),获取成功则请求被放行,获取失败则阻塞或拒绝请求。那么当突发流量来临时,只要令牌桶有足够的令牌,就不会被限流。

主要思想如下:

系统按照设定的速率向“桶”里添加令牌。
每次请求都需要从桶里取走一个令牌,只有当桶里有足够的令牌时,才允许请求通过。
如果桶里没有足够的令牌,请求会被拒绝或者阻塞等待。
桶的容量是有限的,当令牌生成的速度超过请求消耗的速度时,桶会装满,多余的令牌会被丢弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TokenBucketRateLimiter {

private final int capacity; // 令牌桶容量
private final int rate; // 令牌生成速率(每秒生成多少个令牌)
private AtomicInteger tokens; // 当前令牌数量
private long lastRefillTime; // 上次填充令牌的时间
private final Lock lock = new ReentrantLock(); // 锁,用于线程安全

public TokenBucketRateLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = new AtomicInteger(capacity);
this.lastRefillTime = System.currentTimeMillis();
}

public boolean tryAcquire() {
lock.lock();
try {
refillTokens();
if (tokens.get() > 0) {
tokens.decrementAndGet();
return true;
} else {
return false;
}
} finally {
lock.unlock();
}
}

private void refillTokens() {
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastRefillTime;
int newTokens = (int) (elapsedTime * rate / 1000);

if (newTokens > 0) {
tokens.set(Math.min(capacity, tokens.get() + newTokens));
lastRefillTime = currentTime;
}
}

public static void main(String[] args) {
TokenBucketRateLimiter rateLimiter = new TokenBucketRateLimiter(10, 5); // 令牌桶容量10,每秒生成5个令牌

for (int i = 0; i < 20; i++) {
if (rateLimiter.tryAcquire()) {
System.out.println("Request " + (i + 1) + " allowed");
} else {
System.out.println("Request " + (i + 1) + " rejected");
}

try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟请求间隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

之前在校园论坛的项目中实现的。

image-20240928201902230

Guava RateLimiter也有提供~

Guava 是由 Google 开发并维护的一个开源的 Java 工具库,提供了大量常用的工具类和实用功能,帮助开发者简化代码编写,提升代码质量。Guava 是许多 Java 开发者的首选工具库之一,广泛应用于各种 Java 项目中。

总结

限流算法 原理 适用场景 优点 缺点
漏桶算法 请求进入漏桶,以固定速率流出,超出桶容量的请求被丢弃。 带宽管理网络流量整形:保证流量平稳,防止短时间内的突发流量压垮系统。 处理流量平稳,防止突发流量造成系统过载。 可能导致请求被丢弃,无法灵活处理突发流量。
令牌桶算法 请求需要获取令牌才能处理,令牌按固定速率生成,超出令牌桶容量的请求被丢弃。 API 限流请求限流:适用于允许突发流量的场景,如接口调用的限流。 支持突发流量,能够灵活处理短时间内的流量高峰。 令牌速率过低可能导致处理延迟,速率过高无法控制流量。
计数器算法 在固定时间窗口内记录请求数,超出限制的请求被拒绝。 简单 API 限流:如每日限额、每分钟限额等,适用于对流量限制较为简单的场景。 实现简单,逻辑清晰,适合短时间内的流量限制。 不能处理突发流量,流量可能集中在时间窗的边界。
滑动窗口计数 通过滑动时间窗口记录请求数,超出限制的请求被拒绝。 较灵活的限流:适用于需要平滑流量限制的场景,如需要更精细化的流量控制。 处理突发流量更加平滑,减少时间窗边界问题。 实现复杂度相对较高,计算量较大。
限速队列 通过将请求排队并延迟处理,控制请求的处理速率。 高并发系统:适用于需要控制请求速率的场景,如防止系统被瞬时高并发流量冲击。 易于实现,能够平滑处理请求,防止系统被瞬时流量压垮。 可能导致延迟过高,影响用户体验。
漏斗算法 通过设定定时“漏水”速率,限制单位时间内处理的请求数。 带宽控制流量整形:用于防止流量瞬时过大冲击系统。 易于实现,能够平稳处理请求,避免系统过载。 可能导致请求延迟或丢弃,无法应对突发流量。

适用场景

  1. 漏桶算法:适合网络流量整形带宽管理,可以平滑处理流量,防止突发流量对系统的冲击,常用于需要稳定流量的场景,如对某些服务的总带宽进行限制。
  2. 令牌桶算法:适合API 限流允许突发流量的系统,如某些系统允许短时间内处理大量请求,但超过一定限制后开始拒绝请求。典型场景包括开放 API 接口、短信发送接口等。
  3. 计数器算法:适合简单的 API 限流,如每分钟、每天的请求限额。实现较为简单,适用于流量控制需求不复杂的场景,如每天只允许用户访问某接口 100 次。
  4. 滑动窗口计数:适合需要平滑控制流量的场景,能够减少时间窗口的边界效应,适用于需要更精准速率控制的系统,如交易系统中控制每秒最多处理多少笔交易。
  5. 限速队列:适合高并发场景下的请求排队,如商品秒杀活动,能够通过排队机制避免高并发请求直接冲击后端服务,防止系统崩溃。
  6. 漏斗算法:用于带宽控制流量整形,适合需要对输入流量进行整形和调度的场景,如限制视频流的带宽,防止瞬时流量过大影响系统性能。

本文基于限流算法(计数器、滑动时间窗口、漏斗、令牌)原理以及代码实现

实战

前言

之前,在在线教育平台-快乐学习的项目中,有个课程搜索的功能,之前都通过ES进行关键词的搜索,但是对自适应的对搜索的热词限流,限流使用Redission的RateLimiter实现分布式限流。

实现思路

  1. Elasticsearch 搜索:关键词搜索交给 Elasticsearch 处理。
  2. 热词限流:通过 Redisson 的 RateLimiter 对每个搜索关键词进行限流。每个关键词的搜索请求速率超过预设值时,拒绝请求。
  3. 分布式限流:Redisson 的 RateLimiter 是基于 Redis 实现的分布式限流器,确保多个节点共享同一个限流状态。

技术选型

优点 描述 与其他限流机制的比较
分布式限流 Redisson 的 RateLimiter 基于 Redis,支持跨多个实例共享限流状态,确保在分布式系统中的一致性。 本地限流(如基于 JVM 内存的限流)仅适用于单实例,无法跨实例共享限流状态,容易导致限流不一致。
高可用性和持久性 Redis 的持久化和高可用特性确保限流状态在系统重启或故障后依然有效。 基于内存的限流无法持久化,系统重启后限流状态会丢失;其他限流方案(如 Nginx 限流)需要额外配置持久化机制。
灵活的速率控制 支持令牌桶算法,可以灵活设置请求频率,允许短时间内的突发请求,同时控制整体速率。 单纯的计数器限流无法处理突发流量,可能导致不必要的请求拒绝;其他限流方案(如 Guava RateLimiter)也支持令牌桶,但仅支持单机。
线程安全的分布式限流 通过 Redis 的原子操作保证在高并发下限流的线程安全性,确保多个实例中的限流状态一致。 本地限流工具在高并发下可能会出现竞态条件,导致限流不准确;而基于 Redis 的 Redisson 确保线程安全。
可扩展性 Redis 天然具有良好的扩展性,可通过集群扩展限流能力,适应业务增长和更高的并发量。 本地限流方案受限于单机资源,扩展性差;Nginx 等网关限流方案虽然支持分布式,但增加了配置复杂性和运维成本。
防止缓存层过载 通过限流控制请求频率,保护缓存层和数据库资源,避免高并发情况下缓存层的过载。 单纯依赖缓存(如 Redis)无法控制请求频率,缓存层本身可能成为瓶颈;RateLimiter 能有效减轻缓存层的压力。
防止恶意请求和资源滥用 限流机制能够有效防止爬虫、DDoS 等恶意请求,保护系统资源。 缓存、计数器等机制无法识别恶意请求,容易导致系统资源被滥用;而 Redisson 的限流能够有效应对这种情况。
易于集成和使用 Redisson 提供简单易用的 API,开发者可以快速将限流功能集成到业务代码中。 其他分布式限流方案(如自定义 Redis 限流)需要开发复杂的限流逻辑,而 Redisson 提供了开箱即用的限流功能,降低了开发和维护成本。
支持多种限流场景 支持多种限流粒度,如按用户、IP、全局等灵活配置限流策略,适应不同业务需求。 其他限流方案通常只能支持单一限流粒度,如按 IP 或按全局限流,缺乏灵活性。
与 Redis 生态系统无缝兼容 可以与 Redis 的其他功能(如缓存、分布式锁)结合使用,形成完整的分布式解决方案。 基于 Nginx 或其他网关的限流方案与 Redis 无法直接集成,需额外开发分布式功能;而 Redisson 与 Redis 的无缝集成,提供了更多的分布式扩展能力。

需求类比:

多次输错密码之后如何限制用户在规定时间内禁止再次登入.

当然,还有很多相同的需求捏

技术选型

Redis⽀持的Java客户端:Redisson、Jedis、lettuce …

  • 单机限流Guava RateLimiterResilience4jBucket4j,简单易用,适合单节点场景。
  • 分布式限流Redis + LuaNginx + Lua + RedisEnvoy,适合高并发、大规模分布式环境。
  • 网关层限流NginxEnvoy,适合在网关层进行流量控制,减少后端负载。

选用RateLimter原因

优势 描述 解释
简洁性 Redisson 的 RateLimiter 提供了简单易用的 API,减少了手动编写复杂限流逻辑的需求。 无需编写 Lua 脚本或管理 Redis 连接,API 直接支持分布式限流,开发效率高。
分布式支持 原生支持 Redis 集群,轻松实现全局分布式限流。 利用 Redis 的分布式特性,RateLimiter 能够保证在多个实例间共享限流规则,适合高并发分布式环境。
高性能和可扩展性 基于 Redis 的高性能存储,能够在高并发环境下快速响应,并天然支持水平扩展。 Redis 本身是高性能的内存存储引擎,Redisson 利用这一优势,能够轻松应对大规模用户请求。
原子操作保障 内置的原子性操作,保证限流操作的正确性与一致性。 无需手动处理复杂的分布式锁或竞态条件,Redisson 通过 Redis 实现了线程安全和原子性,避免了常见的限流边界问题。
无运维负担 使用 Redisson 只需依赖 Redis,减少了对其他组件(如 Nginx、Envoy)的运维投入。 既不需要额外配置 Lua 脚本,也不需要引入 Nginx 或 Envoy 等额外的运维复杂度,只要 Redis 可用,Redisson 的限流机制就能正常工作。
成熟的解决方案 Redisson 是一个成熟、经过充分测试的分布式框架。 相比手动编写 Lua 脚本或自定义方案,Redisson 提供的是经过社区广泛使用和测试的限流机制,减少了出错的可能性,可靠性更高。
可维护性 Redisson 提供了清晰的 API 和完善的文档,降低了维护成本。 相比手动维护 Lua 脚本或复杂的限流逻辑,Redisson 的代码易读易维护,长期维护成本低。

实现

关键步骤

  1. 初始化 Redisson 客户端
  2. 为每个关键词设置限流规则(通过 RateLimiter)。
  3. 在搜索请求中检查限流,如果超出限流则拒绝请求,否则通过 Elasticsearch 进行搜索。

1.引入依赖

首先,确保项目中引入了 Redisson 和 Elasticsearch 的相关依赖。

Maven 依赖:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2</version> <!-- 根据你的 ES 版本选择对应的依赖版本 -->
</dependency>

2. Redisson 配置

首先初始化 Redisson 客户端,用于后续的限流操作。

RedissonConfig.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonConfig {

public RedissonClient getRedissonClient() {
// 配置 Redis 连接
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");

// 创建 Redisson 客户端
return Redisson.create(config);
}
}

3. 实现搜索限流逻辑

使用 Redisson RateLimiter 对每个关键词进行限流,超出限流阈值则拒绝请求。

固定速率模式

SearchRateLimiter.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateLimiter;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;

public class SearchRateLimiter {

private RedissonClient redissonClient;

public SearchRateLimiter(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

/**
* 获取关键词对应的 RateLimiter
* @param keyword 搜索关键词
* @return RateLimiter
*/
private RateLimiter getRateLimiter(String keyword) {
// 获取或创建一个关键词的限流器
RateLimiter rateLimiter = redissonClient.getRateLimiter("search:keyword:" + keyword);

// 设置限流规则:每 60 秒内最多允许 10 次请求
rateLimiter.trySetRate(RateType.OVERALL, 10, 60, RateIntervalUnit.SECONDS);

return rateLimiter;
}

/**
* 尝试请求某个关键词的搜索
* @param keyword 搜索关键词
* @return 是否允许搜索
*/
public boolean trySearch(String keyword) {
RateLimiter rateLimiter = getRateLimiter(keyword);

// 尝试获取一个许可,如果失败则表示达到限流
return rateLimiter.tryAcquire();
}
}
  • getRateLimiter 方法:为每个关键词创建或获取一个 RateLimiter 对象。它使用 Redis 来存储和管理令牌桶。
  • trySetRate 方法:设置限流规则,确保每个关键词在 60 秒内最多能有 10 次请求

客户端模式

客户端级限流模式 中,每个客户端(或服务实例)都有各自独立的限流器。也就是说,不同客户端的请求不会相互影响,每个客户端都有自己的令牌桶,适合针对不同用户、租户、IP 地址等维度进行限流的场景。

场景:

  • IP 限流:对每个 IP 进行单独限流,防止某个 IP 短时间内发送过多请求。
  • 用户级限流:对每个用户单独限流,避免某个用户滥用系统资源。
  • 租户级限流:为每个租户分配独立的限流器,防止某个租户的流量影响其他租户的服务质量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateLimiter;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;

public class SearchRateLimiter {

private RedissonClient redissonClient;

public SearchRateLimiter(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

/**
* 获取关键词对应的 RateLimiter,为每个客户端(用户)单独设置速率
* @param keyword 搜索关键词
* @param userId 用户唯一标识符
* @return RateLimiter
*/
private RateLimiter getRateLimiter(String keyword, String userId) {
// 获取或创建一个关键词的限流器,基于用户ID
RateLimiter rateLimiter = redissonClient.getRateLimiter("search:keyword:" + keyword + ":user:" + userId);

// 设置限流规则:每个用户每 60 秒内最多允许 10 次请求
rateLimiter.trySetRate(RateType.PER_CLIENT, 10, 60, RateIntervalUnit.SECONDS);

return rateLimiter;
}

/**
* 尝试请求某个关键词的搜索
* @param keyword 搜索关键词
* @return 是否允许搜索
*/
public boolean trySearch(String keyword) {
String userId = UserContextHolder.getUserId(); // 获取当前用户ID
if (userId == null) {
throw new IllegalStateException("用户ID不能为空");
}

RateLimiter rateLimiter = getRateLimiter(keyword, userId);

// 尝试获取一个许可,如果失败则表示达到限流
return rateLimiter.tryAcquire();
}
}

RateLimiter 的创建只会在首次调用时创建,后续的调用会复用已经创建的 RateLimiter 实例。这是因为 Redisson 的 getRateLimiter() 方法会根据传入的 key(如 "search:keyword:" + keyword + ":user:" + userId)获取对应的 RateLimiter,如果这个 RateLimiter 已经存在于 Redis 中,则不会重新创建,而是获取已存在的实例。

客户端级限流模式(RateType.PER_CLIENT):每个客户端(或用户、IP)都有独立的限流器,不同客户端之间互不影响,适合需要按客户端维度进行限流的应用场景。

优化1:分层限流

可以根据用户类型或优先级进行分层限流,将有限的系统资源优先分配给重要的用户或业务:

  • VIP 用户:对 VIP 用户的搜索请求设置更宽松的限流策略,确保他们在流量高峰时依然能够优先使用服务。
  • 普通用户:对普通用户采取较严格的限流策略,在流量激增时优先对普通用户进行限流。

优化2:批量获取令牌

在高并发场景下,频繁向 Redis 请求令牌会导致 Redis 的连接和请求压力增加。为了解决这个问题,可以通过批量获取令牌的方式减少 Redis 请求的频率。即,每次从 Redis 获取多个令牌,缓存到本地(例如每次获取 10 个令牌),然后在本地逐步使用这些令牌。

1
2
3
4
5
// 批量获取 10 个令牌
boolean acquired = rateLimiter.tryAcquire(10);
if (acquired) {
// 本地消耗令牌
}

批量获取令牌后,可以在本地维护一个计数器,每次请求时减少本地计数器的值,直到本地缓存的令牌耗尽,再向 Redis 请求一批新的令牌。

优化3:动态调整限流速率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DynamicRateLimiterAdjuster {

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int currentRate = 20;

public void startDynamicAdjustment() {
scheduler.scheduleAtFixedRate(() -> {
// 模拟获取系统的负载信息
double systemLoad = getSystemLoad();

// 根据系统负载动态调整限流速率(例如负载超过 70% 时降低速率)
if (systemLoad > 0.7) {
currentRate = Math.max(5, currentRate - 5); // 降低速率,但最小保持 5
} else if (systemLoad < 0.5) {
currentRate = Math.min(20, currentRate + 5); // 恢复速率,但最大为 20
}

// 调整 RateLimiter 的速率
CourseSearchRateLimiter.rateLimiter.setRate(RateType.OVERALL, currentRate, 1, RateIntervalUnit.SECONDS);
System.out.println("RateLimiter 调整为每秒 " + currentRate + " 个令牌");
}, 0, 10, TimeUnit.SECONDS); // 每 10 秒检查一次系统负载
}

// 模拟系统负载获取方法
public double getSystemLoad() {
// 在实际应用中,可以从监控系统(如 Prometheus)获取系统负载指标
// 这里使用随机数模拟系统负载
return Math.random();
}
}

创建调度任务:通过 ScheduledExecutorService,每隔 10 秒检查一次系统的负载情况(通过 getSystemLoad() 模拟获取)。

动态调整令牌生成速率

  • 负载大于 70% 时,降低令牌生成速率,避免系统过载。每次减少 5 个令牌的速率,最低保证每秒至少 5 个令牌。
  • 负载小于 50% 时,逐步恢复令牌生成速率,每次增加 5 个令牌,最高限制为每秒 20 个令牌。

调整 RateLimiter:根据系统负载的变化,动态修改分布式限流器(RateLimiter)的令牌生成速率。RateLimiter 控制了每秒允许的请求量,因此通过动态调整其速率,可以控制应用的请求处理能力。

模拟系统负载getSystemLoad() 模拟了系统的负载情况,实际应用中可以替换为具体的监控指标(如 CPU 使用率、QPS 等),从而根据系统的真实状态来调整限流策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.redisson.api.RateLimiter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CourseSearchService {

@Autowired
private RedissonClient redissonClient; // RedissonClient 用于访问 Redis

// 定义搜索限流器的名字,可以根据用户的 IP 或 ID 做区分
private static final String RATE_LIMITER_KEY_PREFIX = "search_rate_limiter::";

/**
* 搜索课程的限流功能
* @param userId 用户 ID 或者用户标识
* @param keyword 搜索关键词
* @return 搜索结果
*/
public SearchResult searchCoursesWithRateLimiter(String userId, String keyword) {
// 每个用户的限流器
String rateLimiterKey = RATE_LIMITER_KEY_PREFIX + userId;
RateLimiter rateLimiter = redissonClient.getRateLimiter(rateLimiterKey);

// 设置速率:每秒钟允许 1 次搜索,最大 5 次请求的令牌桶
rateLimiter.trySetRate(RateLimiter.RateType.OVERALL, 5, 1, RateLimiter.IntervalUnit.SECONDS);

// 判断用户是否超过限流,如果没有超过,允许继续执行搜索
if (rateLimiter.tryAcquire(1)) {
return performSearch(keyword); // 执行实际的搜索功能
} else {
// 如果超过限流,返回错误信息或处理逻辑
throw new RateLimitExceededException("超过搜索频率限制,请稍后再试!");
}
}

/**
* 模拟实际的课程搜索逻辑
* @param keyword 搜索关键词
* @return 搜索结果
*/
private SearchResult performSearch(String keyword) {
// 这里可以实现实际的 Elasticsearch 或数据库搜索功能
SearchResult result = new SearchResult();
result.setMessage("搜索成功,关键词:" + keyword);
return result;
}
}

4. 实现 Elasticsearch 搜索逻辑

通过 Elasticsearch 进行关键词搜索。此处使用 RestHighLevelClient 进行关键词搜索。

CourseSearchService.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CourseSearchService {

private RestHighLevelClient esClient;
private SearchRateLimiter searchRateLimiter;

public CourseSearchService(RestHighLevelClient esClient, SearchRateLimiter searchRateLimiter) {
this.esClient = esClient;
this.searchRateLimiter = searchRateLimiter;
}

/**
* 执行关键词搜索
* @param keyword 搜索的关键词
* @throws Exception
*/
public void search(String keyword) throws Exception {
// 先检查该关键词是否被限流
if (searchRateLimiter.trySearch(keyword)) {
System.out.println("允许搜索关键词:" + keyword);

// 构建搜索请求
SearchRequest searchRequest = new SearchRequest("courses"); // courses 为索引名称
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
searchRequest.source(searchSourceBuilder);

// 执行搜索
SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("搜索结果:" + searchResponse);
} else {
// 如果被限流,返回提示信息
System.out.println("关键词 '" + keyword + "' 被限流,请稍后再试。");
}
}

public static void main(String[] args) throws Exception {
// 初始化 Redisson 客户端
RedissonConfig redissonConfig = new RedissonConfig();
RedissonClient redissonClient = redissonConfig.getRedissonClient();

// 初始化限流器
SearchRateLimiter searchRateLimiter = new SearchRateLimiter(redissonClient);

// 初始化 Elasticsearch 客户端
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);

// 初始化课程搜索服务
CourseSearchService courseSearchService = new CourseSearchService(esClient, searchRateLimiter);

// 模拟多个搜索请求
for (int i = 0; i < 15; i++) {
courseSearchService.search("java");
}

// 关闭客户端
esClient.close();
redissonClient.shutdown();
}
}

5.代码说明

  1. Redisson RateLimiter

    • RateLimiter 是 Redisson 提供的分布式限流器,支持设置某个关键词在指定时间窗口内的最大请求次数。RateLimiter.trySetRate() 设置了每个关键词 60 秒内最多允许 10 次请求。
    • tryAcquire() 方法尝试获取一个令牌,如果成功则允许请求,否则表示已达到限流。
  2. Elasticsearch 搜索

    • 使用 Elasticsearch 的 RestHighLevelClient 进行关键词搜索,通过 SearchRequest 构建搜索请求,并通过 QueryBuilders.matchQuery() 匹配关键词。
    • 搜索结果通过 SearchResponse 返回。
  3. 限流检查

    • 每当用户发起搜索请求时,首先调用 searchRateLimiter.trySearch() 方法检查关键词是否已经达到限流阈值。如果未达到限流,则允许搜索并执行 Elasticsearch 查询;否则,拒绝请求。
  4. 运行结果

当你运行上述代码时,输出结果可能如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
允许搜索关键词:java
关键词 'java' 被限流,请稍后再试。
关键词 'java' 被限流,请稍后再试。
关键词 'java' 被限流,请稍后再试。
关键词 'java' 被限流,请稍后再试。
关键词 'java' 被限流,请稍后再试。

前 10 次请求成功,后续请求由于超过了限流阈值而被拒绝。

测试方案

使用JUnit 是 Java 语言中最流行的单元测试框架,用于编写和运行可重复的测试。

测试方案总结表:

测试场景 测试目标 测试步骤 预期结果
单用户测试 验证单用户在限流规则下的搜索次数是否受限 - 设置限流规则(每分钟最多 5 次搜索) - 在 1 分钟内发送 7 次搜索请求 - 前 5 次请求被允许 - 第 6、7 次请求被拒绝,返回限流错误
多用户测试 验证不同用户限流是否互相隔离,且分别生效 - 模拟用户 A 和用户 B - 每个用户分别发送 6 次搜索请求 - 用户 A 和 B 的前 5 次请求被允许 - 第 6 次请求被拒绝
并发测试 验证并发情况下限流是否生效,防止并发突破限流机制 - 启动 10 个线程,每个线程发送 2 次请求 - 最多允许 5 个请求成功 - 剩余请求应被限流
边界情况测试 验证临界点时限流是否正常工作 - 发送正好 5 次请求,验证限流 - 等待 1 分钟,发送第 6 次请求 - 连续多次触发超限 - 正好 5 次请求被允许,后续被拒绝 - 等待 1 分钟后,新请求被允许 - 多次超限请求被有效阻止
恢复机制测试 验证时间窗口结束后,限流规则是否自动恢复 - 在 1 分钟内发送 5 次请求 - 等待 1 分钟后,发送 1 次新请求 - 1 分钟后,限流重置,新请求被允许

问题

  • Redisson 是一个基于 Redis 的 Java 库,提供了分布式数据结构、锁、限流等功能,帮助开发者在分布式环境中简化 Redis 的使用。
  • RateLimiter 是一种限流器,用于控制请求的速率,防止系统过载。可以使用 Google Guava 提供的 RateLimiter 类,或者通过 Redis 实现分布式限流。

Q:Redisson 的 RateLimiter 底层

Redisson 是一个基于 Redis 的分布式框架,提供了许多分布式工具,其中包括 RateLimiter(限流器)。Redisson 的 RateLimiter 可以在分布式系统中使用 Redis 来实现对某些资源或操作的速率限制。它的底层实现基于令牌桶算法(Token Bucket),并通过 Redis 来管理令牌的分发和计数。

1.令牌桶算法概述

在 Redisson 的 RateLimiter 实现中,使用的是一种常见的限流算法:令牌桶算法。其工作原理如下:

  • 一个固定容量的桶(即令牌桶)以固定的速率生成令牌,并将令牌添加到桶中。
  • 当用户请求资源时,需要从桶中获取令牌。如果桶中有令牌可用,则允许请求通过,并从桶中移除相应的令牌。
  • 如果桶中没有足够的令牌,说明请求过多,超出限制,请求会被拒绝或延迟,直到令牌再次可用。

2.Redisson RateLimiter 的 Redis 底层实现

Redisson 的 RateLimiter 是基于 Redis 的分布式锁和 Lua 脚本来实现的。下面是 Redisson RateLimiter 的核心实现细节:

a. 令牌存储在 Redis 中

Redisson 使用 Redis 来存储令牌的数量以及上次令牌更新的时间戳。令牌桶的状态(包括当前的令牌数量和上次请求的时间)会保存在 Redis 的一个键中,这样多个分布式节点可以共享同一个限流器。

b. Lua 脚本的使用

为了保证分布式系统中的原子性操作,Redisson 使用 Lua 脚本来实现令牌的计算与更新。Lua 脚本可以确保在 Redis 中进行的操作是原子的,即不会出现并发问题。

当用户请求令牌时,Lua 脚本会执行以下操作:

  1. 计算令牌数量:根据当前时间戳和上次令牌更新的时间戳,计算应该生成多少个新令牌,并将这些令牌加入到桶中。
  2. 检查是否有足够的令牌:如果桶中的令牌数量足够,则减去相应的数量并允许请求通过。
  3. 更新 Redis 中的状态:如果令牌被成功获取,则更新 Redis 中的令牌数量和最后一次请求的时间戳。

Lua 脚本的执行是原子的,这意味着可以避免多个分布式节点同时访问令牌时出现竞争条件或数据不一致的情况。

c. 令牌生成速率

RateLimiter 允许用户定义两种类型的限流模式:

  1. 固定速率模式(RateType.OVERALL):这种模式下,令牌以一个全局固定的速率生成,所有的请求共享同一个速率。
  2. 每秒速率模式(RateType.PER_CLIENT):这种模式下,令牌以每个客户端的速率生成,可以为每个客户端单独设置一个速率。

无论选择哪种模式,Redisson 都依赖 Redis 的精确计时来确保所有的节点都按照设定的速率生成令牌。

Q:有哪些业务场景下不适合进行限流?

热词限流的主要目的是保护系统资源,防止某些高频请求导致系统过载或崩溃,但并不是所有场景都适合进行限流。特别是在核心业务、实时性要求高的场景或用户体验敏感的场景下,限流可能会带来负面影响。在这些场景下,应该根据业务特点,选择更合适的手段,如 排队机制缓存负载均衡请求分流,来保证系统的稳定性和用户体验的平衡。

Q:如果某个热词的访问量突然激增,你会如何动态调整限流策略?

面对热词访问量突然激增的场景,处理方式需要灵活、动态,主要涉及以下几个步骤:

  1. 监控系统:实时监控流量,检测热词并触发预警。
  2. 限流调整:根据热词的流量变化,动态调整令牌生成速率、突发处理能力,甚至分层限流。
  3. 缓存与熔断:通过缓存热点数据和引入熔断机制来减轻系统压力,确保系统稳定运行。
  4. 回退与恢复机制:在极端情况下进行降级处理,流量恢复后再逐步恢复限流。

Q:在高并发环境下,你如何确保 Redis 不会因为限流操作被打爆?

在高并发环境下,确保 Redis 不会因为限流操作被打爆的核心在于以下几个方面:

  1. 合理设置限流规则,避免过度限流带来不必要的 Redis 请求。
  2. 使用 Lua 脚本实现原子操作,减少多次网络请求的开销。
  3. 通过 Redis 集群分摊压力,避免单点瓶颈。
  4. 利用本地缓存减少 Redis 交互,降低 Redis 的负载。
  5. 异步化和批量处理,降低瞬时请求对 Redis 的压力。
  6. 合理配置 Redis,确保 Redis 处于最佳性能状态。
  7. 监控和预警,及时发现和解决潜在的性能问题。

Q:如果 Redis 挂掉了,RateLimiter 会受到什么影响?你是如何设计系统来应对 Redis 不可用的场景的?

为了应对 Redis 不可用的场景,设计一个健壮的限流系统时可以结合多种策略:

  1. 本地限流作为兜底方案:在 Redis 不可用时,使用本地的限流机制作为应急处理,确保系统不会完全失控。
  2. 限流降级机制:在 Redis 不可用时,自动降级限流策略,允许更多请求通过,保持服务可用性。
  3. 本地缓存限流状态:使用本地缓存来减少对 Redis 的依赖,确保 Redis 短暂不可用时系统仍然可以运行。
  4. Redis 高可用架构:通过 Redis Sentinel 或 Redis Cluster 来提高 Redis 的可用性,避免单点故障。
  5. 熔断机制:在 Redis 不可用时触发熔断,暂时停止对 Redis 的请求,防止故障蔓延。

如果 RateLimiter 达到限流阈值,用户大量请求被拒绝,如何避免对用户体验的影响?

设计缓存、排队等机制来缓解用户的请求压力。

【本地限流,本地缓存,动态调整限流策略,分层限流,监控,熔断,降级】