mirror of
https://github.com/crystelf/crystelf-core.git
synced 2025-10-14 05:19:19 +00:00
ws模块
This commit is contained in:
parent
08f74445da
commit
3b546e50a1
@ -20,13 +20,16 @@
|
||||
"@nestjs/config": "^4.0.2",
|
||||
"@nestjs/core": "^11.0.1",
|
||||
"@nestjs/platform-express": "^11.0.1",
|
||||
"@nestjs/platform-socket.io": "^11.1.6",
|
||||
"@nestjs/swagger": "^11.2.0",
|
||||
"@nestjs/websockets": "^11.1.6",
|
||||
"axios": "^1.10.0",
|
||||
"ioredis": "^5.6.1",
|
||||
"reflect-metadata": "^0.2.2",
|
||||
"rxjs": "^7.8.1",
|
||||
"simple-git": "^3.28.0",
|
||||
"ssh2": "^1.16.0",
|
||||
"uuid": "^11.1.0",
|
||||
"ws": "^8.18.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
8274
pnpm-lock.yaml
generated
8274
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@ -7,6 +7,7 @@ import { ToolsModule } from './core/tools/tools.module';
|
||||
import { AutoUpdateModule } from './core/auto-update/auto-update.module';
|
||||
import { PersistenceModule } from './core/persistence/persistence.module';
|
||||
import { RedisModule } from './core/redis/redis.module';
|
||||
import { WsModule } from './core/ws/ws.module';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@ -18,6 +19,7 @@ import { RedisModule } from './core/redis/redis.module';
|
||||
PersistenceModule,
|
||||
AutoUpdateModule,
|
||||
RedisModule,
|
||||
WsModule,
|
||||
],
|
||||
})
|
||||
export class AppModule {}
|
||||
|
@ -19,6 +19,11 @@ export class PersistenceService {
|
||||
private readonly fileService: FilesService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* 确保有某个目录
|
||||
* @param dataName 目录名/资源名
|
||||
* @private
|
||||
*/
|
||||
private async ensureDataPath(dataName: string): Promise<void> {
|
||||
const dataPath = path.join(this.paths.get('userData'), dataName);
|
||||
try {
|
||||
|
86
src/core/ws/ws-client.manager.ts
Normal file
86
src/core/ws/ws-client.manager.ts
Normal file
@ -0,0 +1,86 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import WebSocket from 'ws';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
type ClientID = string;
|
||||
const pendingRequests = new Map<string, (data: any) => void>();
|
||||
|
||||
@Injectable()
|
||||
export class WsClientManager {
|
||||
private clients = new Map<ClientID, WebSocket>();
|
||||
|
||||
add(id: ClientID, socket: WebSocket) {
|
||||
this.clients.set(id, socket);
|
||||
}
|
||||
|
||||
remove(id: ClientID) {
|
||||
this.clients.delete(id);
|
||||
}
|
||||
|
||||
get(id: ClientID): WebSocket | undefined {
|
||||
return this.clients.get(id);
|
||||
}
|
||||
|
||||
async send(id: ClientID, data: any): Promise<boolean> {
|
||||
const socket = this.clients.get(id);
|
||||
if (!socket || socket.readyState !== WebSocket.OPEN) return false;
|
||||
return this.safeSend(socket, data);
|
||||
}
|
||||
|
||||
async sendAndWait(id: ClientID, data: any, timeout = 5000): Promise<any> {
|
||||
const socket = this.clients.get(id);
|
||||
if (!socket) return;
|
||||
|
||||
data.requestId = uuidv4();
|
||||
const requestId = data.requestId;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
pendingRequests.delete(requestId);
|
||||
reject(new Error(`${requestId}: 请求超时`));
|
||||
}, timeout);
|
||||
|
||||
pendingRequests.set(requestId, (response) => {
|
||||
clearTimeout(timer);
|
||||
pendingRequests.delete(requestId);
|
||||
resolve(response);
|
||||
});
|
||||
|
||||
this.safeSend(socket, data).catch((err) => {
|
||||
clearTimeout(timer);
|
||||
pendingRequests.delete(requestId);
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
resolvePendingRequest(requestId: string, data: any): boolean {
|
||||
const callback = pendingRequests.get(requestId);
|
||||
if (callback) {
|
||||
pendingRequests.delete(requestId);
|
||||
callback(data);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async broadcast(data: any): Promise<void> {
|
||||
const tasks = Array.from(this.clients.values()).map((socket) => {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
return this.safeSend(socket, data);
|
||||
} else {
|
||||
return Promise.resolve();
|
||||
}
|
||||
});
|
||||
await Promise.all(tasks);
|
||||
}
|
||||
|
||||
private async safeSend(socket: WebSocket, data: any): Promise<boolean> {
|
||||
return new Promise((resolve, reject) => {
|
||||
socket.send(JSON.stringify(data), (err) => {
|
||||
if (err) reject(new Error('发送失败'));
|
||||
else resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
115
src/core/ws/ws.gateway.ts
Normal file
115
src/core/ws/ws.gateway.ts
Normal file
@ -0,0 +1,115 @@
|
||||
import {
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
WebSocketGateway,
|
||||
WebSocketServer,
|
||||
} from '@nestjs/websockets';
|
||||
import { Inject, Logger } from '@nestjs/common';
|
||||
import { Server, WebSocket } from 'ws';
|
||||
import { WsTools } from './ws.tools';
|
||||
import { WsClientManager } from './ws-client.manager';
|
||||
import { AuthenticatedSocket, AuthMessage, WSMessage } from '../../types/ws';
|
||||
import { AppConfigService } from '../../config/config.service';
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: { origin: '*' },
|
||||
})
|
||||
export class WsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
private readonly logger = new Logger(WsGateway.name);
|
||||
private readonly secret: string | undefined;
|
||||
|
||||
@WebSocketServer()
|
||||
server: Server;
|
||||
|
||||
constructor(
|
||||
@Inject(AppConfigService)
|
||||
private readonly configService: AppConfigService,
|
||||
private readonly wsClientManager: WsClientManager,
|
||||
) {
|
||||
this.secret = this.configService.get<string>('WS_SECRET');
|
||||
}
|
||||
|
||||
async handleConnection(client: AuthenticatedSocket, req: any) {
|
||||
const ip = req.socket.remoteAddress || 'unknown';
|
||||
this.logger.log(`收到来自 ${ip} 的 WebSocket 连接请求..`);
|
||||
|
||||
client.heartbeat = WsTools.setUpHeartbeat(client);
|
||||
|
||||
client.on('message', async (raw) => {
|
||||
this.logger.debug(`Received raw message from ${ip}: ${raw.toString()}`);
|
||||
|
||||
const msg = WsTools.parseMessage<WSMessage>(raw);
|
||||
if (!msg) return this.handleInvalidMessage(client, ip);
|
||||
|
||||
await this.routeMessage(client, msg, ip);
|
||||
});
|
||||
|
||||
client.on('error', (err) => {
|
||||
this.logger.error(`WS error from ${ip}: ${err.message}`);
|
||||
});
|
||||
}
|
||||
|
||||
async handleDisconnect(client: AuthenticatedSocket) {
|
||||
if (client.heartbeat) clearInterval(client.heartbeat);
|
||||
if (client.clientId) {
|
||||
this.wsClientManager.remove(client.clientId);
|
||||
this.logger.log(`Removed client ${client.clientId} from manager`);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleInvalidMessage(client: WebSocket, ip: string) {
|
||||
this.logger.warn(`Invalid message received from ${ip}`);
|
||||
await WsTools.send(client, {
|
||||
type: 'error',
|
||||
message: 'Invalid message format',
|
||||
});
|
||||
}
|
||||
|
||||
private async routeMessage(
|
||||
client: AuthenticatedSocket,
|
||||
msg: WSMessage,
|
||||
ip: string,
|
||||
) {
|
||||
if (!client.isAuthed) {
|
||||
if (this.isAuthMessage(msg)) {
|
||||
this.logger.log(`Attempting auth from ${ip} as ${msg.clientId}`);
|
||||
await this.handleAuth(client, msg, ip);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Received message before auth from ${ip}: ${JSON.stringify(msg)}`,
|
||||
);
|
||||
await this.handleInvalidMessage(client, ip);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Routing message from ${client.clientId}: ${JSON.stringify(msg)}`,
|
||||
);
|
||||
// TODO: 注入 handler 服务
|
||||
}
|
||||
|
||||
private isAuthMessage(msg: WSMessage): msg is AuthMessage {
|
||||
return msg.type === 'auth';
|
||||
}
|
||||
|
||||
private async handleAuth(
|
||||
client: AuthenticatedSocket,
|
||||
msg: AuthMessage,
|
||||
ip: string,
|
||||
) {
|
||||
if (msg.secret === this.secret) {
|
||||
client.isAuthed = true;
|
||||
client.clientId = msg.clientId;
|
||||
this.wsClientManager.add(msg.clientId, client);
|
||||
this.logger.log(`Auth success from ${ip}, clientId: ${msg.clientId}`);
|
||||
await WsTools.send(client, { type: 'auth', success: true });
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Auth failed from ${ip} (invalid secret), clientId: ${msg.clientId}`,
|
||||
);
|
||||
await WsTools.send(client, { type: 'auth', success: false });
|
||||
client.close(4001, 'Authentication failed');
|
||||
}
|
||||
}
|
||||
}
|
11
src/core/ws/ws.module.ts
Normal file
11
src/core/ws/ws.module.ts
Normal file
@ -0,0 +1,11 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { WsGateway } from './ws.gateway';
|
||||
import { WsClientManager } from './ws-client.manager';
|
||||
import { AppConfigModule } from '../../config/config.module';
|
||||
|
||||
@Module({
|
||||
imports: [AppConfigModule],
|
||||
providers: [WsGateway, WsClientManager],
|
||||
exports: [WsClientManager],
|
||||
})
|
||||
export class WsModule {}
|
34
src/core/ws/ws.tools.ts
Normal file
34
src/core/ws/ws.tools.ts
Normal file
@ -0,0 +1,34 @@
|
||||
import WebSocket from 'ws';
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
export class WsTools {
|
||||
private static readonly logger = new Logger(WsTools.name);
|
||||
|
||||
static async send(socket: WebSocket, data: unknown): Promise<boolean> {
|
||||
if (socket.readyState !== WebSocket.OPEN) return false;
|
||||
|
||||
return new Promise((resolve) => {
|
||||
socket.send(JSON.stringify(data), (err) => {
|
||||
resolve(!err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static parseMessage<T>(data: WebSocket.RawData): T | null {
|
||||
try {
|
||||
return JSON.parse(data.toString()) as T;
|
||||
} catch (err) {
|
||||
this.logger.error(`WS parse error: ${err}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static setUpHeartbeat(socket: WebSocket, interval = 30000): NodeJS.Timeout {
|
||||
const heartbeat = () => {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
WsTools.send(socket, { type: 'ping' });
|
||||
}
|
||||
};
|
||||
return setInterval(heartbeat, interval);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user