【开源、教程】RAG全流程实现(接口限流:令牌桶 + AOP):第三弹

我认不到你 2026-06-18 17:12 1


本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容:



  • 我的帖子已经打上 开源推广 标签:

  • 我的开源项目完整开源,无未开源部分:

  • 我的开源项目已链接认可 LINUX DO 社区:

  • 我帖子内的项目介绍,AI生成、润色内容部分已截图发出:

  • 以上选择我承诺是永久有效的,接受社区和佬友监督:


以下为项目介绍正文内容,AI生成、润色内容已使用截图方式发出




前言



本教程的环境基于 jdk8 + langchain4j 0.35



教程源码放在这里了:




一个学习检索增强生成的全流程助手


文章内容


因为内容比较多,我会从下面三个文章进行讲解,后续发布后会贴出来,这节讲:接口限流:令牌桶 + AOP



  • RAG实现全流程:【开源、教程】RAG全流程实现(java+完整代码):第一弹

  • 接入飞书WIKI文档:【开源、教程】RAG全流程实现(接入飞书WIKI):第二弹

  • 接口限流:令牌桶 + AOP


限流



限流的核心是控制访问速率。你可以把它想象成一个景区的入口:



  • 不限流:黄金周所有游客一拥而入,景区内部严重拥堵,甚至有踩踏和设施损坏的风险。

  • 限流:门口售票处控制每分钟只放行100人。虽然外面排起了长队,但里面的游客体验是安全、顺畅的。


本项目中,限流是为了防止 LLM 接口被恶意调用,造成不必要的损失

在实际的项目中,用户的流量会经历以下环节后进入我们的服务器

用户流量 → CDN/WAF → Nginx(限流) → 网关/应用(Sentinel) → 后端服务

但该项目只是一个通用学习架构,所以本文会带大家用代码(框架)实现平替(低配) Sentinel 的限流策略


经典限流算法(计数器、滑动窗口、漏斗、令牌桶)在网上已经有很多的讲解了

这里我就带着大家过一下我项目的代码逻辑和实现原理(令牌桶)

如果有佬友对其他的限流算法有兴趣的话,我后面再做一期专门讲限流算法的文章

如果文章写的有什么问题,欢迎大佬们指教,感谢~



项目限流拆解(令牌桶(Redisson/RRateLimiter)+AOP)



先从简单的讲起(AOP)

项目用了两个自定义注解(IpRateLimit、RateLimit)配合AOP+Redisson实现限流

IpRateLimit:定制化IP限流注解,可通过配置项热更新

RateLimit:通用限流注解,需要项目重启可生效

以上两个注解的参数都以key的形式保存redis,所以每次更新参数都会刷新key(后面会讲,这里只是简单答疑为什么改参数即生效)



AOP



我把所有类都列出来,不要嫌我啰嗦,这样可以不用翻源码就可以看懂



注解


IpRateLimit.java


/**  
* 定制化 ip 限流注解 支持热更新
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface IpRateLimit {
/**
* 限流 key
*/
String value();
}

RateLimit.java


/**  
* 通用限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
/**
* 限流 key
*/
String value();

/**
* 令牌桶容量
*/
long maxCapacity();

/**
* 补充令牌的时间,默认 1 分钟补充 maxCapacity 个令牌
* 默认单位:秒
*/
long supplementTimeOfSeconds() default 60L;

/**
* 每日最大调用次数,默认 0 表示不限制
*/
int dailyMaximumCount() default 0;

/**
* 超时时间 单位,默认 24 小时
* 默认单位:小时
*/
long timeOutOfHours() default 24L;
}

切面


RateLimitAspect.java


