Skip to main content
Version: 11.x

订阅

介绍

¥Introduction

订阅是客户端和服务器之间的一种实时事件流。当你需要将实时更新推送到客户端时,请使用订阅。

¥Subscriptions are a type of real-time event stream between the client and server. Use subscriptions when you need to push real-time updates to the client.

使用 tRPC 的订阅,客户端可以建立并维持与服务器的持久连接,并且如果在 tracked() 事件的帮助下断开连接,则会自动尝试重新连接并正常恢复。

¥With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server plus automatically attempts to reconnect and recover gracefully if disconnected with the help of tracked() events.

WebSockets 或服务器发送事件?

¥WebSockets or Server-sent Events?

你可以使用 WebSockets 或 服务器发送事件 (SSE) 在 tRPC 中设置实时订阅。

¥You can either use WebSockets or Server-sent Events (SSE) to setup real-time subscriptions in tRPC.

如果你不确定使用哪一个,我们建议使用 SSE 进行订阅,因为它更容易设置并且不需要设置 WebSocket 服务器。

¥If you are unsure which one to use, we recommend using SSE for subscriptions as it's easier to setup and don't require setting up a WebSocket server.

参考项目

¥Reference projects

类型示例类型关联
WebSockets最低限度的 Node.js WebSockets 示例/示例/独立服务器
SSE全栈 SSE 实现github.com/trpc/examples-next-sse-chat
WebSockets全栈 WebSockets 实现github.com/trpc/examples-next-prisma-websockets-starter

基本示例

¥Basic example

提示

有关完整示例,请参阅 我们的全栈 SSE 示例

¥For a full example, see our full-stack SSE example.

server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});
server.ts
ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
const ee = new EventEmitter();
export const appRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});

¥Automatic tracking of id using tracked() (recommended)

如果你使用我们的 tracked() 助手 yield 事件并包含 id,则客户端将在断开连接时自动重新连接并发送最后一个已知 ID。

¥If you yield an event using our tracked()-helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID.

你可以在初始化订阅时发送初始 lastEventId,它将在浏览器接收数据时自动更新。

¥You can send an initial lastEventId when initializing the subscription and it will be automatically updated as the browser receives data.

  • 对于 SSE,这是 EventSource-spec 的一部分,将通过 .input() 中的 lastEventId 传播。

    ¥For SSE, this is part of the EventSource-spec and will be propagated through lastEventId in your .input().

  • 对于 WebSockets,我们的 wsLink 将自动发送最后一个已知 ID 并在浏览器接收数据时更新它。

    ¥For WebSockets, our wsLink will automatically send the last known ID and update it as the browser receives data.

提示

如果你正在基于 lastEventId 获取数据,并且捕获所有事件至关重要,请确保在从数据库获取事件之前设置事件监听器,就像在 我们的全栈 SSE 示例 中所做的那样,这可以防止在基于 lastEventId 产生原始批次时忽略新发出的事件。

¥If you're fetching data based on the lastEventId, and capturing all events is critical, make sure you setup the event listener before fetching events from your database as is done in our full-stack SSE example, this can prevent newly emitted events being ignored while yield'ing the original batch based on lastEventId.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
// We start by subscribing to the ee so that we don't miss any new events while fetching
const iterable = ee.toIterable('add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
});
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
// const items = await db.post.findMany({ ... })
// for (const item of items) {
// yield tracked(item.id, item);
// }
}
// listen for new events
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});

停止来自服务器的订阅

¥Stopping a subscription from the server

如果你需要从服务器停止订阅,只需在生成器函数中输入 return 即可。

¥If you need to stop a subscription from the server, simply return in the generator function.

ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});
ts
import { publicProcedure, router } from '../trpc';
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z.object({
lastEventId: z.string().coerce.number().min(0).optional(),
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
const idx = index++;
if (idx > 100) {
// With this, the subscription will stop and the client will disconnect
return;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
}),
});

在客户端上,你只需 .unsubscribe() 订阅即可。

¥On the client, you just .unsubscribe() the subscription.

清除副作用

¥Cleanup of side effects

如果你需要清除订阅的任何副作用,则可以使用 try...finally 模式,因为当订阅因任何原因停止时,trpc 会调用生成器实例的 .return()

¥If you need to clean up any side-effects of your subscription you can use the try...finally pattern, as trpc invokes the .return() of the Generator Instance when the subscription stops for any reason.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add', {
signal: opts.signal,
})) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});

错误处理

¥Error handling

在生成器函数中抛出错误会传播到后端的 trpconError()

¥Throwing an error in a generator function propagates to trpc's onError() on the backend.

如果抛出的错误是 5xx 错误,客户端将根据最后一个事件 ID 即 使用 tracked() 跟踪 自动尝试重新连接。对于其他错误,订阅将被取消并传播到 onError() 回调。

¥If the error thrown is a 5xx error, the client will automatically attempt to reconnect based on the last event id that is tracked using tracked(). For other errors, the subscription will be cancelled and propagate to the onError() callback.

输出验证

¥Output validation

由于订阅是异步迭代器,因此你必须通过迭代器来验证输出。

¥Since subscriptions are async iterators, you have to go through the iterator to validate the output.

zod 示例

¥Example with zod

zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}
zAsyncIterable.ts
ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';
function isAsyncIterable<TValue, TReturn = unknown>(
value: unknown,
): value is AsyncIterable<TValue, TReturn> {
return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}
const trackedEnvelopeSchema =
z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);
/**
* A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
* 1. The value being validated is an async iterable.
* 2. Each item yielded by the async iterable conforms to a specified type.
* 3. The return value of the async iterable, if any, also conforms to a specified type.
*/
export function zAsyncIterable<
TYieldIn,
TYieldOut,
TReturnIn = void,
TReturnOut = void,
Tracked extends boolean = false,
>(opts: {
/**
* Validate the value yielded by the async generator
*/
yield: z.ZodType<TYieldIn, any, TYieldOut>;
/**
* Validate the return value of the async generator
* @remark not applicable for subscriptions
*/
return?: z.ZodType<TReturnIn, any, TReturnOut>;
/**
* Whether if the yielded values are tracked
* @remark only applicable for subscriptions
*/
tracked?: Tracked;
}) {
return z
.custom<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn
>
>((val) => isAsyncIterable(val))
.transform(async function* (iter) {
const iterator = iter[Symbol.asyncIterator]();
try {
let next;
while ((next = await iterator.next()) && !next.done) {
if (opts.tracked) {
const [id, data] = trackedEnvelopeSchema.parse(next.value);
yield tracked(id, await opts.yield.parseAsync(data));
continue;
}
yield opts.yield.parseAsync(next.value);
}
if (opts.return) {
return await opts.return.parseAsync(next.value);
}
return;
} finally {
await iterator.return?.();
}
}) as z.ZodType<
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
TReturnIn,
unknown
>,
any,
AsyncIterable<
Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
TReturnOut,
unknown
>
>;
}

现在你可以使用此助手来验证订阅过程的输出:

¥Now you can use this helper to validate the output of your subscription procedures:

_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});
_app.ts
ts
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';
export const appRouter = router({
mySubscription: publicProcedure
.input(
z.object({
lastEventId: z.coerce.number().min(0).optional(),
}),
)
.output(
zAsyncIterable({
yield: z.object({
count: z.number(),
}),
tracked: true,
}),
)
.subscription(async function* (opts) {
let index = opts.input.lastEventId ?? 0;
while (true) {
index++;
yield tracked(index, {
count: index,
});
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}),
});