Version: 11.x

WebSockets

You can use WebSockets for all or some of the communication with your server. See wsLink for how to set it up on the client.

Tip

This document covers the details specific to WebSockets. For general usage of subscriptions, see our subscriptions guide.

Creating a WebSocket server

bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});

const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // Enable heartbeat messages to keep connection open (disabled by default)
  keepAlive: {
    enabled: true,
    // server ping message interval in milliseconds
    pingMs: 30000,
    // connection is terminated if pong message is not received in this many milliseconds
    pongWaitMs: 5000,
  },
});

wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  // ask connected clients to reconnect before the server closes
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

Tip

You can use Links to route queries and/or mutations over the HTTP transport and subscriptions over WebSockets.

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
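
The tip above mentions routing subscriptions over WebSockets while queries and mutations go over HTTP. A minimal sketch of that setup follows, using splitLink and httpBatchLink from @trpc/client. The HTTP endpoint URL `http://localhost:3000/trpc` is an assumption for illustration and is not taken from this page.

```ts
import {
  createTRPCClient,
  createWSClient,
  httpBatchLink,
  splitLink,
  wsLink,
} from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// persistent WebSocket connection, used only for subscriptions
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

const client = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      // subscriptions take the `true` branch, everything else the `false` branch
      condition: (op) => op.type === 'subscription',
      true: wsLink({ client: wsClient }),
      // assumed HTTP endpoint; replace with wherever your HTTP handler is mounted
      false: httpBatchLink({ url: 'http://localhost:3000/trpc' }),
    }),
  ],
});
```

With this arrangement, queries and mutations still work if the WebSocket connection is temporarily down, since only subscriptions depend on it.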

Authentication / connection params

Tip

If you're building a web application, you can ignore this section, as cookies are sent as part of the request.

To authenticate with WebSockets, you can pass connectionParams to createWSClient. They are sent as the first message when the client establishes a WebSocket connection.

server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';

export const createContext = async (opts: CreateWSSContextFnOptions) => {
  // token is typed as `string | undefined`
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import superjson from 'superjson';
import type { AppRouter } from '~/server/routers/_app';

const wsClient = createWSClient({
  url: `ws://localhost:3000`,
  // sent as the first message once the connection is established
  connectionParams: async () => {
    return {
      token: 'supersecret',
    };
  },
});

export const trpc = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient, transformer: superjson })],
});

Automatic tracking of id using tracked() (recommended)

If you yield an event using our tracked() helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known id as part of the lastEventId input.

You can send an initial lastEventId when initializing the subscription, and it will be updated automatically as the browser receives data.

Info

If you're fetching data based on the lastEventId and capturing every event is critical, you may want to use a ReadableStream or a similar pattern as an intermediary, as is done in our full-stack SSE example. This prevents newly emitted events from being missed while the original batch based on lastEventId is still being yielded.

ts
import EventEmitter, { on } from 'events';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

// Assumption: minimal Post shape for illustration; replace with your own model
type Post = { id: string };

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // the input is optional, so guard the access
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
      }
      // listen for new events
      for await (const [data] of on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
        signal: opts.signal,
      })) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the events emitted since this id
        yield tracked(post.id, post);
      }
    }),
});

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions:

query/mutation

Request

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

Response

The shape below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
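
To make the request and response envelopes above concrete, here is a minimal sketch of building a request and unwrapping a response by hand. The helper names `buildRequest` and `unwrapResponse` and the procedure path `post.byId` are hypothetical, and the error branch assumes the JSON-RPC 2.0 error object (`code`, `message`, optional `data`) rather than anything specified on this page.

```ts
// Envelope types transcribed from the shapes above.
type RequestEnvelope = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
};

type SuccessResponse<TOutput> = {
  id: number | string;
  jsonrpc?: '2.0';
  result: { type: 'data'; data: TOutput };
};

// Assumption: error responses carry a JSON-RPC 2.0 style error object.
type ErrorResponse = {
  id: number | string | null;
  jsonrpc?: '2.0';
  error: { code: number; message: string; data?: unknown };
};

let nextId = 0;

// Hypothetical helper: builds a request envelope with a fresh numeric id.
function buildRequest(
  method: 'query' | 'mutation',
  path: string,
  input?: unknown,
): RequestEnvelope {
  nextId += 1;
  // omit `input` entirely when the procedure takes none
  const params = input === undefined ? { path } : { path, input };
  return { id: nextId, method, params };
}

// Hypothetical helper: returns the procedure output, or throws on an error response.
function unwrapResponse<TOutput>(
  response: SuccessResponse<TOutput> | ErrorResponse,
): TOutput {
  if ('error' in response) {
    throw new Error(`RPC error ${response.error.code}: ${response.error.message}`);
  }
  return response.result.data;
}
```

A caller would serialize the result of `buildRequest('query', 'post.byId', { id: '1' })` with `JSON.stringify`, send it over the socket, and match the response to the request by its `id`.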

subscription/subscription.stop

Start a subscription

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, call subscription.stop

ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}

Subscription response shape

The shape below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
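
As an illustration of how a hand-written client might consume the three result types above, here is a small sketch that folds incoming subscription messages into a local state and builds the matching subscription.stop request. The names `SubscriptionState`, `applyMessage`, and `buildStopRequest` are hypothetical and not part of tRPC.

```ts
// Result union transcribed from the response shape above.
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

type SubscriptionMessage<TData> = {
  id: number | string;
  jsonrpc?: '2.0';
  result: SubscriptionResult<TData>;
};

// Hypothetical local bookkeeping for one subscription.
type SubscriptionState<TData> = {
  status: 'idle' | 'active' | 'stopped';
  received: TData[];
};

// Returns a new state reflecting one incoming message; the input state is not mutated.
function applyMessage<TData>(
  state: SubscriptionState<TData>,
  message: SubscriptionMessage<TData>,
): SubscriptionState<TData> {
  const result = message.result;
  switch (result.type) {
    case 'started':
      return { ...state, status: 'active' };
    case 'data':
      return { ...state, received: [...state.received, result.data] };
    case 'stopped':
      return { ...state, status: 'stopped' };
  }
}

// Builds the request that cancels the subscription created with the given id.
function buildStopRequest(id: number | string): {
  id: number | string;
  method: 'subscription.stop';
} {
  return { id, method: 'subscription.stop' };
}
```

Keying the state by message `id` lets one socket carry several subscriptions at once, since every message names the subscription it belongs to.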

Connection params

If the connection is initialized with ?connectionParams=1, the first message has to be the connection params.

ts
{
  data: Record<string, string> | null;
  method: 'connectionParams';
}
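
For a client written without createWSClient, the handshake described above could be sketched as follows. Both helper names are hypothetical, and the URL helper is deliberately naive: it assumes the URL has no fragment and does not already contain a connectionParams entry.

```ts
// Message shape transcribed from the spec above.
type ConnectionParamsMessage = {
  data: Record<string, string> | null;
  method: 'connectionParams';
};

// Hypothetical helper: appends the connectionParams=1 flag to a WebSocket URL.
// Assumes no fragment and no existing connectionParams entry in the URL.
function withConnectionParamsFlag(url: string): string {
  const separator = url.indexOf('?') === -1 ? '?' : '&';
  return url + separator + 'connectionParams=1';
}

// Hypothetical helper: serializes the message that must be sent first on the socket.
function buildConnectionParamsMessage(
  params: Record<string, string> | null,
): string {
  const message: ConnectionParamsMessage = {
    data: params,
    method: 'connectionParams',
  };
  return JSON.stringify(message);
}
```

The intended order is: open the socket at `withConnectionParamsFlag(url)`, send `buildConnectionParamsMessage(...)` as soon as it opens, and only then send query, mutation, or subscription requests.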

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before the server shuts down. Sent by calling broadcastReconnectNotification() on the handler returned by applyWSSHandler.