NestJS 本身已经集成了对 WebSocket 的支持,官方文档也有详细的描述:
官网
但是它没有告诉你的是:它完全是按照 socket.io 的方式设计的,遵循订阅/事件的方式来处理消息。这本身很好,没有问题。
例如:
// Handler subscribed to the 'events' message: returns the received message
// body unchanged. The connected client socket is injected but not used here.
@SubscribeMessage('events')
handleEvent(
@MessageBody() data: string,
@ConnectedSocket() client: Socket,
): string {
return data;
}
虽然官方文档中有适配器的说明,但依然比较模糊,不足以支撑自定义业务,因为它要求 message 的格式必须如下:
{
"event": "message",
"data": {
"test": 123
}
}
我的需求是:对接文档要求所有的动作都通过 cmd 字段来表示,因此需要支持的是如下格式的原生 WebSocket 消息:
{"cmd":"ping","sn":"T78182","timestamp":1730734491}
自定义适配器如下:
ClockWsAdapter
//
import { WsAdapter as OriginWsAdapter } from '@nestjs/platform-ws';
import { MessageMappingProperties } from '@nestjs/websockets';
import { CLOSE_EVENT } from '@nestjs/websockets/constants';
import { get } from 'lodash';
import {
EMPTY,
filter,
first,
fromEvent,
mergeMap,
Observable,
share,
takeUntil,
} from 'rxjs';
// Numeric connection states compared against `client.readyState` before
// sending a response; only OPEN_STATE is checked in this file.
enum READY_STATE {
CONNECTING_STATE = 0,
OPEN_STATE = 1,
CLOSING_STATE = 2,
CLOSED_STATE = 3,
}
/**
 * WebSocket adapter that dispatches incoming frames by their `cmd` field
 * instead of the `{ "event": ..., "data": ... }` envelope.
 *
 * A frame such as `{"cmd":"ping","sn":"T78182"}` is routed to the handler
 * registered with `@SubscribeMessage('ping')`.
 */
export class ClockWsAdapter extends OriginWsAdapter {
  /**
   * Subscribes to the client's `message` events, routes each frame to its
   * handler and sends truthy handler results back to the client. The
   * subscription ends on the client's first close event.
   *
   * @param client    Connected socket; must expose `readyState` and `send`.
   * @param handlers  Registered message handlers, forwarded as-is to
   *                  `bindMessageHandler`. Typed `any` to keep the override
   *                  compatible with the base class signature.
   * @param transform Converts a handler's return value into an Observable.
   */
  public bindMessageHandlers(
    client: any,
    handlers: any,
    transform: (data: any) => Observable<any>,
  ) {
    const close$ = fromEvent(client, CLOSE_EVENT).pipe(share(), first());
    const source$ = fromEvent(client, 'message').pipe(
      mergeMap((data: any) =>
        // `data` is the message event object; the raw payload is in `data.data`.
        this.bindMessageHandler(data.data, handlers, transform).pipe(
          // Falsy results (undefined, null, '', 0, false) are not sent back.
          filter((result: any) => result),
        ),
      ),
      takeUntil(close$),
    );
    const onMessage = (response: any) => {
      // The socket may have started closing while the handler was running.
      if (client.readyState !== READY_STATE.OPEN_STATE) {
        return;
      }
      // Strings, binary data and arrays are passed through untouched. A plain
      // object is serialized to JSON first so that handlers may return objects.
      // NOTE(review): assumes a ws-style `send` that rejects plain objects.
      const proto =
        typeof response === 'object' ? Object.getPrototypeOf(response) : undefined;
      const isPlainObject = proto === Object.prototype || proto === null;
      client.send(isPlainObject ? JSON.stringify(response) : response);
    };
    source$.subscribe(onMessage);
  }

  /**
   * Parses one raw frame, looks up the handler whose `message` equals the
   * frame's `cmd` (falling back to `'message'` when `cmd` is absent) and
   * invokes it with the raw, unparsed frame.
   *
   * @param buffer      Raw frame text; expected to be JSON.
   * @param handlersMap Array of registered handlers (despite the name).
   * @param transform   Converts the handler's return value into an Observable.
   * @returns The transformed handler result, or `EMPTY` when the frame is not
   *          valid JSON, no handler matches, or the handler throws synchronously.
   */
  bindMessageHandler(
    buffer: string,
    handlersMap: any,
    transform: (data: any) => Observable<any>,
  ): Observable<any> {
    try {
      const cmdObject = JSON.parse(buffer);
      // Matches the name given to @SubscribeMessage(...), e.g. 'ping'.
      const handlerName = get(cmdObject, 'cmd', 'message');
      const messageHandler = (handlersMap as MessageMappingProperties[]).find(
        (p) => p.message === handlerName,
      );
      // Unknown command: ignore the frame explicitly instead of relying on a
      // TypeError from destructuring `undefined`.
      if (!messageHandler) {
        return EMPTY;
      }
      // Handlers receive the raw frame string, not the parsed object.
      return transform(messageHandler.callback(buffer));
    } catch {
      // Best effort: malformed frames and synchronous handler failures are
      // dropped so that a single bad message cannot break the subscription.
      return EMPTY;
    }
  }
}
TimeClockGateway
import { Logger } from '@nestjs/common';
import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { Server } from 'socket.io';
/**
 * Gateway exposing the `declare`, `ping` and `events` message handlers.
 *
 * NOTE(review): `origin: '*'` combined with `credentials: true` is a CORS
 * combination browsers typically reject for credentialed requests — confirm
 * whether these options are needed for this transport.
 */
@WebSocketGateway({
  cors: {
    origin: '*',
    methods: ['GET', 'POST'],
    allowedHeaders: ['Content-Type'],
    credentials: true,
  },
})
export class TimeClockGateway {
  logger = new Logger(TimeClockGateway.name);

  /** Underlying server instance, injected by the framework. */
  @WebSocketServer()
  server: Server;

  /**
   * Handles `declare` messages: logs the payload at debug level and sends
   * nothing back.
   */
  @SubscribeMessage('declare')
  handleDeclare(client: any, payload: any): void {
    this.logger.debug(payload);
  }

  /**
   * Handles `ping` messages.
   *
   * @returns The JSON text `{"cmd":"pong"}`.
   */
  @SubscribeMessage('ping')
  handleMessage(client: any, payload: any): string {
    const reply = { cmd: 'pong' };
    return JSON.stringify(reply);
  }

  /**
   * Handles `events` messages by echoing the message body.
   *
   * @param data Message body of the incoming message.
   * @returns The same `data` value, unchanged.
   */
  @SubscribeMessage('events')
  handleEvent(@MessageBody() data: string): string {
    return data;
  }
}