/**  
* 限流切面。
*/
@Aspect
@Component
public class RateLimitAspect {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Autowired
private RateLimitService rateLimitService;

@Value("${app.rate-limit.ip-rate:20}")
private int ipRate;

@Value("${app.rate-limit.ip-supplement-seconds:60}")
private int ipSupplementSeconds;

@Value("${app.rate-limit.ip-expire-hours:24}")
private int ipExpireHours;

@Value("${app.rate-limit.daily-max:10000}")
private int dailyMax;

/**
* 聊天接口限流
* 定制化IP限流 支持热更新
*/
@Around("@annotation(ipRateLimit)")
public Object chatAround(ProceedingJoinPoint pjp, IpRateLimit ipRateLimit) throws Throwable {
// 从当前请求上下文获取 request / response
ServletRequestAttributes attrs = (ServletRequestAttributes)
RequestContextHolder.currentRequestAttributes();
HttpServletRequest request = attrs.getRequest();
HttpServletResponse response = attrs.getResponse();

String key = ipRateLimit.value();

// 1. 全局每日限流
if (!rateLimitService.tryDaily(key, dailyMax)) {
writeRateLimitResponse(response, "今日调用次数已达上限");
return null; }

// 2. IP 令牌桶限流
String clientIp = getClientIp(request);
if (!rateLimitService.tryAcquire("ip:" + key + ":" + clientIp, ipRate, ipSupplementSeconds, ipExpireHours)) {
writeRateLimitResponse(response, "请求过于频繁,请稍后再试");
return null; }

// 通过限流,执行业务方法
return pjp.proceed();
}

/**
* 通用限流
*/
@Around("@annotation(rateLimit)")
public Object commonAround(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {
// 从当前请求上下文获取 response
ServletRequestAttributes attrs = (ServletRequestAttributes)
RequestContextHolder.currentRequestAttributes();
HttpServletResponse response = attrs.getResponse();
// redis 的 key
String key = rateLimit.value();
// 令牌桶最大容量
long maxCapacity = rateLimit.maxCapacity();
// 令牌补充时间(秒)
long supplementTimeOfSeconds = rateLimit.supplementTimeOfSeconds();
// 超时时间
long timeOutOfHours = rateLimit.timeOutOfHours();
// 每日最大计数
int dailyMaximumCount = rateLimit.dailyMaximumCount();

if (dailyMaximumCount > 0) {
// 全局每日限流
if (!rateLimitService.tryDaily(key, dailyMaximumCount)) {
writeRateLimitResponse(response, "今日调用次数已达上限");
return null; }
}

// 令牌桶限流
if (!rateLimitService.tryAcquire(key, maxCapacity, supplementTimeOfSeconds, timeOutOfHours)) {
writeRateLimitResponse(response, "请求过于频繁,请稍后再试");
return null; }

// 通过限流,执行业务方法
return pjp.proceed();
}

// 获取IP
private String getClientIp(HttpServletRequest request) {
String xff = request.getHeader("X-Forwarded-For");
if (xff != null && !xff.isEmpty() && !"unknown".equalsIgnoreCase(xff)) {
return xff.split(",")[0].trim();
}
return request.getRemoteAddr();
}

// 响应限流报错信息
private void writeRateLimitResponse(HttpServletResponse response, String msg) throws Exception {
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType("application/json;charset=UTF-8");
Results<Void> result = Results.failed("429", msg);
response.getWriter().write(OBJECT_MAPPER.writeValueAsString(result));
}
}

限流服务


代码:RateLimitService.java


/**  
* 基于 Redisson 分布式令牌桶的限流服务
*/
@Service
public class RateLimitService {

private static final Logger log = LoggerFactory.getLogger(RateLimitService.class);

private final RedissonClient redissonClient;
private final StringRedisTemplate redisTemplate;

public RateLimitService(RedissonClient redissonClient, StringRedisTemplate redisTemplate) {
this.redissonClient = redissonClient;
this.redisTemplate = redisTemplate;
}

/**
* 分布式令牌桶限流
*
* @param key 限流 key
* @param maxCapacity 桶容量上限
* @param supplementTimeOfSeconds 补充周期(秒)
* @param timeOutOfHours 过期时间(小时)
* @return true 允许通过,false 触发限流
*/
public boolean tryAcquire(String key, long maxCapacity, long supplementTimeOfSeconds, long timeOutOfHours) {
// 将配置值编码进 key,变更配置即自动切换限流器,旧 key 随 TTL 过期
RRateLimiter limiter = redissonClient.getRateLimiter("rl:" + key + ":" + maxCapacity + ":" + supplementTimeOfSeconds + ":" + timeOutOfHours);
if (!limiter.isExists()) {
// RateType.OVERALL:所有实例共享同一桶(如三个实例,maxCapacity=10, supplementTimeOfSeconds=600 → 三个实例加起来每10分钟最多访问10次)
// RateType.PER_CLIENT:每个实例独立桶(三个实例各自每10分钟最多10次)
// maxCapacity:桶容量(最大令牌数)
// supplementTimeOfSeconds:补充周期(每隔该秒数补充令牌,补充量为 maxCapacity)
// 例子:maxCapacity=10, supplementTimeOfSeconds=60 → 每60秒补充10个令牌
limiter.trySetRate(RateType.OVERALL, maxCapacity, Duration.ofSeconds(supplementTimeOfSeconds), Duration.ofHours(timeOutOfHours));
}
return limiter.tryAcquire();
}

/**
* 全局每日调用计数限流。
* <p>
* 使用 INCR + EXPIRE 实现,每日零点自动重置。
*
* @param keyPrefix key 前缀
* @param maxCalls 每日最大调用次数
* @return true 允许通过,false 超限
*/
public boolean tryDaily(String keyPrefix, int maxCalls) {
String key = "rl:daily:" + keyPrefix + ":" + LocalDate.now();
Long count = redisTemplate.opsForValue().increment(key);
// 首次创建时设置过期时间,28 小时后自动删除
if (count != null && count == 1L) {
redisTemplate.expire(key, 28, TimeUnit.HOURS);
}
if (count != null && count > maxCalls) {
log.warn("每日调用超限: key={}, count={}, max={}", keyPrefix, count, maxCalls);
return false; }
return true;
}
}

方法拆解


一、分布式令牌桶限流:tryAcquire


/**  
* 分布式令牌桶限流
*
* @param key 限流 key
* @param maxCapacity 桶容量上限
* @param supplementTimeOfSeconds 补充周期(秒)
* @param timeOutOfHours 过期时间(小时)
* @return true 允许通过,false 触发限流
*/
public boolean tryAcquire(String key, long maxCapacity, long supplementTimeOfSeconds, long timeOutOfHours) {
// 将配置值编码进 key,变更配置即自动切换限流器,旧 key 随 TTL 过期
RRateLimiter limiter = redissonClient.getRateLimiter("rl:" + key + ":" + maxCapacity + ":" + supplementTimeOfSeconds + ":" + timeOutOfHours);
if (!limiter.isExists()) {
// RateType.OVERALL:所有实例共享同一桶(如三个实例,maxCapacity=10, supplementTimeOfSeconds=600 → 三个实例加起来每10分钟最多访问10次)
// RateType.PER_CLIENT:每个实例独立桶(三个实例各自每10分钟最多10次)
// maxCapacity:桶容量(最大令牌数)
// supplementTimeOfSeconds:补充周期(每隔该秒数补充令牌,补充量为 maxCapacity)
// 例子:maxCapacity=10, supplementTimeOfSeconds=60 → 每60秒补充10个令牌
limiter.trySetRate(RateType.OVERALL, maxCapacity, Duration.ofSeconds(supplementTimeOfSeconds), Duration.ofHours(timeOutOfHours));
}
return limiter.tryAcquire();
}


细节拆析:

redissonClient.getRateLimiter 只是初始化+设置名字不调用redis

limiter.isExists() 判断存不存在,其实要不要这个都无所谓,性能提升很有限


重要的只有两行,所以这节的篇幅主要是分析下面两行的lua脚本

一行是 limiter.trySetRate 设置令牌桶元数据

一行是 limiter.tryAcquire 限流



limiter.trySetRate


重要的是理解lua脚本每一行是干什么的,这里我带着大家解读



// 我项目里写的代码 对应下面的源码
limiter.trySetRate(RateType.OVERALL, rate, rateInterval, keepAliveTime);

// 调用的redisson源码(lua脚本)
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, Duration rateInterval, Duration keepAliveTime) {
if (!keepAliveTime.equals(Duration.ZERO) && keepAliveTime.toMillis() < rateInterval.toMillis()) {
throw new IllegalArgumentException("The parameter keepAliveTime should be greater than or equal to rateInterval");
}
return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 设置一个 哈希的 key-map(field-value) 这样的数据结构,setnx 就是没有就创建(成功返回 1 ),有就不创建(失败返回 0 )
// 相当于创建了一个类(KEYS[1]),把里面的属性(rate、interval、keepAliveTime赋值)
// 用 json 表示,就是这样的一个元数据,redis 通过这个 KEYS[1] 找到下面的 map
//{
// "KEYS[1]":{
// "rate":"ARGV[1]", // 桶大小
// "interval":"ARGV[2]", // 补充周期
// "keepAliveTime":"ARGV[4]", // 过期时间 这个过期时间只是当一个属性保存起来,后面的才是真正设置 TTL
// "type":"ARGV[3]" // RateType.OVERALL
// }
//}
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
+ "redis.call('hsetnx', KEYS[1], 'keepAliveTime', ARGV[4]);"
+ "local res = redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);"
// 如果 通过 hsetnx 成功了就返回 1 过期时间大于 0 就进 if 分支
+ "if res == 1 and tonumber(ARGV[4]) > 0 then "
// 设置 这个元数据的过期时间
+ "redis.call('pexpire', KEYS[1], ARGV[4]); "
+ "end; "
+ "return res;",
// 这里的 getRawName() 就是通过 redissonClient.getRateLimiter("name") 设置的
Collections.singletonList(getRawName()),
// maxCapacity 桶大小 对应的 ARGV[1]
rate,
// supplementTimeOfSeconds 补充周期 转毫秒 对应的 ARGV[2]
rateInterval.toMillis(),
// ordinal 排序 RateType.OVERALL 就对应的 0 (因为它在第一个) 对应的 ARGV[3]
type.ordinal(),
// timeOutOfHours 过期时间 转毫秒 对应的 ARGV[4]
keepAliveTime.toMillis());
}

