diff --git a/package.json b/package.json index 16507bd..75855a6 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "ioredis": "^5.6.0", "mkdirp": "^3.0.1", "simple-git": "^3.27.0", + "uuid": "^11.1.0", "ws": "^8.18.1" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8e2b495..c1b5e22 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,6 +32,9 @@ importers: simple-git: specifier: ^3.27.0 version: 3.27.0 + uuid: + specifier: ^11.1.0 + version: 11.1.0 ws: specifier: ^8.18.1 version: 8.18.1 @@ -746,6 +749,10 @@ packages: resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==} engines: {node: '>= 0.4.0'} + uuid@11.1.0: + resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==} + hasBin: true + v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} @@ -1491,6 +1498,8 @@ snapshots: utils-merge@1.0.1: {} + uuid@11.1.0: {} + v8-compile-cache-lib@3.0.1: {} vary@1.1.2: {} diff --git a/src/services/ws/handler.ts b/src/services/ws/handler.ts index 3827b63..617fde6 100644 --- a/src/services/ws/handler.ts +++ b/src/services/ws/handler.ts @@ -3,6 +3,7 @@ import wsTools from '../../utils/ws/wsTools'; import { WebSocket } from 'ws'; import logger from '../../utils/core/logger'; import redisService from '../redis/redis'; +import wsClientManager from './wsClientManager'; type MessageHandler = (socket: WebSocket, msg: any) => Promise; @@ -20,6 +21,11 @@ class WSMessageHandler { async handle(socket: AuthenticatedSocket, clientId: string, msg: any) { try { + //检查是否是 pendingRequests 的回包 + if (msg.requestId && wsClientManager.resolvePendingRequest(msg.requestId, msg)) { + return; + } + const handler = this.handlers.get(msg.type); if (handler) { @@ -37,7 +43,6 @@ class WSMessageHandler { } private async handleTest(socket: WebSocket, msg: any) { - //logger.info(`消息测试[test]`); await wsTools.send(socket, { type: 'test', data: { status: 'ok' }, @@ -45,7 +50,6 @@ class WSMessageHandler { } private async handlePing(socket: WebSocket, msg: any) { - //logger.info(`ping`); await wsTools.send(socket, { type: 'pong' }); } diff --git a/src/services/ws/wsClientManager.ts b/src/services/ws/wsClientManager.ts index 2d89e46..998dbc6 100644 --- a/src/services/ws/wsClientManager.ts +++ b/src/services/ws/wsClientManager.ts @@ -1,6 +1,9 @@ import WebSocket from 'ws'; +import { v4 as uuidv4 } from 'uuid'; +import { clearTimeout } from 'node:timers'; type ClientID = string; +const pendingRequests = new Map void>(); class WSClientManager { private clients = new Map(); @@ -23,9 +26,50 @@ class WSClientManager { return this.safeSend(socket, payload); } + public async sendAndWait(id: ClientID, payload: any, timeout = 5000): Promise { + const socket = this.clients.get(id); + if (!socket) return; + + payload.requestId = uuidv4(); + const requestId = payload.requestId; + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + pendingRequests.delete(requestId); + reject(new Error(`${requestId}: 请求超时`)); + }, timeout); + + pendingRequests.set(requestId, (response) => { + clearTimeout(timer); + pendingRequests.delete(requestId); + resolve(response); + }); + + this.safeSend(socket, payload).catch((err) => { + clearTimeout(timer); + pendingRequests.delete(requestId); + reject(err); + }); + }); + } + + public resolvePendingRequest(requestId: string, data: any): boolean { + const callback = pendingRequests.get(requestId); + if (callback) { + pendingRequests.delete(requestId); + callback(data); + return true; + } + return false; + } + public async broadcast(payload: any): Promise { const tasks = Array.from(this.clients.values()).map((socket) => { - socket.readyState === WebSocket.OPEN ? this.safeSend(socket, payload) : Promise.resolve(); + if (socket.readyState === WebSocket.OPEN) { + return this.safeSend(socket, payload); + } else { + return Promise.resolve(); + } }); await Promise.all(tasks); } @@ -33,7 +77,7 @@ class WSClientManager { private async safeSend(socket: WebSocket, data: any): Promise { return new Promise((resolve, reject) => { socket.send(JSON.stringify(data), (err) => { - if (err) reject(false); + if (err) reject(new Error('发送失败')); else resolve(true); }); });