Subscriptions
Introduction
Subscriptions are a type of real-time event stream between the client and server. Use subscriptions when you need to push real-time updates to the client.
With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server. If the connection drops, the client automatically attempts to reconnect and, with the help of tracked() events, recovers gracefully.
WebSockets or Server-sent Events?
You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page
- For SSE, see the httpSubscriptionLink
If you are unsure which one to use, we recommend SSE for subscriptions, as it is easier to set up and doesn't require a WebSocket server.
Reference projects
Type | Example Type | Link |
---|---|---|
WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a full example, see our full-stack SSE example.
server.ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
const router = t.router;
const publicProcedure = t.procedure;

const ee = new EventEmitter();

export const appRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      const post = data as Post;
      yield post;
    }
  }),
});
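To see what the `on(ee, 'add', { signal })` loop does outside of tRPC, the following Node.js-only sketch consumes an EventEmitter as an async iterator and stops it through an AbortController. The `collect` and `demo` helpers and the string payloads are hypothetical; in a real subscription, the signal comes from `opts.signal`.

```typescript
import { EventEmitter, on } from 'node:events';

// Hypothetical helper: gather 'add' events until the signal aborts.
async function collect(ee: EventEmitter, signal: AbortSignal): Promise<string[]> {
  const seen: string[] = [];
  try {
    // on() registers its listener right away and buffers events until they are read
    for await (const [data] of on(ee, 'add', { signal })) {
      seen.push(data as string);
    }
  } catch (err) {
    // Aborting the signal makes the pending iteration reject with an AbortError
    if ((err as Error).name !== 'AbortError') throw err;
  }
  return seen;
}

async function demo(): Promise<string[]> {
  const ee = new EventEmitter();
  const ac = new AbortController();
  const done = collect(ee, ac.signal);
  ee.emit('add', 'first');
  ee.emit('add', 'second');
  // Give the loop time to drain the buffered events before aborting
  await new Promise((resolve) => setTimeout(resolve, 10));
  ac.abort();
  return done;
}
```

Aborting is what ends the otherwise endless loop, which is why the example above forwards the request's signal to `on()`.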
Automatic tracking of id using tracked() (recommended)
If you yield an event using our tracked() helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID.
You can send an initial lastEventId when initializing the subscription, and it will be updated automatically as the browser receives data.
- For SSE, this is part of the EventSource spec and will be propagated through lastEventId in your .input().
- For WebSockets, our wsLink will automatically send the last known ID and update it as the browser receives data.
If you're fetching data based on the lastEventId and capturing all events is critical, make sure you set up the event listener before fetching events from your database, as is done in our full-stack SSE example. This prevents newly emitted events from being missed while the original batch based on lastEventId is being yielded.
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      // The input object itself is optional, so guard the access
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events, reusing the listener registered above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
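The "listen first, then fetch" ordering can be tried without a database. The sketch below is a simulation under stated assumptions: the in-memory `posts` array, `addPost`, `findPostsSince`, and `postsSince` are hypothetical stand-ins, and ids are plain increasing numbers. It also skips live events that the backlog already covered, since an event emitted during the query can show up in both places.

```typescript
import { EventEmitter, on } from 'node:events';

type Post = { id: number; title: string };

// Hypothetical in-memory store standing in for a database
const posts: Post[] = [];
const ee = new EventEmitter();
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

function addPost(title: string): Post {
  const post = { id: posts.length + 1, title };
  posts.push(post);
  ee.emit('add', post);
  return post;
}

// Simulated slow query returning posts newer than lastEventId
async function findPostsSince(lastEventId: number): Promise<Post[]> {
  await sleep(20);
  return posts.filter((p) => p.id > lastEventId);
}

async function* postsSince(lastEventId: number, signal: AbortSignal) {
  // 1. Attach the listener first; on() buffers events from this point on
  const live = on(ee, 'add', { signal });
  // 2. Replay the backlog
  let lastSeen = lastEventId;
  for (const post of await findPostsSince(lastEventId)) {
    lastSeen = post.id;
    yield post;
  }
  // 3. Continue with live events, skipping any the backlog already delivered
  for await (const [data] of live) {
    const post = data as Post;
    if (post.id <= lastSeen) continue;
    lastSeen = post.id;
    yield post;
  }
}

async function demo(): Promise<number[]> {
  addPost('a'); // id 1: already delivered to the client
  addPost('b'); // id 2: emitted while the client was disconnected
  const ac = new AbortController();
  const ids: number[] = [];
  const consumer = (async () => {
    for await (const post of postsSince(1, ac.signal)) {
      ids.push(post.id);
      if (ids.length === 3) break; // stop after the three expected posts
    }
  })();
  await sleep(5);
  addPost('c'); // id 3: arrives while the backlog query is still running
  await sleep(50);
  addPost('d'); // id 4: arrives after the backlog was replayed
  await consumer;
  ac.abort();
  return ids;
}
```

If the listener were attached only after the query, a post emitted between the query and the listener registration would never reach the client.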
Stopping a subscription from the server
If you need to stop a subscription from the server, simply return in the generator function.
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // Coerce the incoming id to a non-negative number
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        const idx = index++;
        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
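The effect of `return` is plain async generator behavior and can be observed without tRPC. In this minimal sketch, `countUntil` is a hypothetical stand-in for a subscription resolver; unlike the example above, it also yields each value so the consumer has something to receive.

```typescript
// Hypothetical stand-in for a subscription resolver: counts upward and stops itself
async function* countUntil(limit: number, start = 0) {
  let index = start;
  while (true) {
    const idx = index++;
    if (idx > limit) {
      // Returning ends the stream; the consumer sees `done: true`
      return;
    }
    yield idx;
  }
}

async function demo(): Promise<number[]> {
  const received: number[] = [];
  for await (const value of countUntil(3)) {
    received.push(value);
  }
  // The loop exits on its own once the generator returns
  return received;
}
```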
On the client, call .unsubscribe() on the subscription.
Cleanup of side effects
If you need to clean up any side effects of your subscription, you can use the try...finally pattern, as tRPC invokes .return() on the generator instance when the subscription stops for any reason.
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    let timeout: ReturnType<typeof setTimeout> | undefined;
    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs whenever the subscription stops
      if (timeout) clearTimeout(timeout);
    }
  }),
});
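The guarantee behind this pattern comes from JavaScript itself: calling `.return()` on a generator that is suspended at a `yield` inside `try` runs the `finally` block. A minimal sketch, where `ticker`, `log`, and the no-op interval are hypothetical:

```typescript
const log: string[] = [];

async function* ticker() {
  // A side effect that must not outlive the subscription
  const interval = setInterval(() => {}, 1000);
  log.push('started');
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    // Runs when the consumer calls .return() or breaks out of a for await loop
    clearInterval(interval);
    log.push('cleaned up');
  }
}

async function demo(): Promise<string[]> {
  const gen = ticker();
  await gen.next(); // run up to the first yield
  await gen.next();
  await gen.return(undefined); // stop the generator from the outside
  return log;
}
```

Note that `finally` only runs if the generator body has started. Calling `.return()` before the first `.next()` completes the generator without executing any of its code.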
Error handling
Throwing an error in a generator function propagates to tRPC's onError() on the backend.
If the error thrown is a 5xx error, the client will automatically attempt to reconnect based on the last event id tracked using tracked(). For other errors, the subscription will be cancelled and the error will propagate to the onError() callback.
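The reconnect rule can be illustrated with a small simulation. This is only a sketch of the behavior described here, not tRPC's client implementation: `StatusError`, `Connect`, `consume`, and the `maxRetries` limit are hypothetical names and choices made for the example.

```typescript
type Tracked<T> = { id: string; data: T };
type Connect<T> = (lastEventId: string | undefined) => AsyncIterable<Tracked<T>>;

// Hypothetical error carrying an HTTP-like status code
class StatusError extends Error {
  readonly status: number;
  constructor(status: number) {
    super(`status ${status}`);
    this.status = status;
  }
}

async function consume<T>(
  connect: Connect<T>,
  onData: (data: T) => void,
  onError: (err: unknown) => void,
  maxRetries = 3,
): Promise<void> {
  let lastEventId: string | undefined;
  let retries = 0;
  while (true) {
    try {
      for await (const event of connect(lastEventId)) {
        lastEventId = event.id; // remember where to resume
        retries = 0;
        onData(event.data);
      }
      return; // the stream completed normally
    } catch (err) {
      const retryable = err instanceof StatusError && err.status >= 500;
      if (!retryable || ++retries > maxRetries) {
        onError(err); // non-5xx errors cancel the subscription
        return;
      }
      // 5xx: loop again and reconnect from lastEventId
    }
  }
}

async function demo() {
  const connections: (string | undefined)[] = [];
  const received: number[] = [];
  const errors: unknown[] = [];
  const connect: Connect<number> = async function* (lastEventId) {
    connections.push(lastEventId);
    if (lastEventId === undefined) {
      yield { id: '1', data: 1 };
      yield { id: '2', data: 2 };
      throw new StatusError(503); // server-side failure: retryable
    }
    const next = Number(lastEventId) + 1;
    yield { id: String(next), data: next };
    throw new StatusError(400); // client-side failure: not retryable
  };
  await consume(connect, (d) => received.push(d), (e) => errors.push(e));
  return { connections, received, errors };
}
```

In this run the first connection fails with 503 after two events, so the consumer reconnects with the last id it saw; the 400 on the second connection ends the subscription and reaches the error callback.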
Output validation
Since subscriptions are async iterators, you have to go through the iterator to validate the output.
Example with zod
zAsyncIterable.ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldIn, any, TYieldOut>;
  /**
   * Validate the return value of the async generator
   * @remark not applicable for subscriptions
   */
  return?: z.ZodType<TReturnIn, any, TReturnOut>;
  /**
   * Whether the yielded values are tracked
   * @remark only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();
      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    any,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
Now you can use this helper to validate the output of your subscription procedures:
_app.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;
      while (true) {
        index++;
        yield tracked(index, {
          count: index,
        });
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});
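The core idea of "going through the iterator" does not depend on zod. As a dependency-free sketch, the hypothetical `validateEach` wrapper below checks every yielded value with a type guard and throws on the first invalid one; `isCount` and `source` exist only for the demonstration.

```typescript
type Guard<T> = (value: unknown) => value is T;

// Wrap an async iterable so every yielded value is checked before it is passed on
async function* validateEach<T>(
  source: AsyncIterable<unknown>,
  guard: Guard<T>,
): AsyncGenerator<T> {
  for await (const value of source) {
    if (!guard(value)) {
      throw new TypeError(`Invalid output: ${JSON.stringify(value)}`);
    }
    yield value;
  }
}

const isCount = (v: unknown): v is { count: number } =>
  typeof v === 'object' &&
  v !== null &&
  typeof (v as { count?: unknown }).count === 'number';

// Hypothetical source whose second item violates the expected shape
async function* source() {
  yield { count: 1 };
  yield { count: 'two' };
}

async function demo() {
  const ok: number[] = [];
  let error: unknown;
  try {
    for await (const item of validateEach(source(), isCount)) {
      ok.push(item.count);
    }
  } catch (err) {
    error = err;
  }
  return { ok, error };
}
```

Validation happens lazily, item by item, so valid values that precede an invalid one are still delivered before the error surfaces.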