limiter.tryAcquire()


因为这里的字符串拼接太长了看着费劲,所以我把 java 代码和 lua 脚本拆出来了,lua 脚本在后面


java 代码主要看入参注释有哪些,lua 脚本就由我带着佬友解读


佬友可以打开 idea 追进 redissonRedissonRateLimiter 类看源码(我这里也是拷贝的源码,但是md文档肯定没有编辑器看着舒服),再配合我放在最后的流程图看着更舒适



//    limiter.tryAcquire(value);  
// 上面代码对应下面源码的注释
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
// 生成 16 位随机 ID
byte[] random = getServiceManager().generateIdArray();

return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
+ "local valueName = KEYS[2];"
+ "local permitsName = KEYS[4];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "permitsName = KEYS[5];"
+ "end;"

+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount cannot exceed defined rate'); "

+ "local currentValue = redis.call('get', valueName); "
+ "local res;"
+ "if currentValue ~= false then "
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
+ "local random, permits = struct.unpack('Bc0I', v);"
+ "released = released + permits;"
+ "end; "

+ "if released > 0 then "
+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "if tonumber(currentValue) + released > tonumber(rate) then "
+ "local values = redis.call('zrange', permitsName, 0, -1); "
+ "local used = 0; "
+ "for i, v in ipairs(values) do "
+ "local random, permits = struct.unpack('Bc0I', v);"
+ "used = used + permits;"
+ "end; "
+ "currentValue = tonumber(rate) - used; "
+ "else "
+ "currentValue = tonumber(currentValue) + released; "
+ "end; "
+ "redis.call('set', valueName, currentValue);"
+ "end;"

