HTTP Subscription Link
httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.
SSE is a good option for real-time updates, as it's a bit easier than setting up a WebSockets server.
Setup
If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native-specific instructions, please refer to the compatibility section.
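Whether a polyfill is needed can be decided once at startup. The following is a minimal sketch rather than anything tRPC provides: pickEventSource and FakePolyfill are hypothetical names, and the constructor type is deliberately loose so the example does not depend on browser typings.

```typescript
// Hypothetical helper (not a tRPC API): use the native EventSource when the
// environment provides one, otherwise fall back to a supplied polyfill.
type EventSourceCtor = new (url: string, init?: Record<string, unknown>) => unknown;

function pickEventSource(
  env: { EventSource?: EventSourceCtor },
  fallback: EventSourceCtor,
): EventSourceCtor {
  return typeof env.EventSource === 'function' ? env.EventSource : fallback;
}

// Stand-in for a real polyfill class exported by a polyfill package.
class FakePolyfill {
  url: string;
  constructor(url: string) {
    this.url = url;
  }
}

// An environment without EventSource resolves to the polyfill.
const chosen = pickEventSource({}, FakePolyfill);
console.log(chosen === FakePolyfill); // true
```

In an application, the env argument would typically be the global object, and the result would be handed to the EventSource option of httpSubscriptionLink.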
To use httpSubscriptionLink, you need to use a splitLink to make it explicit that we want to use SSE for subscriptions.
client/index.ts
    import {
      createTRPCClient,
      httpBatchLink,
      httpSubscriptionLink,
      loggerLink,
      splitLink,
    } from '@trpc/client';
    import type { AppRouter } from '../server/index.js';

    const trpcClient = createTRPCClient<AppRouter>({
      /**
       * @see https://trpc.nodejs.cn/docs/v11/client/links
       */
      links: [
        // adds pretty logs to your console in development and logs errors in production
        loggerLink(),
        splitLink({
          // uses the httpSubscriptionLink for subscriptions
          condition: (op) => op.type === 'subscription',
          true: httpSubscriptionLink({
            url: `/api/trpc`,
          }),
          false: httpBatchLink({
            url: `/api/trpc`,
          }),
        }),
      ],
    });
This document outlines the specific details of using httpSubscriptionLink. For general usage of subscriptions, see our subscriptions guide.
Headers and authorization / authentication
Web apps
Same domain
If you're building a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.
Cross-domain
If the client and server are not on the same domain, you can use withCredentials: true (read more on MDN).
Example:
    // [...]
    httpSubscriptionLink({
      url: 'https://example.com/api/trpc',
      eventSourceOptions() {
        return {
          withCredentials: true, // <---
        };
      },
    });
Custom headers through ponyfill
Recommended for non-web environments
You can ponyfill EventSource and use the eventSourceOptions callback to populate headers.
    import {
      createTRPCClient,
      httpBatchLink,
      httpSubscriptionLink,
      splitLink,
    } from '@trpc/client';
    import { EventSourcePolyfill } from 'event-source-polyfill';
    import type { AppRouter } from '../server/index.js';

    // Note: getSignature is an application-defined helper, not part of tRPC.

    // Initialize the tRPC client
    const trpc = createTRPCClient<AppRouter>({
      links: [
        splitLink({
          condition: (op) => op.type === 'subscription',
          true: httpSubscriptionLink({
            url: 'http://localhost:3000',
            // ponyfill EventSource
            EventSource: EventSourcePolyfill,
            // options to pass to the EventSourcePolyfill constructor
            eventSourceOptions: async ({ op }) => {
              // `op` is the operation that's being executed;
              // you can use it to generate a signature for the operation
              const signature = await getSignature(op);
              return {
                headers: {
                  authorization: 'Bearer supersecret',
                  'x-signature': signature,
                },
              };
            },
          }),
          false: httpBatchLink({
            url: 'http://localhost:3000',
          }),
        }),
      ],
    });
Updating configuration on an active connection
httpSubscriptionLink leverages SSE through EventSource, so connections that encounter errors such as network failures or bad response codes are retried automatically. However, EventSource does not re-execute the eventSourceOptions() or url() options when it retries, so the configuration cannot be updated. This matters most when authentication has expired since the last connection.
To address this limitation, you can use a retryLink in conjunction with httpSubscriptionLink. This approach ensures that the connection is re-established with the latest configuration, including any updated authentication details.
Please note that restarting the connection will result in the EventSource being recreated from scratch, which means any previously tracked events will be lost.
    import {
      createTRPCClient,
      httpBatchLink,
      httpSubscriptionLink,
      retryLink,
      splitLink,
    } from '@trpc/client';
    import { EventSourcePolyfill } from 'event-source-polyfill';
    import type { AppRouter } from '../server/index.js';

    // Note: getAuthenticatedUri and auth are application-defined helpers, not part of tRPC.

    // Initialize the tRPC client
    const trpc = createTRPCClient<AppRouter>({
      links: [
        splitLink({
          condition: (op) => op.type === 'subscription',
          false: httpBatchLink({
            url: 'http://localhost:3000',
          }),
          true: [
            retryLink({
              retry: (opts) => {
                // opts.op.type is always 'subscription' here since we're in a splitLink
                const code = opts.error.data?.code;
                if (!code) {
                  // This shouldn't happen, as httpSubscriptionLink retries internally
                  // when there's a non-parsable response
                  console.error('No error code found, retrying', opts);
                  return true;
                }
                if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
                  console.log('Retrying due to 401/403 error');
                  return true;
                }
                return false;
              },
            }),
            httpSubscriptionLink({
              url: async () => {
                // calculate the latest URL if needed...
                return getAuthenticatedUri();
              },
              // ponyfill EventSource
              EventSource: EventSourcePolyfill,
              eventSourceOptions: async () => {
                // ...or maybe renew an access token
                const token = await auth.getOrRenewToken();
                return {
                  headers: {
                    authorization: `Bearer ${token}`,
                  },
                };
              },
            }),
          ],
        }),
      ],
    });
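The retry decision in the example above can be pulled out into a plain function so it can be unit tested without a live connection. This is a sketch under assumptions: decideRetry and RetryDecision are hypothetical names, and only the two error codes from the example are treated as auth failures.

```typescript
// Sketch only: mirrors the retry callback from the example as a standalone function.
type RetryDecision = { retry: boolean; reason: string };

function decideRetry(code: string | undefined): RetryDecision {
  if (!code) {
    // No parsable error code: the example retries in this case.
    return { retry: true, reason: 'no error code' };
  }
  if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
    // Auth errors: reconnecting re-runs url() and eventSourceOptions(),
    // which gives the client a chance to pick up fresh credentials.
    return { retry: true, reason: 'auth error' };
  }
  return { retry: false, reason: 'non-retryable code' };
}

console.log(decideRetry('UNAUTHORIZED').retry); // true
console.log(decideRetry('BAD_REQUEST').retry); // false
```

Inside retryLink, the callback would then reduce to something like retry: (opts) => decideRetry(opts.error.data?.code).retry.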
Connection params
In order to authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. These are sent as part of the URL, which is why the other methods are preferred.
server/context.ts
    import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';

    export const createContext = async (opts: CreateHTTPContextOptions) => {
      const token = opts.info.connectionParams?.token;
      // [... authenticate]
      return {};
    };

    export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
    import {
      createTRPCClient,
      httpBatchLink,
      httpSubscriptionLink,
      splitLink,
    } from '@trpc/client';
    import type { AppRouter } from '../server/index.js';

    // Initialize the tRPC client
    const trpc = createTRPCClient<AppRouter>({
      links: [
        splitLink({
          condition: (op) => op.type === 'subscription',
          true: httpSubscriptionLink({
            url: 'http://localhost:3000',
            connectionParams: async () => {
              // Will be serialized as part of the URL
              return {
                token: 'supersecret',
              };
            },
          }),
          false: httpBatchLink({
            url: 'http://localhost:3000',
          }),
        }),
      ],
    });
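To see why URL-borne credentials are the less preferred option, it helps to look at what such a URL contains. The sketch below is illustrative only: appendConnectionParams is a hypothetical helper, and the query key and JSON encoding are assumptions made for the example, not a statement of tRPC's exact wire format.

```typescript
// Illustration only: the query key and encoding are assumptions for this sketch.
function appendConnectionParams(
  baseUrl: string,
  params: Record<string, string>,
): string {
  const url = new URL(baseUrl);
  url.searchParams.set('connectionParams', JSON.stringify(params));
  return url.toString();
}

const requestUrl = appendConnectionParams('http://localhost:3000', {
  token: 'supersecret',
});

// The secret is readable by anything that sees the URL, such as proxies and access logs.
console.log(requestUrl.includes('supersecret')); // true
```

Headers set through eventSourceOptions or cookies do not end up in the request URL, which is the practical difference between the approaches.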
Timeout Configuration
The httpSubscriptionLink supports configuring an inactivity timeout through the reconnectAfterInactivityMs option. If no messages (including ping messages) are received within the specified timeout period, the connection is marked as "connecting" and the link automatically attempts to reconnect.
The timeout configuration is set on the server side when initializing tRPC:
server/trpc.ts
    import { initTRPC } from '@trpc/server';

    export const t = initTRPC.create({
      sse: {
        client: {
          reconnectAfterInactivityMs: 3_000,
        },
      },
    });
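The inactivity rule described above can be modelled with a small tracker that takes timestamps as arguments. This is a sketch of the idea, not the link's actual implementation: InactivityTracker, its method names, and the 'active' state label are hypothetical.

```typescript
// Sketch of an inactivity check; names are hypothetical and real internals may differ.
type TrackedState = 'active' | 'connecting';

class InactivityTracker {
  private readonly timeoutMs: number;
  private lastMessageAt: number;

  constructor(reconnectAfterInactivityMs: number, startedAt: number) {
    this.timeoutMs = reconnectAfterInactivityMs;
    this.lastMessageAt = startedAt;
  }

  // Call for every received message, including pings.
  onMessage(now: number): void {
    this.lastMessageAt = now;
  }

  // Reports 'connecting' once the silence exceeds the configured timeout.
  stateAt(now: number): TrackedState {
    const silentFor = now - this.lastMessageAt;
    return silentFor > this.timeoutMs ? 'connecting' : 'active';
  }
}

const tracker = new InactivityTracker(3_000, 0);
tracker.onMessage(2_000); // a message arrives after 2s
console.log(tracker.stateAt(4_500)); // 'active' (2.5s of silence)
console.log(tracker.stateAt(5_500)); // 'connecting' (3.5s of silence)
```

Passing the clock in as a parameter keeps the logic deterministic, which makes the timeout behaviour easy to test.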
Server Ping Configuration
The server can be configured to send periodic ping messages to keep the connection alive and prevent timeout disconnections. This is particularly useful when combined with the reconnectAfterInactivityMs option.
server/trpc.ts
    import { initTRPC } from '@trpc/server';

    export const t = initTRPC.create({
      sse: {
        // Maximum duration of a single SSE connection in milliseconds
        // maxDurationMs: 60_00,
        ping: {
          // Enable periodic ping messages to keep connection alive
          enabled: true,
          // Send ping message every 2s
          intervalMs: 2_000,
        },
        // client: {
        //   reconnectAfterInactivityMs: 3_000
        // }
      },
    });
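When pings and the inactivity timeout are used together, the ping interval should be shorter than the timeout, otherwise an idle but healthy connection would be reconnected between pings. A minimal sanity check might look like the following; pingKeepsConnectionAlive and SseTiming are hypothetical names, and the values are the ones from the configuration above.

```typescript
// Hypothetical validation helper; it only compares the two configured durations.
interface SseTiming {
  pingIntervalMs: number;
  reconnectAfterInactivityMs: number;
}

function pingKeepsConnectionAlive(timing: SseTiming): boolean {
  // A ping must arrive before the client's inactivity timeout elapses.
  return timing.pingIntervalMs < timing.reconnectAfterInactivityMs;
}

console.log(
  pingKeepsConnectionAlive({ pingIntervalMs: 2_000, reconnectAfterInactivityMs: 3_000 }),
); // true
console.log(
  pingKeepsConnectionAlive({ pingIntervalMs: 5_000, reconnectAfterInactivityMs: 3_000 }),
); // false
```

In practice some headroom for network latency is sensible, so a ping interval only slightly below the timeout may still cause occasional reconnects.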
Compatibility (React Native)
The httpSubscriptionLink makes use of the EventSource API, the Streams API, and AsyncIterators. These are not natively supported by React Native and have to be ponyfilled.
To ponyfill EventSource, we recommend using a polyfill that utilizes the networking library exposed by React Native rather than one that uses the XMLHttpRequest API. Libraries that polyfill EventSource using XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.
The Streams API can be ponyfilled using the web-streams-polyfill package.
AsyncIterators can be polyfilled using the @azure/core-asynciterator-polyfill package.
Installation
Install the required polyfills:
- npm: npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- yarn: yarn add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- pnpm: pnpm add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- bun: bun add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- deno: deno add npm:rn-eventsource-reborn npm:web-streams-polyfill npm:@azure/core-asynciterator-polyfill
Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):
utils/api.tsx
    import '@azure/core-asynciterator-polyfill';
    import { RNEventSource } from 'rn-eventsource-reborn';
    import { ReadableStream, TransformStream } from 'web-streams-polyfill';

    globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
    globalThis.TransformStream = globalThis.TransformStream || TransformStream;
Once the ponyfills are added, you can continue setting up the httpSubscriptionLink as described in the setup section.
httpSubscriptionLink Options
    type HTTPSubscriptionLinkOptions<
      TRoot extends AnyClientTypes,
      TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
    > = {
      /**
       * EventSource ponyfill
       */
      EventSource?: TEventSource;
      /**
       * EventSource options or a callback that returns them
       */
      eventSourceOptions?:
        | EventSourceLike.InitDictOf<TEventSource>
        | ((opts: {
            op: Operation;
          }) =>
            | EventSourceLike.InitDictOf<TEventSource>
            | Promise<EventSourceLike.InitDictOf<TEventSource>>);
    };
SSE Options on the server
    export interface SSEStreamProducerOptions<TValue = unknown> {
      ping?: {
        /**
         * Enable ping comments sent from the server
         * @default false
         */
        enabled: boolean;
        /**
         * Interval in milliseconds
         * @default 1000
         */
        intervalMs?: number;
      };
      /**
       * Maximum duration in milliseconds for the request before ending the stream
       * @default undefined
       */
      maxDurationMs?: number;
      /**
       * End the request immediately after data is sent
       * Only useful for serverless runtimes that do not support streaming responses
       * @default false
       */
      emitAndEndImmediately?: boolean;
      /**
       * Client-specific options - these will be sent to the client as part of the first message
       * @default {}
       */
      client?: {
        /**
         * Timeout and reconnect after inactivity in milliseconds
         * @default undefined
         */
        reconnectAfterInactivityMs?: number;
      };
    }
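The @default annotations in the interface above can be read as the following resolution step. This is only a sketch for illustration: resolveSseOptions and the local SseOptions type are hypothetical, and the way tRPC applies its defaults internally may differ.

```typescript
// Sketch: apply the @default values documented in the interface above.
// SseOptions mirrors only the documented fields; it is not imported from tRPC.
interface SseOptions {
  ping?: { enabled: boolean; intervalMs?: number };
  maxDurationMs?: number;
  emitAndEndImmediately?: boolean;
  client?: { reconnectAfterInactivityMs?: number };
}

function resolveSseOptions(opts: SseOptions = {}) {
  return {
    ping: {
      enabled: opts.ping?.enabled ?? false,
      intervalMs: opts.ping?.intervalMs ?? 1000,
    },
    // Stays undefined unless explicitly configured.
    maxDurationMs: opts.maxDurationMs,
    emitAndEndImmediately: opts.emitAndEndImmediately ?? false,
    client: opts.client ?? {},
  };
}

const resolved = resolveSseOptions({ ping: { enabled: true } });
console.log(resolved.ping.intervalMs); // 1000
console.log(resolved.emitAndEndImmediately); // false
```

Note that enabling ping without setting intervalMs yields the documented one-second interval.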