新增等待ws请求功能

This commit is contained in:
Jerry 2025-04-29 14:01:48 +08:00
parent c96d53dc44
commit 5f28555f57
4 changed files with 62 additions and 4 deletions

View File

@ -15,6 +15,7 @@
"ioredis": "^5.6.0", "ioredis": "^5.6.0",
"mkdirp": "^3.0.1", "mkdirp": "^3.0.1",
"simple-git": "^3.27.0", "simple-git": "^3.27.0",
"uuid": "^11.1.0",
"ws": "^8.18.1" "ws": "^8.18.1"
}, },
"devDependencies": { "devDependencies": {

9
pnpm-lock.yaml generated
View File

@ -32,6 +32,9 @@ importers:
simple-git: simple-git:
specifier: ^3.27.0 specifier: ^3.27.0
version: 3.27.0 version: 3.27.0
uuid:
specifier: ^11.1.0
version: 11.1.0
ws: ws:
specifier: ^8.18.1 specifier: ^8.18.1
version: 8.18.1 version: 8.18.1
@ -746,6 +749,10 @@ packages:
resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==} resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==}
engines: {node: '>= 0.4.0'} engines: {node: '>= 0.4.0'}
uuid@11.1.0:
resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==}
hasBin: true
v8-compile-cache-lib@3.0.1: v8-compile-cache-lib@3.0.1:
resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==}
@ -1491,6 +1498,8 @@ snapshots:
utils-merge@1.0.1: {} utils-merge@1.0.1: {}
uuid@11.1.0: {}
v8-compile-cache-lib@3.0.1: {} v8-compile-cache-lib@3.0.1: {}
vary@1.1.2: {} vary@1.1.2: {}

View File

@ -3,6 +3,7 @@ import wsTools from '../../utils/ws/wsTools';
import { WebSocket } from 'ws'; import { WebSocket } from 'ws';
import logger from '../../utils/core/logger'; import logger from '../../utils/core/logger';
import redisService from '../redis/redis'; import redisService from '../redis/redis';
import wsClientManager from './wsClientManager';
type MessageHandler = (socket: WebSocket, msg: any) => Promise<void>; type MessageHandler = (socket: WebSocket, msg: any) => Promise<void>;
@ -20,6 +21,11 @@ class WSMessageHandler {
async handle(socket: AuthenticatedSocket, clientId: string, msg: any) { async handle(socket: AuthenticatedSocket, clientId: string, msg: any) {
try { try {
//检查是否是 pendingRequests 的回包
if (msg.requestId && wsClientManager.resolvePendingRequest(msg.requestId, msg)) {
return;
}
const handler = this.handlers.get(msg.type); const handler = this.handlers.get(msg.type);
if (handler) { if (handler) {
@ -37,7 +43,6 @@ class WSMessageHandler {
} }
private async handleTest(socket: WebSocket, msg: any) { private async handleTest(socket: WebSocket, msg: any) {
//logger.info(`消息测试[test]`);
await wsTools.send(socket, { await wsTools.send(socket, {
type: 'test', type: 'test',
data: { status: 'ok' }, data: { status: 'ok' },
@ -45,7 +50,6 @@ class WSMessageHandler {
} }
private async handlePing(socket: WebSocket, msg: any) { private async handlePing(socket: WebSocket, msg: any) {
//logger.info(`ping`);
await wsTools.send(socket, { type: 'pong' }); await wsTools.send(socket, { type: 'pong' });
} }

View File

@ -1,6 +1,9 @@
import WebSocket from 'ws'; import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';
import { clearTimeout } from 'node:timers';
type ClientID = string; type ClientID = string;
const pendingRequests = new Map<string, (data: any) => void>();
class WSClientManager { class WSClientManager {
private clients = new Map<ClientID, WebSocket>(); private clients = new Map<ClientID, WebSocket>();
@ -23,9 +26,50 @@ class WSClientManager {
return this.safeSend(socket, payload); return this.safeSend(socket, payload);
} }
public async sendAndWait(id: ClientID, payload: any, timeout = 5000): Promise<any> {
const socket = this.clients.get(id);
if (!socket) return;
payload.requestId = uuidv4();
const requestId = payload.requestId;
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
pendingRequests.delete(requestId);
reject(new Error(`${requestId}: 请求超时`));
}, timeout);
pendingRequests.set(requestId, (response) => {
clearTimeout(timer);
pendingRequests.delete(requestId);
resolve(response);
});
this.safeSend(socket, payload).catch((err) => {
clearTimeout(timer);
pendingRequests.delete(requestId);
reject(err);
});
});
}
public resolvePendingRequest(requestId: string, data: any): boolean {
const callback = pendingRequests.get(requestId);
if (callback) {
pendingRequests.delete(requestId);
callback(data);
return true;
}
return false;
}
public async broadcast(payload: any): Promise<void> { public async broadcast(payload: any): Promise<void> {
const tasks = Array.from(this.clients.values()).map((socket) => { const tasks = Array.from(this.clients.values()).map((socket) => {
socket.readyState === WebSocket.OPEN ? this.safeSend(socket, payload) : Promise.resolve(); if (socket.readyState === WebSocket.OPEN) {
return this.safeSend(socket, payload);
} else {
return Promise.resolve();
}
}); });
await Promise.all(tasks); await Promise.all(tasks);
} }
@ -33,7 +77,7 @@ class WSClientManager {
private async safeSend(socket: WebSocket, data: any): Promise<boolean> { private async safeSend(socket: WebSocket, data: any): Promise<boolean> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
socket.send(JSON.stringify(data), (err) => { socket.send(JSON.stringify(data), (err) => {
if (err) reject(false); if (err) reject(new Error('发送失败'));
else resolve(true); else resolve(true);
}); });
}); });