+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
+ "else "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end; "
+ "else "
+ "redis.call('set', valueName, rate); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end;"

+ "local keepAliveTime = redis.call('hget', KEYS[1], 'keepAliveTime'); "
+ "if (keepAliveTime ~= false and tonumber(keepAliveTime) > 0) then "
+ "redis.call('pexpire', KEYS[1], keepAliveTime); "
+ "redis.call('pexpire', valueName, keepAliveTime); "
+ "redis.call('pexpire', permitsName, keepAliveTime); "
+ "else "
+ "local ttl = redis.call('pttl', KEYS[1]); "
+ "if ttl > 0 then "
+ "redis.call('pexpire', valueName, ttl); "
+ "redis.call('pexpire', permitsName, ttl); "
+ "end; "
+ "end; "
+ "return res;",
Arrays.asList(
// 就是 redissonClient.getRateLimiter("chat") 就是这个 chat,下面的 chat 也对应的这个
// KEYS[1]
getRawName(),
// 相当于创建一个新的 key 为 {chat}:value,这个 value 是硬编码(不是一个变量)
// KEYS[2]
getValueName(),
// 相当于创建一个新的 key 为 getValueName():serverId
// getValueName():serverId = {chat}:value:serverId
// 这个 serverId 是变量 对应一个服务(唯一ID)
// KEYS[3]
getClientValueName(),
// 相当于创建一个新的 key 为 {chat}:permits,这个 permits 是硬编码(不是一个变量)
// KEYS[4]
getPermitsName(),
// 相当于创建一个新的 key 为 getPermitsName():serverId
// getPermitsName():serverId = {chat}:permits:serverId
// 这个 serverId 是变量 对应一个服务(唯一ID)
// KEYS[5]
getClientPermitsName()
),
// 需要取多少个令牌 对应的 limiter.tryAcquire(value) 中的 value 不填默认为 1,就是取一个
// ARGV[1]
value,
// 时间戳
// ARGV[2]
System.currentTimeMillis(),
// 生成 16 位随机 ID
// ARGV[3]
random
);
}

