本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容:
我的帖子已经打上 开源推广 标签: 是
我的开源项目完整开源,无未开源部分: 是
我的开源项目已链接认可 LINUX DO 社区: 是
我帖子内的项目介绍,AI生成、润色内容部分已截图发出: 是
以上选择我承诺是永久有效的,接受社区和佬友监督: 是
以下为项目介绍正文内容,AI生成、润色内容已使用截图方式发出
前言
本教程的环境基于 jdk8 + langchain4j 0.35
教程源码放在这里了:
GitHub - worenbudaoni/rag-study-helper: 一个学习检索增强生成的全流程助手
一个学习检索增强生成的全流程助手
一个学习检索增强生成的全流程助手
文章内容
因为内容比较多,我会从下面三个文章进行讲解,后续发布后会贴出来,这节讲:接口限流:令牌桶 + AOP
RAG实现全流程 :【开源、教程】RAG全流程实现(java+完整代码):第一弹
接入飞书WIKI文档 :【开源、教程】RAG全流程实现(接入飞书WIKI):第二弹
接口限流:令牌桶 + AOP
限流
限流的核心是控制访问速率 。你可以把它想象成一个景区的入口:
不限流 :黄金周所有游客一拥而入,景区内部严重拥堵,甚至有踩踏和设施损坏的风险。
限流 :门口售票处控制每分钟只放行100人。虽然外面排起了长队,但里面的游客体验是安全、顺畅的。
本项目中,限流是为了防止 LLM 接口被恶意调用,造成不必要的损失
在实际的项目中,用户的流量会经历以下环节后进入我们的服务器
用户流量 → CDN/WAF → Nginx(限流) → 网关/应用(Sentinel) → 后端服务
但该项目只是一个通用学习架构,所以本文会带大家用代码(框架)实现平替(低配) Sentinel 的限流策略
经典限流算法(计数器、滑动窗口、漏斗、令牌桶)在网上已经有很多的讲解了
这里我就带着大家过一下我项目的代码逻辑和实现原理(令牌桶)
如果有佬友对其他的限流算法有兴趣的话,我后面再做一期专门讲限流算法的文章
如果文章写的有什么问题,欢迎大佬们指教,感谢~
项目限流拆解(令牌桶(Redisson/RRateLimiter)+AOP)
先从简单的讲起(AOP)
项目用了两个自定义注解(IpRateLimit、RateLimit)配合AOP+Redisson实现限流
IpRateLimit:定制化IP限流注解,可通过配置项热更新
RateLimit:通用限流注解,需要项目重启可生效
以上两个注解的参数都以key的形式保存redis,所以每次更新参数都会刷新key(后面会讲,这里只是简单答疑为什么改参数即生效)
AOP
我把所有类都列出来,不要嫌我啰嗦,这样可以不用翻源码就可以看懂
注解
IpRateLimit.java
/**
* 定制化 ip 限流注解 支持热更新
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface IpRateLimit {
/**
* 限流 key
*/
String value();
}
RateLimit.java
/**
* 通用限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
/**
* 限流 key
*/
String value();
/**
* 令牌桶容量
*/
long maxCapacity();
/**
* 补充令牌的时间,默认 1 分钟补充 maxCapacity 个令牌
* 默认单位:秒
*/
long supplementTimeOfSeconds() default 60L;
/**
* 每日最大调用次数,默认 0 表示不限制
*/
int dailyMaximumCount() default 0;
/**
* 超时时间 单位,默认 24 小时
* 默认单位:小时
*/
long timeOutOfHours() default 24L;
}
切面
RateLimitAspect.java
/**
* 限流切面。
*/
@Aspect
@Component
public class RateLimitAspect {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Autowired
private RateLimitService rateLimitService;
@Value("${app.rate-limit.ip-rate:20}")
private int ipRate;
@Value("${app.rate-limit.ip-supplement-seconds:60}")
private int ipSupplementSeconds;
@Value("${app.rate-limit.ip-expire-hours:24}")
private int ipExpireHours;
@Value("${app.rate-limit.daily-max:10000}")
private int dailyMax;
/**
* 聊天接口限流
* 定制化IP限流 支持热更新
*/
@Around("@annotation(ipRateLimit)")
public Object chatAround(ProceedingJoinPoint pjp, IpRateLimit ipRateLimit) throws Throwable {
// 从当前请求上下文获取 request / response
ServletRequestAttributes attrs = (ServletRequestAttributes)
RequestContextHolder.currentRequestAttributes();
HttpServletRequest request = attrs.getRequest();
HttpServletResponse response = attrs.getResponse();
String key = ipRateLimit.value();
// 1. 全局每日限流
if (!rateLimitService.tryDaily(key, dailyMax)) {
writeRateLimitResponse(response, "今日调用次数已达上限");
return null; }
// 2. IP 令牌桶限流
String clientIp = getClientIp(request);
if (!rateLimitService.tryAcquire("ip:" + key + ":" + clientIp, ipRate, ipSupplementSeconds, ipExpireHours)) {
writeRateLimitResponse(response, "请求过于频繁,请稍后再试");
return null; }
// 通过限流,执行业务方法
return pjp.proceed();
}
/**
* 通用限流
*/
@Around("@annotation(rateLimit)")
public Object commonAround(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {
// 从当前请求上下文获取 response
ServletRequestAttributes attrs = (ServletRequestAttributes)
RequestContextHolder.currentRequestAttributes();
HttpServletResponse response = attrs.getResponse();
// redis 的 key
String key = rateLimit.value();
// 令牌桶最大容量
long maxCapacity = rateLimit.maxCapacity();
// 令牌补充时间(秒)
long supplementTimeOfSeconds = rateLimit.supplementTimeOfSeconds();
// 超时时间
long timeOutOfHours = rateLimit.timeOutOfHours();
// 每日最大计数
int dailyMaximumCount = rateLimit.dailyMaximumCount();
if (dailyMaximumCount > 0) {
// 全局每日限流
if (!rateLimitService.tryDaily(key, dailyMaximumCount)) {
writeRateLimitResponse(response, "今日调用次数已达上限");
return null; }
}
// 令牌桶限流
if (!rateLimitService.tryAcquire(key, maxCapacity, supplementTimeOfSeconds, timeOutOfHours)) {
writeRateLimitResponse(response, "请求过于频繁,请稍后再试");
return null; }
// 通过限流,执行业务方法
return pjp.proceed();
}
// 获取IP
private String getClientIp(HttpServletRequest request) {
String xff = request.getHeader("X-Forwarded-For");
if (xff != null && !xff.isEmpty() && !"unknown".equalsIgnoreCase(xff)) {
return xff.split(",")[0].trim();
}
return request.getRemoteAddr();
}
// 响应限流报错信息
private void writeRateLimitResponse(HttpServletResponse response, String msg) throws Exception {
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType("application/json;charset=UTF-8");
Results<Void> result = Results.failed("429", msg);
response.getWriter().write(OBJECT_MAPPER.writeValueAsString(result));
}
}
限流服务
代码:RateLimitService.java
/**
* 基于 Redisson 分布式令牌桶的限流服务
*/
@Service
public class RateLimitService {
private static final Logger log = LoggerFactory.getLogger(RateLimitService.class);
private final RedissonClient redissonClient;
private final StringRedisTemplate redisTemplate;
public RateLimitService(RedissonClient redissonClient, StringRedisTemplate redisTemplate) {
this.redissonClient = redissonClient;
this.redisTemplate = redisTemplate;
}
/**
* 分布式令牌桶限流
*
* @param key 限流 key
* @param maxCapacity 桶容量上限
* @param supplementTimeOfSeconds 补充周期(秒)
* @param timeOutOfHours 过期时间(小时)
* @return true 允许通过,false 触发限流
*/
public boolean tryAcquire(String key, long maxCapacity, long supplementTimeOfSeconds, long timeOutOfHours) {
// 将配置值编码进 key,变更配置即自动切换限流器,旧 key 随 TTL 过期
RRateLimiter limiter = redissonClient.getRateLimiter("rl:" + key + ":" + maxCapacity + ":" + supplementTimeOfSeconds + ":" + timeOutOfHours);
if (!limiter.isExists()) {
// RateType.OVERALL:所有实例共享同一桶(如三个实例,maxCapacity=10, supplementTimeOfSeconds=600 → 三个实例加起来每10分钟最多访问10次)
// RateType.PER_CLIENT:每个实例独立桶(三个实例各自每10分钟最多10次)
// maxCapacity:桶容量(最大令牌数)
// supplementTimeOfSeconds:补充周期(每隔该秒数补充令牌,补充量为 maxCapacity)
// 例子:maxCapacity=10, supplementTimeOfSeconds=60 → 每60秒补充10个令牌
limiter.trySetRate(RateType.OVERALL, maxCapacity, Duration.ofSeconds(supplementTimeOfSeconds), Duration.ofHours(timeOutOfHours));
}
return limiter.tryAcquire();
}
/**
* 全局每日调用计数限流。
* <p>
* 使用 INCR + EXPIRE 实现,每日零点自动重置。
*
* @param keyPrefix key 前缀
* @param maxCalls 每日最大调用次数
* @return true 允许通过,false 超限
*/
public boolean tryDaily(String keyPrefix, int maxCalls) {
String key = "rl:daily:" + keyPrefix + ":" + LocalDate.now();
Long count = redisTemplate.opsForValue().increment(key);
// 首次创建时设置过期时间,28 小时后自动删除
if (count != null && count == 1L) {
redisTemplate.expire(key, 28, TimeUnit.HOURS);
}
if (count != null && count > maxCalls) {
log.warn("每日调用超限: key={}, count={}, max={}", keyPrefix, count, maxCalls);
return false; }
return true;
}
}
方法拆解
一、分布式令牌桶限流:tryAcquire
/**
* 分布式令牌桶限流
*
* @param key 限流 key
* @param maxCapacity 桶容量上限
* @param supplementTimeOfSeconds 补充周期(秒)
* @param timeOutOfHours 过期时间(小时)
* @return true 允许通过,false 触发限流
*/
public boolean tryAcquire(String key, long maxCapacity, long supplementTimeOfSeconds, long timeOutOfHours) {
// 将配置值编码进 key,变更配置即自动切换限流器,旧 key 随 TTL 过期
RRateLimiter limiter = redissonClient.getRateLimiter("rl:" + key + ":" + maxCapacity + ":" + supplementTimeOfSeconds + ":" + timeOutOfHours);
if (!limiter.isExists()) {
// RateType.OVERALL:所有实例共享同一桶(如三个实例,maxCapacity=10, supplementTimeOfSeconds=600 → 三个实例加起来每10分钟最多访问10次)
// RateType.PER_CLIENT:每个实例独立桶(三个实例各自每10分钟最多10次)
// maxCapacity:桶容量(最大令牌数)
// supplementTimeOfSeconds:补充周期(每隔该秒数补充令牌,补充量为 maxCapacity)
// 例子:maxCapacity=10, supplementTimeOfSeconds=60 → 每60秒补充10个令牌
limiter.trySetRate(RateType.OVERALL, maxCapacity, Duration.ofSeconds(supplementTimeOfSeconds), Duration.ofHours(timeOutOfHours));
}
return limiter.tryAcquire();
}
细节拆析:
redissonClient.getRateLimiter 只是初始化+设置名字不调用redis
limiter.isExists() 判断存不存在,其实要不要这个都无所谓,性能提升很有限
重要的只有两行,所以这节的篇幅主要是分析下面两行的lua脚本
一行是 limiter.trySetRate 设置令牌桶元数据
一行是 limiter.tryAcquire 限流
limiter.trySetRate
重要的是理解lua脚本每一行是干什么的,这里我带着大家解读
// 我项目里写的代码 对应下面的源码
limiter.trySetRate(RateType.OVERALL, rate, rateInterval, keepAliveTime);
// 调用的redisson源码(lua脚本)
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, Duration rateInterval, Duration keepAliveTime) {
if (!keepAliveTime.equals(Duration.ZERO) && keepAliveTime.toMillis() < rateInterval.toMillis()) {
throw new IllegalArgumentException("The parameter keepAliveTime should be greater than or equal to rateInterval");
}
return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 设置一个 哈希的 key-map(field-value) 这样的数据结构,setnx 就是没有就创建(成功返回 1 ),有就不创建(失败返回 0 )
// 相当于创建了一个类(KEYS[1]),把里面的属性(rate、interval、keepAliveTime赋值)
// 用 json 表示,就是这样的一个元数据,redis 通过这个 KEYS[1] 找到下面的 map
//{
// "KEYS[1]":{
// "rate":"ARGV[1]", // 桶大小
// "interval":"ARGV[2]", // 补充周期
// "keepAliveTime":"ARGV[4]", // 过期时间 这个过期时间只是当一个属性保存起来,后面的才是真正设置 TTL
// "type":"ARGV[3]" // RateType.OVERALL
// }
//}
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
+ "redis.call('hsetnx', KEYS[1], 'keepAliveTime', ARGV[4]);"
+ "local res = redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);"
// 如果 通过 hsetnx 成功了就返回 1 过期时间大于 0 就进 if 分支
+ "if res == 1 and tonumber(ARGV[4]) > 0 then "
// 设置 这个元数据的过期时间
+ "redis.call('pexpire', KEYS[1], ARGV[4]); "
+ "end; "
+ "return res;",
// 这里的 getRawName() 就是通过 redissonClient.getRateLimiter("name") 设置的
Collections.singletonList(getRawName()),
// maxCapacity 桶大小 对应的 ARGV[1]
rate,
// supplementTimeOfSeconds 补充周期 转毫秒 对应的 ARGV[2]
rateInterval.toMillis(),
// ordinal 排序 RateType.OVERALL 就对应的 0 (因为它在第一个) 对应的 ARGV[3]
type.ordinal(),
// timeOutOfHours 过期时间 转毫秒 对应的 ARGV[4]
keepAliveTime.toMillis());
}
limiter.tryAcquire()
因为这里的字符串拼接太长了看着费劲,所以我把 java 代码和 lua 脚本拆出来了,lua 脚本在后面
java 代码主要看入参注释有哪些,lua 脚本就由我带着佬友解读
佬友可以打开 idea 追进 redisson 中 RedissonRateLimiter 类看源码(我这里也是拷贝的源码,但是md文档肯定没有编辑器看着舒服),再配合我放在最后的流程图看着更舒适
// limiter.tryAcquire(value);
// 上面代码对应下面源码的注释
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
// 生成 16 位随机 ID
byte[] random = getServiceManager().generateIdArray();
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
+ "local valueName = KEYS[2];"
+ "local permitsName = KEYS[4];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "permitsName = KEYS[5];"
+ "end;"
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount cannot exceed defined rate'); "
+ "local currentValue = redis.call('get', valueName); "
+ "local res;"
+ "if currentValue ~= false then "
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
+ "local random, permits = struct.unpack('Bc0I', v);"
+ "released = released + permits;"
+ "end; "
+ "if released > 0 then "
+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "if tonumber(currentValue) + released > tonumber(rate) then "
+ "local values = redis.call('zrange', permitsName, 0, -1); "
+ "local used = 0; "
+ "for i, v in ipairs(values) do "
+ "local random, permits = struct.unpack('Bc0I', v);"
+ "used = used + permits;"
+ "end; "
+ "currentValue = tonumber(rate) - used; "
+ "else "
+ "currentValue = tonumber(currentValue) + released; "
+ "end; "
+ "redis.call('set', valueName, currentValue);"
+ "end;"
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
+ "else "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end; "
+ "else "
+ "redis.call('set', valueName, rate); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end;"
+ "local keepAliveTime = redis.call('hget', KEYS[1], 'keepAliveTime'); "
+ "if (keepAliveTime ~= false and tonumber(keepAliveTime) > 0) then "
+ "redis.call('pexpire', KEYS[1], keepAliveTime); "
+ "redis.call('pexpire', valueName, keepAliveTime); "
+ "redis.call('pexpire', permitsName, keepAliveTime); "
+ "else "
+ "local ttl = redis.call('pttl', KEYS[1]); "
+ "if ttl > 0 then "
+ "redis.call('pexpire', valueName, ttl); "
+ "redis.call('pexpire', permitsName, ttl); "
+ "end; "
+ "end; "
+ "return res;",
Arrays.asList(
// 就是 redissonClient.getRateLimiter("chat") 就是这个 chat,下面的 chat 也对应的这个
// KEYS[1]
getRawName(),
// 相当于创建一个新的 key 为 {chat}:value,这个 value 是硬编码(不是一个变量)
// KEYS[2]
getValueName(),
// 相当于创建一个新的 key 为 getValueName():serverId
// getValueName():serverId = {chat}:value:serverId
// 这个 serverId 是变量 对应一个服务(唯一ID)
// KEYS[3]
getClientValueName(),
// 相当于创建一个新的 key 为 {chat}:permits,这个 permits 是硬编码(不是一个变量)
// KEYS[4]
getPermitsName(),
// 相当于创建一个新的 key 为 getPermitsName():serverId
// getPermitsName():serverId = {chat}:permits:serverId
// 这个 serverId 是变量 对应一个服务(唯一ID)
// KEYS[5]
getClientPermitsName()
),
// 需要取多少个令牌 对应的 limiter.tryAcquire(value) 中的 value 不填默认为 1,就是取一个
// ARGV[1]
value,
// 时间戳
// ARGV[2]
System.currentTimeMillis(),
// 生成 16 位随机 ID
// ARGV[3]
random
);
}
limiter.tryAcquire()LUA脚本
-- 获取桶容量
local rate = redis.call('hget', KEYS[1], 'rate');
-- 获取补充周期
local interval = redis.call('hget', KEYS[1], 'interval');
-- 获取 RateType.OVERALL(全局限流)/RateType.PER_CLIENT(客户端限流)对应的 0/1
local type = redis.call('hget', KEYS[1], 'type');
-- 如果上述三个有一个为空则认为元数据没有被初始化,报错
-- assert 条件为 true 不报错 , ~= 不等于 , false 不存在
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized');
-- 我就假设 KEYS[1] = "chat",serverId = 666,type = 0(全局限流) 讲解
-- 初始化 valueName = {chat}:value
local valueName = KEYS[2];
-- 初始化 和permitsName = {chat}:permits
local permitsName = KEYS[4];
-- 如果 type 为 RateType.PER_CLIENT(对应的1)
if type == '1' then
-- valueName = {chat}:value:666
valueName = KEYS[3];
-- permitsName = {chat}:permits:666
permitsName = KEYS[5];
end;
-- 如果取的令牌超过设置的最大桶容量就报错
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount cannot exceed defined rate');
-- 当前时间可用令牌数:获取 key 为 {chat}:value 的值
local currentValue = redis.call('get', valueName);
-- 设置一个返回变量 res = nil(同 null )表示获取成功
local res;
-- 如果 currentValue 存在(即已经初始化过)
if currentValue ~= false then
-- 查询在 key 为 permitsName 的 zset 集合中(当前时间 - 补充周期)之前过期的令牌记录
local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
-- 累计可回收的令牌数
local released = 0;
-- 遍历所有过期的令牌记录
for i, v in ipairs(expiredValues) do
-- 解包每条记录:参数 'Bc0I' 表示结构为 1字节长度 + 字符串(随机ID)+ 4字节整数(令牌数)
-- 随机ID和该批次消耗的令牌数
local random, permits = struct.unpack('Bc0I', v);
-- 累加可回收的令牌
released = released + permits;
end;
-- 如果有可回收的令牌
if released > 0 then
-- 从有序集合中删除所有过期的记录
redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
-- 如果回收后剩余令牌会超过桶容量
-- lua脚本保证了原子性,但不能保证rate(元数据中的最大桶容量)的值没有被修改,如果被减小,了那么可用令牌+回收令牌就可能>rate,这时就要修改valueName了,如果增大了,还有令牌的情况下也没必要现在就改 valueName
if tonumber(currentValue) + released > tonumber(rate) then
-- 获取当前所有未过期的令牌记录
local values = redis.call('zrange', permitsName, 0, -1);
-- 计算当前实际已使用的令牌总数
local used = 0;
-- 遍历所有未过期的令牌记录(valueName 是当前剩余的令牌数,需要重新精确计算)
for i, v in ipairs(values) do
-- random 是之前生成但未使用的随机ID,permits 是该批次实际消耗的令牌数
local random, permits = struct.unpack('Bc0I', v);
-- 累加所有未过期记录中的令牌总数,得到当前已被占用的令牌数
used = used + permits;
end;
-- 剩余令牌数 = 桶容量 - 已使用数
currentValue = tonumber(rate) - used;
else
-- 否则直接将回收的令牌加回剩余令牌
currentValue = tonumber(currentValue) + released;
end;
-- 更新剩余令牌数到 Redis
redis.call('set', valueName, currentValue);
end;
-- 如果剩余令牌数不够本次请求
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 获取最早一次令牌记录的分数(时间戳)
local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
-- 计算需要等待的毫秒数:补充周期 - (当前时间 - 最早记录时间) + 3(最小等待时间)
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
else
-- 令牌足够:记录本次消耗的令牌批次
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
-- 减少剩余令牌数
redis.call('decrby', valueName, ARGV[1]);
-- 返回 nil 表示获取成功
res = nil;
end;
else
-- currentValue 不存在(首次获取令牌)
-- 之前判断了 获取的令牌数必须 <= 最大令牌桶容量 才能走到这
-- 初始化剩余令牌为桶容量
redis.call('set', valueName, rate);
-- 记录本次消耗的令牌批次
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
-- 减少剩余令牌数
redis.call('decrby', valueName, ARGV[1]);
-- 返回 nil 表示获取成功
res = nil;
end;
-- 获取限流器配置的 keepAliveTime(过期时间)
local keepAliveTime = redis.call('hget', KEYS[1], 'keepAliveTime');
-- 如果设置了 keepAliveTime 且大于 0
if (keepAliveTime ~= false and tonumber(keepAliveTime) > 0) then
-- 更新主 key 的过期时间
redis.call('pexpire', KEYS[1], keepAliveTime);
-- 更新剩余令牌 key 的过期时间
redis.call('pexpire', valueName, keepAliveTime);
-- 更新令牌记录有序集合的过期时间
redis.call('pexpire', permitsName, keepAliveTime);
else
-- 否则获取主 key(chat) 的剩余 TTL
local ttl = redis.call('pttl', KEYS[1]);
-- 如果 chat 还没有过期
if ttl > 0 then
-- 将剩余令牌 key 的过期时间设置与主 key 一致
redis.call('pexpire', valueName, ttl);
-- 将令牌记录有序集合的过期时间设置与主 key 一致
redis.call('pexpire', permitsName, ttl);
end;
end;
-- 返回结果:nil=成功获取,数字=需要等待的毫秒数
-- 返回数字后会重试,代码要用limiter.tryAcquire(Duration.ofSeconds(2));才有重试机制
return res;
limiter.tryAcquire()流程图
图片有点大,如果被压缩了可以找我要原图
二、全局每日调用计数限流:tryDaily
这个就没有什么好讲的了,因为redis执行时单线程的,所以每次请求都+1,到了最大调用数就不让访问就行了
需要注意的点就是设置TTL可以稍微大一点,可以避免时间临界问题
public boolean tryDaily(String keyPrefix, int maxCalls) {
String key = "rl:daily:" + keyPrefix + ":" + LocalDate.now();
Long count = redisTemplate.opsForValue().increment(key);
// 首次创建时设置过期时间,28 小时后自动删除
if (count != null && count == 1L) {
redisTemplate.expire(key, 28, TimeUnit.HOURS);
}
if (count != null && count > maxCalls) {
log.warn("每日调用超限: key={}, count={}, max={}", keyPrefix, count, maxCalls);
return false; }
return true;
}