From 964cc489931fb25010b27a4d18bc323030f2543c Mon Sep 17 00:00:00 2001 From: Jerry Date: Tue, 29 Apr 2025 18:43:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/ws/handler.ts | 2 +- src/services/ws/wsClientManager.ts | 55 +++++++++++++++++++++++++----- src/utils/redis/persistence.ts | 20 +++++++++-- src/utils/ws/wsTools.ts | 5 +++ 4 files changed, 71 insertions(+), 11 deletions(-) diff --git a/src/services/ws/handler.ts b/src/services/ws/handler.ts index 617fde6..4cd665f 100644 --- a/src/services/ws/handler.ts +++ b/src/services/ws/handler.ts @@ -21,7 +21,7 @@ class WSMessageHandler { async handle(socket: AuthenticatedSocket, clientId: string, msg: any) { try { - //检查是否是 pendingRequests 的回包 + //如果是 pendingRequests 的回包 if (msg.requestId && wsClientManager.resolvePendingRequest(msg.requestId, msg)) { return; } diff --git a/src/services/ws/wsClientManager.ts b/src/services/ws/wsClientManager.ts index 998dbc6..94469cd 100644 --- a/src/services/ws/wsClientManager.ts +++ b/src/services/ws/wsClientManager.ts @@ -8,30 +8,54 @@ const pendingRequests = new Map void>(); class WSClientManager { private clients = new Map(); + /** + * 添加ws客户端实例 + * @param id 标识符 + * @param socket + */ public add(id: ClientID, socket: WebSocket) { this.clients.set(id, socket); } + /** + * 移除ws客户端实例 + * @param id + */ public remove(id: ClientID) { this.clients.delete(id); } + /** + * 获取ws客户端实例 + * @param id + */ public get(id: ClientID): WebSocket | undefined { return this.clients.get(id); } - public async send(id: ClientID, payload: any): Promise { + /** + * 发送消息到ws客户端 + * @param id ws客户端标识符 + * @param data 要发送的内容 + */ + public async send(id: ClientID, data: any): Promise { const socket = this.clients.get(id); if (!socket || socket.readyState !== WebSocket.OPEN) return false; - return this.safeSend(socket, payload); + return this.safeSend(socket, data); } - public async sendAndWait(id: ClientID, payload: any, timeout = 5000): Promise { + /** + * ws发送请求&等待回调 + * @param id ws客户端标识符-id + * @param data 发送的信息 + * @param timeout 超时时间 默认5秒 + */ + public async sendAndWait(id: ClientID, data: any, timeout = 5000): Promise { const socket = this.clients.get(id); if (!socket) return; - payload.requestId = uuidv4(); - const requestId = payload.requestId; + data.requestId = uuidv4(); + const requestId = data.requestId; return new Promise((resolve, reject) => { const timer = setTimeout(() => { @@ -45,7 +69,7 @@ class WSClientManager { resolve(response); }); - this.safeSend(socket, payload).catch((err) => { + this.safeSend(socket, data).catch((err) => { clearTimeout(timer); pendingRequests.delete(requestId); reject(err); @@ -53,6 +77,11 @@ class WSClientManager { }); } + /** + * 处理回调 + * @param requestId + * @param data + */ public resolvePendingRequest(requestId: string, data: any): boolean { const callback = pendingRequests.get(requestId); if (callback) { @@ -63,10 +92,14 @@ class WSClientManager { return false; } - public async broadcast(payload: any): Promise { + /** + * 广播消息到全部ws客户端 + * @param data 消息 + */ + public async broadcast(data: any): Promise { const tasks = Array.from(this.clients.values()).map((socket) => { if (socket.readyState === WebSocket.OPEN) { - return this.safeSend(socket, payload); + return this.safeSend(socket, data); } else { return Promise.resolve(); } @@ -74,6 +107,12 @@ class WSClientManager { await Promise.all(tasks); } + /** + * 安全发送消息到ws客户端 + * @param socket ws客户端 + * @param data 发送的内容,会自动格式化 + * @private + */ private async safeSend(socket: WebSocket, data: any): Promise { return new Promise((resolve, reject) => { socket.send(JSON.stringify(data), (err) => { diff --git a/src/utils/redis/persistence.ts b/src/utils/redis/persistence.ts index 104b138..7f3d392 100644 --- a/src/utils/redis/persistence.ts +++ b/src/utils/redis/persistence.ts @@ -9,7 +9,12 @@ class Persistence { return path.join(paths.get('userData'), dataName, `${fileName}.json`); } - private static async ensureUserPath(dataName: string): Promise { + /** + * 确保数据目录存在 + * @param dataName + * @private + */ + private static async ensureDataPath(dataName: string): Promise { const dataPath = path.join(paths.get('userData'), dataName); try { await fc.createDir(dataPath, false); @@ -18,12 +23,18 @@ class Persistence { } } + /** + * 将数据写入本地,以json格式存储 + * @param dataName 目录名 + * @param data 文件内容 + * @param fileName 文件名 + */ public static async writeDataLocal( dataName: string, data: T, fileName: string ): Promise { - await this.ensureUserPath(dataName); + await this.ensureDataPath(dataName); const filePath = this.getDataPath(dataName, fileName); try { @@ -34,6 +45,11 @@ class Persistence { } } + /** + * 从本地读取文件 + * @param dataName 目录名 + * @param fileName 文件名 + */ public static async readDataLocal(dataName: string, fileName: string): Promise { const filePath = this.getDataPath(dataName, fileName); diff --git a/src/utils/ws/wsTools.ts b/src/utils/ws/wsTools.ts index 2820599..67995b9 100644 --- a/src/utils/ws/wsTools.ts +++ b/src/utils/ws/wsTools.ts @@ -5,6 +5,8 @@ import { setInterval } from 'node:timers'; class WsTools { /** * 发送消息 + * @param socket + * @param data */ static async send(socket: WebSocket, data: unknown): Promise { if (socket.readyState !== WebSocket.OPEN) return false; @@ -18,6 +20,7 @@ class WsTools { /** * 解析消息 + * @param data */ static parseMessage(data: WebSocket.RawData): T | null { try { @@ -30,6 +33,8 @@ class WsTools { /** * 心跳检测 + * @param socket + * @param interval */ static setUpHeartbeat(socket: WebSocket, interval = 30000): NodeJS.Timeout { const heartbeat = () => {