limiter.tryAcquire()LUA脚本

-- 获取桶容量
local rate = redis.call('hget', KEYS[1], 'rate');
-- 获取补充周期
local interval = redis.call('hget', KEYS[1], 'interval');
-- 获取 RateType.OVERALL(全局限流)/RateType.PER_CLIENT(客户端限流)对应的 0/1
local type = redis.call('hget', KEYS[1], 'type');

-- 如果上述三个有一个为空则认为元数据没有被初始化,报错
-- assert 条件为 true 不报错 , ~= 不等于 , false 不存在
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized');

-- 我就假设 KEYS[1] = "chat",serverId = 666,type = 0(全局限流) 讲解
-- 初始化 valueName = {chat}:value
local valueName = KEYS[2];
-- 初始化 和permitsName = {chat}:permits
local permitsName = KEYS[4];

-- 如果 type 为 RateType.PER_CLIENT(对应的1)
if type == '1' then
-- valueName = {chat}:value:666
valueName = KEYS[3];
-- permitsName = {chat}:permits:666
permitsName = KEYS[5];
end;

-- 如果取的令牌超过设置的最大桶容量就报错
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount cannot exceed defined rate');

-- 当前时间可用令牌数:获取 key 为 {chat}:value 的值
local currentValue = redis.call('get', valueName);
-- 设置一个返回变量 res = nil(同 null )表示获取成功
local res;

-- 如果 currentValue 存在(即已经初始化过)
if currentValue ~= false then
-- 查询在 key 为 permitsName 的 zset 集合中(当前时间 - 补充周期)之前过期的令牌记录
local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
-- 累计可回收的令牌数
local released = 0;

-- 遍历所有过期的令牌记录
for i, v in ipairs(expiredValues) do
-- 解包每条记录:参数 'Bc0I' 表示结构为 1字节长度 + 字符串(随机ID)+ 4字节整数(令牌数)
-- 随机ID和该批次消耗的令牌数
local random, permits = struct.unpack('Bc0I', v);
-- 累加可回收的令牌
released = released + permits;
end;

-- 如果有可回收的令牌
if released > 0 then
-- 从有序集合中删除所有过期的记录
redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);

-- 如果回收后剩余令牌会超过桶容量
-- lua脚本保证了原子性,但不能保证rate(元数据中的最大桶容量)的值没有被修改,如果被减小,了那么可用令牌+回收令牌就可能>rate,这时就要修改valueName了,如果增大了,还有令牌的情况下也没必要现在就改 valueName
if tonumber(currentValue) + released > tonumber(rate) then
-- 获取当前所有未过期的令牌记录
local values = redis.call('zrange', permitsName, 0, -1);
-- 计算当前实际已使用的令牌总数
local used = 0;

-- 遍历所有未过期的令牌记录(valueName 是当前剩余的令牌数,需要重新精确计算)
for i, v in ipairs(values) do
-- random 是之前生成但未使用的随机ID,permits 是该批次实际消耗的令牌数
local random, permits = struct.unpack('Bc0I', v);
-- 累加所有未过期记录中的令牌总数,得到当前已被占用的令牌数
used = used + permits;
end;

