Version: 10.x

Subscriptions / WebSockets

Using Subscriptions

Adding a subscription procedure

server/router.ts
tsx
import { EventEmitter } from 'events';
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { z } from 'zod';

// assumed shape of `Post` (mirrors the `add` input below); define it to match your own data model
type Post = { id?: string; text: string };

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

export const appRouter = t.router({
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  add: t.procedure
    .input(
      z.object({
        id: z.string().uuid().optional(),
        text: z.string().min(1),
      }),
    )
    .mutation(async (opts) => {
      const post = { ...opts.input }; /* [..] add to db */
      ee.emit('add', post);
      return post;
    }),
});
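
The subscribe-then-teardown flow in the router above can be tried in isolation. The following is a minimal sketch that does not use tRPC at all: `TinyEmitter` and `subscribeToAdds` are hypothetical stand-ins for the event emitter and the `observable` callback, and the `Post` shape is an assumption. It only illustrates that the returned function removes the listener, so nothing is delivered after a client stops subscribing.

```typescript
// Illustration only: `TinyEmitter` and `subscribeToAdds` are hypothetical
// helpers, not part of tRPC or Node.js.
type Post = { id?: string; text: string }; // assumed shape
type Listener<T> = (data: T) => void;

class TinyEmitter<T> {
  private listeners: Listener<T>[] = [];

  on(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  off(listener: Listener<T>): void {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }

  emit(data: T): void {
    // iterate over a copy so listeners may unsubscribe while being notified
    this.listeners.slice().forEach((listener) => listener(data));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

// Registers `next` as a listener and returns the teardown function,
// mirroring the `return () => { ee.off(...) }` step in the router.
function subscribeToAdds(
  ee: TinyEmitter<Post>,
  next: Listener<Post>,
): () => void {
  ee.on(next);
  return () => ee.off(next);
}

const emitter = new TinyEmitter<Post>();
const received: Post[] = [];
const unsubscribe = subscribeToAdds(emitter, (post) => {
  received.push(post);
});

emitter.emit({ text: 'first' }); // delivered to the subscriber
unsubscribe(); // client disconnects or stops subscribing
emitter.emit({ text: 'second' }); // no listener left, nothing delivered
```

Forgetting the teardown would leave one listener registered per past subscriber, which is why the subscription callback returns it.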

Creating a WebSocket server

bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

Tip

You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.

client.ts
tsx
import { createTRPCProxyClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCProxyClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
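
The tip above mentions routing queries and mutations over HTTP while subscriptions go over WebSockets. One way this might be configured is with `splitLink` and `httpBatchLink` from `@trpc/client`. Treat the following as a configuration sketch rather than a definitive setup: the HTTP URL `http://localhost:3000/trpc` is an assumption, and the import path for `AppRouter` is the same placeholder used above.

```typescript
import {
  createTRPCProxyClient,
  createWSClient,
  httpBatchLink,
  splitLink,
  wsLink,
} from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection (used for subscriptions only)
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

const client = createTRPCProxyClient<AppRouter>({
  links: [
    splitLink({
      // route by operation type
      condition: (op) => op.type === 'subscription',
      // subscriptions use the WebSocket transport
      true: wsLink({ client: wsClient }),
      // queries and mutations use HTTP; this URL is an assumed example
      false: httpBatchLink({ url: 'http://localhost:3000/trpc' }),
    }),
  ],
});
```

With this split, the WebSocket connection carries only subscription traffic, and ordinary requests keep the behaviour of the HTTP link.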

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions:

query/mutation

Request

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
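
To make the request shape concrete, here is a small sketch that builds such an envelope and serializes it as it might be sent over the socket. `RequestEnvelope`, `buildRequest`, and the incrementing id counter are hypothetical names chosen for illustration, and the sketch assumes no transformer is configured, so the input is passed through unchanged.

```typescript
// Hypothetical helper; only the field names come from the request shape above.
type RequestEnvelope = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
};

let nextId = 0;

function buildRequest(
  method: RequestEnvelope['method'],
  path: string,
  input?: unknown,
): RequestEnvelope {
  nextId += 1; // each in-flight request needs its own id
  const params: RequestEnvelope['params'] = { path };
  // leave `input` out entirely when the procedure takes none
  if (input !== undefined) params.input = input;
  return { id: nextId, jsonrpc: '2.0', method, params };
}

// Call the `add` mutation from the router example
const message = buildRequest('mutation', 'add', { text: 'hello' });
const wire = JSON.stringify(message);
// wire === '{"id":1,"jsonrpc":"2.0","method":"mutation","params":{"path":"add","input":{"text":"hello"}}}'
```

The id is what lets a client match a later response to the request that caused it, since several requests can share one connection.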

Response

The shape below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}

subscription/subscription.stop

Start a subscription

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, call subscription.stop with the id of the subscription you started:

ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
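
The start and stop messages are tied together by their id. The sketch below shows that pairing; `startMessage` and `stopMessage` are hypothetical helpers, and the id `7` and path `onAdd` are example values.

```typescript
// Hypothetical helpers; the message fields follow the shapes shown above.
type SubscriptionStart = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
};

type SubscriptionStop = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
};

function startMessage(id: number | string, path: string): SubscriptionStart {
  return { id, method: 'subscription', params: { path } };
}

function stopMessage(started: SubscriptionStart): SubscriptionStop {
  // reuse the id of the start message so the stop refers to that subscription
  return { id: started.id, method: 'subscription.stop' };
}

const start = startMessage(7, 'onAdd');
const stop = stopMessage(start);
const stopWire = JSON.stringify(stop);
// stopWire === '{"id":7,"method":"subscription.stop"}'
```

Note that the stop message carries no `params`: the id alone identifies what to cancel.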

Subscription response shape

The shape below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
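
Because `result.type` distinguishes the three cases, a client can handle incoming subscription messages with a discriminated union. This is an illustrative sketch, not tRPC's client code; `SubscriptionMessage` and `describeMessage` are hypothetical names.

```typescript
// Hypothetical types and handler mirroring the response shape above.
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

type SubscriptionMessage<TData> = {
  id: number | string;
  jsonrpc?: '2.0';
  result: SubscriptionResult<TData>;
};

function describeMessage<TData>(msg: SubscriptionMessage<TData>): string {
  const result = msg.result;
  switch (result.type) {
    case 'started':
      return `subscription ${msg.id} started`;
    case 'data':
      // `result.data` is only accessible in this branch
      return `subscription ${msg.id} data: ${JSON.stringify(result.data)}`;
    case 'stopped':
      return `subscription ${msg.id} stopped`;
  }
}

const lines = [
  describeMessage({ id: 1, result: { type: 'started' } }),
  describeMessage({ id: 1, result: { type: 'data', data: { text: 'hi' } } }),
  describeMessage({ id: 1, result: { type: 'stopped' } }),
];
```

Switching on `type` lets the compiler check that every case is covered and that `data` is only read when it exists.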

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.
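
Since a response is either a result or an error, client code usually has to tell the two apart. The sketch below assumes the error object carries the `code`, `message`, and optional `data` members described in the JSON-RPC 2.0 specification linked above; the type names, `isErrorResponse`, and `unwrap` are hypothetical, and the `data` contents that tRPC attaches are not modelled here.

```typescript
// Field names follow the JSON-RPC 2.0 error object; type names are illustrative.
type RpcError = { code: number; message: string; data?: unknown };

type SuccessResponse = {
  id: number | string | null;
  result: { type: 'data'; data: unknown };
};

type ErrorResponse = {
  id: number | string | null;
  error: RpcError;
};

type RpcResponse = SuccessResponse | ErrorResponse;

function isErrorResponse(res: RpcResponse): res is ErrorResponse {
  return 'error' in res;
}

// Returns the procedure output, or throws if the server reported an error
function unwrap(res: RpcResponse): unknown {
  if (isErrorResponse(res)) {
    throw new Error(`RPC error ${res.error.code}: ${res.error.message}`);
  }
  return res.result.data;
}

const ok = unwrap({ id: 1, result: { type: 'data', data: { text: 'hello' } } });

let failure = '';
try {
  // -32600 is the "Invalid Request" code defined by JSON-RPC 2.0
  unwrap({ id: 2, error: { code: -32600, message: 'Invalid Request' } });
} catch (err) {
  failure = (err as Error).message;
}
```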

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before the server shuts down. Invoked by broadcastReconnectNotification() on the handler returned by applyWSSHandler (handler.broadcastReconnectNotification() in the server example above).