🔧 feat(meme.controller): add active connection management per IP to limit concurrent requests and prevent abuse

This commit is contained in:
Jerry 2025-12-01 17:21:20 +08:00
parent fc2ffeb145
commit 01609a1d27

View File

@ -46,6 +46,8 @@ class MemeRequestDto {
@ApiTags('Meme') @ApiTags('Meme')
export class MemeController { export class MemeController {
private readonly logger = new Logger(MemeController.name); private readonly logger = new Logger(MemeController.name);
private static readonly activeConnections = new Map<string, number>();
private static readonly maxConnectionsPerIp = 3; // 每IP最大并发连接数
constructor( constructor(
@Inject(MemeService) @Inject(MemeService)
@ -96,6 +98,33 @@ export class MemeController {
return this.handleMemeRequest(query, res, ip, 'GET'); return this.handleMemeRequest(query, res, ip, 'GET');
} }
/**
* IP活跃连接数
* @param ip IP地址
* @returns
*/
private incrementActiveConnections(ip: string): boolean {
const current = MemeController.activeConnections.get(ip) || 0;
if (current >= MemeController.maxConnectionsPerIp) {
return false;
}
MemeController.activeConnections.set(ip, current + 1);
return true;
}
/**
* IP活跃连接数
* @param ip IP地址
*/
private decrementActiveConnections(ip: string): void {
const current = MemeController.activeConnections.get(ip) || 0;
if (current <= 1) {
MemeController.activeConnections.delete(ip);
} else {
MemeController.activeConnections.set(ip, current - 1);
}
}
/** /**
* *
* @param dto * @param dto
@ -111,6 +140,15 @@ export class MemeController {
method: string, method: string,
) { ) {
try { try {
if (!this.incrementActiveConnections(ip)) {
this.logger.warn(`[${method}] ${ip} 并发连接数超限`);
res.status(429).json({
success: false,
message: '请求过于频繁,请稍后再试',
});
return;
}
const realToken = dto.token; const realToken = dto.token;
const hasValidToken = const hasValidToken =
realToken && this.toolsService.checkToken(realToken); realToken && this.toolsService.checkToken(realToken);
@ -121,6 +159,7 @@ export class MemeController {
); );
if (!memePath) { if (!memePath) {
this.decrementActiveConnections(ip);
throw ErrorUtil.createNotFoundError('表情包'); throw ErrorUtil.createNotFoundError('表情包');
} }
@ -146,38 +185,79 @@ export class MemeController {
type?.mime === 'image/webp' || type?.mime === 'image/webp' ||
type?.mime === 'image/apng'; type?.mime === 'image/apng';
//this.logger.debug(type?.mime); const singleRate = 400 * 1024; // 400 KB/s
const singleRate = 200 * 1024; // 100 KB/s * 3
const maxThreads = 2; const maxThreads = 2;
const maxRate = singleRate * maxThreads; const maxRate = singleRate * maxThreads; // 800 KB/s 每IP
const trafficWindow = 60; // 流量统计窗口60秒
const cleanup = () => {
this.decrementActiveConnections(ip);
};
if (hasValidToken) { if (hasValidToken) {
this.logger.log(`[${method}] 有token的入不限速 => ${memePath}`); this.logger.log(`[${method}] 有token的入不限速 => ${memePath}`);
stream.pipe(res); stream.pipe(res);
stream.on('end', cleanup);
stream.on('error', cleanup);
} else { } else {
stream.on('data', async (chunk) => { let totalBytes = 0;
const bytes = chunk.length;
const total = await this.redisService.incrementIpTraffic( stream.on('data', (chunk) => {
totalBytes += chunk.length;
});
stream.on('end', async () => {
cleanup();
try {
await this.redisService.incrementIpTraffic(
ip, ip,
bytes, totalBytes,
1, trafficWindow,
); );
if (total > maxRate && !isAnimatedImage) { } catch (error) {
this.logger.warn(`[${method}] ${ip} 超过速率限制,断开连接..`); this.logger.error(`更新流量统计失败: ${error.message}`);
stream.destroy();
res.end();
} }
}); });
stream.on('error', (error) => {
cleanup();
this.logger.error(`流传输错误: ${error.message}`);
});
try {
const currentTraffic = await this.redisService.getIpTraffic(ip);
if (currentTraffic > maxRate && !isAnimatedImage) {
this.logger.warn(
`[${method}] ${ip} 流量超限 (${currentTraffic} > ${maxRate}), 拒绝请求`,
);
stream.destroy();
this.decrementActiveConnections(ip);
res.status(429).json({
success: false,
message: '请求过于频繁,请稍后再试',
});
return;
}
} catch (error) {
this.logger.error(`检查流量失败: ${error.message}`);
}
const throttle = new Throttle({ rate: singleRate }); const throttle = new Throttle({ rate: singleRate });
this.logger.log( this.logger.log(
`[${method}] 白嫖入限速! (${ip}) => ${memePath} `[${method}] 白嫖入限速! (${ip}) => ${memePath} (${isAnimatedImage ? '动态图片不限速' : '静态图片限速'})`,
`,
); );
if (isAnimatedImage) {
stream.pipe(res);
} else {
stream.pipe(throttle).pipe(res); stream.pipe(throttle).pipe(res);
} }
}
} catch (e) { } catch (e) {
throw ErrorUtil.handleUnknownError(e, '获取表情包失败', 'handleMemeRequest'); this.decrementActiveConnections(ip);
throw ErrorUtil.handleUnknownError(
e,
'获取表情包失败',
'handleMemeRequest',
);
} }
} }