-- 剩余令牌数 = 桶容量 - 已使用数
currentValue = tonumber(rate) - used;
else
-- 否则直接将回收的令牌加回剩余令牌
currentValue = tonumber(currentValue) + released;
end;

-- 更新剩余令牌数到 Redis
redis.call('set', valueName, currentValue);
end;

-- 如果剩余令牌数不够本次请求
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 获取最早一次令牌记录的分数(时间戳)
local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
-- 计算需要等待的毫秒数:补充周期 - (当前时间 - 最早记录时间) + 3(最小等待时间)
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
else
-- 令牌足够:记录本次消耗的令牌批次
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
-- 减少剩余令牌数
redis.call('decrby', valueName, ARGV[1]);
-- 返回 nil 表示获取成功
res = nil;
end;
else
-- currentValue 不存在(首次获取令牌)
-- 之前判断了 获取的令牌数必须 <= 最大令牌桶容量 才能走到这
-- 初始化剩余令牌为桶容量
redis.call('set', valueName, rate);
-- 记录本次消耗的令牌批次
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
-- 减少剩余令牌数
redis.call('decrby', valueName, ARGV[1]);
-- 返回 nil 表示获取成功
res = nil;
end;

-- 获取限流器配置的 keepAliveTime(过期时间)
local keepAliveTime = redis.call('hget', KEYS[1], 'keepAliveTime');

-- 如果设置了 keepAliveTime 且大于 0
if (keepAliveTime ~= false and tonumber(keepAliveTime) > 0) then
-- 更新主 key 的过期时间
redis.call('pexpire', KEYS[1], keepAliveTime);
-- 更新剩余令牌 key 的过期时间
redis.call('pexpire', valueName, keepAliveTime);
-- 更新令牌记录有序集合的过期时间
redis.call('pexpire', permitsName, keepAliveTime);
else
-- 否则获取主 key(chat) 的剩余 TTL
local ttl = redis.call('pttl', KEYS[1]);
-- 如果 chat 还没有过期
if ttl > 0 then
-- 将剩余令牌 key 的过期时间设置与主 key 一致
redis.call('pexpire', valueName, ttl);
-- 将令牌记录有序集合的过期时间设置与主 key 一致
redis.call('pexpire', permitsName, ttl);
end;
end;
-- 返回结果:nil=成功获取,数字=需要等待的毫秒数
-- 返回数字后会重试,代码要用limiter.tryAcquire(Duration.ofSeconds(2));才有重试机制
return res;

limiter.tryAcquire()流程图


图片有点大,如果被压缩了可以找我要原图




二、全局每日调用计数限流:tryDaily



这个就没有什么好讲的了,因为redis执行时单线程的,所以每次请求都+1,到了最大调用数就不让访问就行了

需要注意的点就是设置TTL可以稍微大一点,可以避免时间临界问题



public boolean tryDaily(String keyPrefix, int maxCalls) {  
String key = "rl:daily:" + keyPrefix + ":" + LocalDate.now();
Long count = redisTemplate.opsForValue().increment(key);
// 首次创建时设置过期时间,28 小时后自动删除
if (count != null && count == 1L) {
redisTemplate.expire(key, 28, TimeUnit.HOURS);
}
if (count != null && count > maxCalls) {
log.warn("每日调用超限: key={}, count={}, max={}", keyPrefix, count, maxCalls);
return false; }
return true;
}
最新回复 (3)
  • starlin 06-25 10:42
    1

    为什么会选langchain 而不是springai

  • 我认不到你 楼主 06-25 10:57
    2

    哈哈哈,好问题,因为这是jdk8的项目,springai需要springboot3.x,至少需要jdk17+

    这个项目是因为我工作中就是用的jdk8,之前迭代有个需求就是RAG,所以这个开源项目是我总结了一下搭建了一个通用的项目

  • 章鱼烧帅哥 06-29 15:33
    3

    langchan4j自由度更高,spring ai的规范性更强,做AI这方面还是langchan4j更好使

    另外我想问问佬,你有没有碰到过Alibaba依赖与chroma依赖冲突的情况?spring AI1.0.0逻辑层一直请求chroma的V2接口,但是chroma依赖只支持v1接口,如果降低spring AI版本那Alibaba就没法用了,就很矛盾

* 帖子来源Linux.do
返回