优化ws模块

This commit is contained in:
Jerry 2025-04-13 13:56:37 +08:00
parent b42a152e46
commit 5bdb43b32f
8 changed files with 159 additions and 73 deletions

View File

@ -3,7 +3,7 @@ import logger from './utils/core/logger';
import config from './utils/core/config'; import config from './utils/core/config';
import redis from './services/redis/redis'; import redis from './services/redis/redis';
config.check(['PORT', 'DEBUG', 'RD_PORT', 'RD_ADD']); config.check(['PORT', 'DEBUG', 'RD_PORT', 'RD_ADD', 'WS_SECRET', 'WS_PORT']);
const PORT = config.get('PORT') || 3000; const PORT = config.get('PORT') || 3000;
apps apps

View File

@ -17,6 +17,7 @@ class RedisService {
private async initialize() { private async initialize() {
await this.connectWithRetry(); await this.connectWithRetry();
this.setupEventListeners(); this.setupEventListeners();
//await this.test();
} }
private async connectWithRetry(): Promise<void> { private async connectWithRetry(): Promise<void> {

View File

@ -1,27 +1,41 @@
import WebSocket from 'ws'; import { AuthenticatedSocket } from '../../types/ws';
import WsMessage from '../../types/wsMessage'; import wsTools from '../../utils/ws/wsTools';
import * as ws from 'ws';
type WebSocket = ws.WebSocket;
class WSMessageHandler { class WSMessageHandler {
public async handle(socket: WebSocket, clientID: string, msg: WsMessage) { async handle(socket: AuthenticatedSocket, clientId: string, msg: any) {
try {
switch (msg.type) { switch (msg.type) {
case 'test': case 'test':
await this.reply(socket, { type: 'test', data: 'hi' }); await this.handleTest(socket);
break; break;
case 'ping': case 'ping':
await this.reply(socket, { type: 'pong' }); await wsTools.send(socket, { type: 'pong' });
break; break;
default: default:
await this.reply(socket, { type: 'error', message: 'Unknown message' }); await this.handleUnknown(socket);
break; }
} catch (err) {
await wsTools.send(socket, {
type: 'error',
message: 'Processing failed',
});
} }
} }
private async reply(socket: WebSocket, data: any): Promise<void> { private async handleTest(socket: WebSocket) {
return new Promise((resolve, reject) => { await wsTools.send(socket, {
socket.send(JSON.stringify(data), (err) => { type: 'test',
if (err) reject(err); data: { status: 'ok' },
else resolve();
}); });
}
private async handleUnknown(socket: WebSocket) {
await wsTools.send(socket, {
type: 'error',
message: 'Unknown message type',
}); });
} }
} }

View File

@ -1,72 +1,83 @@
import WebSocket, { WebSocketServer } from 'ws'; import WebSocket, { WebSocketServer } from 'ws';
import config from '../../utils/core/config'; import config from '../../utils/core/config';
import wsClientManager from './wsClientManager';
import wsHandler from './handler';
import logger from '../../utils/core/logger'; import logger from '../../utils/core/logger';
import { AuthenticatedSocket, AuthMessage, WSMessage } from '../../types/ws';
interface AuthenticatedSocket extends WebSocket { import WsTools from '../../utils/ws/wsTools';
isAuthed?: boolean; import wsHandler from './handler';
clientId?: string; import { clearInterval } from 'node:timers';
} import wsClientManager from './wsClientManager';
class WSServer { class WSServer {
private wss: WebSocketServer; private readonly wss: WebSocketServer;
private PORT = config.get('WS_PORT'); private readonly port = Number(config.get('WS_PORT'));
private WS_SECRET = config.get('WS_SECRET'); private readonly secret = config.get('WS_SECRET');
constructor() { constructor() {
this.wss = new WebSocketServer({ port: Number(this.PORT) }); this.wss = new WebSocketServer({ port: this.port });
this.init(); this.init();
logger.info(`WebSocket Server started at ws://localhost:${this.PORT}`); logger.info(`WS Server listening on ws://localhost:${this.port}`);
} }
private init() { private init(): void {
this.wss.on('connection', (socket: AuthenticatedSocket) => { this.wss.on('connection', (socket: AuthenticatedSocket) => {
socket.heartbeat = WsTools.setUpHeartbeat(socket);
socket.on('message', async (raw) => { socket.on('message', async (raw) => {
let msg: any; const msg = WsTools.parseMessage<WSMessage>(raw);
try { if (!msg) return this.handleInvalidMessage(socket);
msg = JSON.parse(raw.toString());
} catch {
return this.send(socket, { type: 'error', message: 'JSON 解析失败' });
}
// 鉴权 await this.routeMessage(socket, msg);
if (!socket.isAuthed) {
if (msg.type === 'auth' && msg.secret === this.WS_SECRET && msg.clientId) {
socket.isAuthed = true;
socket.clientId = msg.clientId;
wsClientManager.add(msg.clientId, socket);
return this.send(socket, { type: 'auth', success: true });
}
return this.send(socket, { type: 'auth', success: false });
}
// 业务处理
if (socket.clientId) {
try {
await wsHandler.handle(socket, socket.clientId, msg);
} catch (e) {
await this.send(socket, { type: 'error', message: '处理出错' });
}
}
}); });
socket.on('close', () => { socket.on('close', () => {
if (socket.clientId) { this.handleDisconnect(socket);
wsClientManager.remove(socket.clientId);
}
}); });
}); });
} }
private async send(socket: WebSocket, data: any): Promise<void> { private async handleInvalidMessage(socket: WebSocket) {
return new Promise((resolve, reject) => { await WsTools.send(socket, {
socket.send(JSON.stringify(data), (err) => { type: 'error',
if (err) reject(err); message: 'Invalid message format',
else resolve();
});
}); });
} }
private async routeMessage(socket: AuthenticatedSocket, msg: WSMessage) {
if (!socket.isAuthed) {
if (this.isAuthMessage(msg)) {
await this.handleAuth(socket, msg);
}
return;
}
if (socket.clientId) {
await wsHandler.handle(socket, socket.clientId, msg);
}
}
private isAuthMessage(msg: WSMessage): msg is AuthMessage {
return (
msg.type === 'auth' &&
typeof (msg as AuthMessage).secret === 'string' &&
typeof (msg as AuthMessage).clientId === 'string'
);
}
private async handleAuth(socket: AuthenticatedSocket, msg: AuthMessage) {
if (msg.secret === this.secret) {
socket.isAuthed = true;
socket.clientId = msg.clientId;
wsClientManager.add(msg.clientId, socket);
await WsTools.send(socket, { type: 'auth', success: true });
} else {
await WsTools.send(socket, { type: 'auth', success: false });
}
}
private handleDisconnect(socket: AuthenticatedSocket) {
if (socket.heartbeat) clearInterval(socket.heartbeat);
if (socket.clientId) wsClientManager.remove(socket.clientId);
}
} }
const wsServer = new WSServer(); const wsServer = new WSServer();

18
src/types/ws.ts Normal file
View File

@ -0,0 +1,18 @@
import WebSocket from 'ws';
export interface AuthenticatedSocket extends WebSocket {
isAuthed?: boolean;
clientId?: string;
heartbeat?: NodeJS.Timeout;
}
export interface WSMessage {
type: string;
[key: string]: unknown;
}
export interface AuthMessage extends WSMessage {
type: 'auth';
secret: string;
clientId: string;
}

View File

@ -1,6 +0,0 @@
interface wsMessage {
type: string;
[key: string]: any;
}
export default wsMessage;

View File

@ -36,6 +36,10 @@ class fc {
} }
} }
/**
*
* @param message
*/
public static async logToFile(message: string): Promise<void> { public static async logToFile(message: string): Promise<void> {
const logFile = path.join(paths.get('log'), `${date.getCurrentDate()}.log`); const logFile = path.join(paths.get('log'), `${date.getCurrentDate()}.log`);
const logMessage = `${message}\n`; const logMessage = `${message}\n`;

44
src/utils/ws/wsTools.ts Normal file
View File

@ -0,0 +1,44 @@
import WebSocket from 'ws';
import logger from '../core/logger';
import { setInterval } from 'node:timers';
class WsTools {
/**
*
*/
static async send(socket: WebSocket, data: unknown): Promise<boolean> {
if (socket.readyState !== WebSocket.OPEN) return false;
return new Promise((resolve) => {
socket.send(JSON.stringify(data), (err) => {
resolve(!err);
});
});
}
/**
*
*/
static parseMessage<T>(data: WebSocket.RawData): T | null {
try {
return JSON.parse(data.toString()) as T;
} catch (err) {
logger.error(err);
return null;
}
}
/**
*
*/
static setUpHeartbeat(socket: WebSocket, interval = 30000): NodeJS.Timeout {
const heartbeat = () => {
if (socket.readyState === WebSocket.OPEN) {
WsTools.send(socket, { type: 'ping' });
}
};
return setInterval(heartbeat, interval);
}
}
export default WsTools;