Version: 11.x

HTTP Subscription Link

httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.

SSE is a good option for real-time communication because it is somewhat easier to set up than a WebSocket server.

Setup

Info

If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native specific instructions, refer to the compatibility section.

To use httpSubscriptionLink, you need to use a splitLink to make it explicit that you want to use SSE for subscriptions.

client/index.ts
ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  loggerLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';

const trpcClient = createTRPCClient<AppRouter>({
  /**
   * @see https://trpc.nodejs.cn/docs/v11/client/links
   */
  links: [
    // adds pretty logs to your console in development and logs errors in production
    loggerLink(),
    splitLink({
      // uses the httpSubscriptionLink for subscriptions
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: `/api/trpc`,
      }),
      // all other operations (queries and mutations) go through the batch link
      false: httpBatchLink({
        url: `/api/trpc`,
      }),
    }),
  ],
});
Tip

This document outlines the specific details of using httpSubscriptionLink. For general usage of subscriptions, see our subscriptions guide.

Headers and authorization / authentication

Web apps

Same domain

If you're building a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.

Cross-domain

If the client and server are not on the same domain, you can use withCredentials: true (read more on MDN).

Example:

tsx
// [...]
httpSubscriptionLink({
  url: 'https://example.com/api/trpc',
  eventSourceOptions() {
    return {
      withCredentials: true, // <---
    };
  },
});
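
For withCredentials to have any effect across domains, the server also has to opt in through its CORS response headers: browsers reject credentialed responses whose Access-Control-Allow-Origin is the wildcard `*`, and they require Access-Control-Allow-Credentials to be `true`. The following is a minimal sketch of that server-side check, not part of tRPC itself; the helper name `corsHeadersFor` and the allow-list contents are assumptions made for illustration.

```typescript
// Hypothetical allow-list; replace with the origins that host your client.
const allowedOrigins = new Set(['https://app.example.com']);

/**
 * Returns the CORS headers for a credentialed request, or an empty object
 * when the requesting origin is not on the allow-list.
 */
function corsHeadersFor(origin: string | undefined): Record<string, string> {
  if (!origin || !allowedOrigins.has(origin)) {
    return {};
  }
  return {
    // Echo the specific origin: '*' is rejected for credentialed requests
    'Access-Control-Allow-Origin': origin,
    'Access-Control-Allow-Credentials': 'true',
    // Tell caches that the response differs per requesting origin
    Vary: 'Origin',
  };
}
```

You would merge the returned headers into the response of whichever HTTP server or CORS middleware fronts your tRPC handler.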

Custom headers through ponyfill

Recommended for non-web environments

You can ponyfill EventSource and use the eventSourceOptions callback to populate headers.

tsx
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        // ponyfill EventSource
        EventSource: EventSourcePolyfill,
        // options to pass to the EventSourcePolyfill constructor
        eventSourceOptions: async ({ op }) => {
          //                          ^ Includes the operation that's being executed
          // you can use this to generate a signature for the operation
          // (getSignature is a placeholder for your own helper; it is not defined here)
          const signature = await getSignature(op);
          return {
            headers: {
              authorization: 'Bearer supersecret',
              'x-signature': signature,
            },
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
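
The example above calls getSignature without defining it. One possible shape for such a helper is sketched below, assuming a Node.js runtime and an HMAC-SHA256 signature over the operation's type, path and input. The secret, the payload layout and the helper `signOperation` are all hypothetical; your server must verify whatever scheme you choose.

```typescript
import { createHmac } from 'node:crypto';

// Hypothetical shared secret; in practice load it from secure storage.
const SIGNING_SECRET = 'replace-me';

// Only the fields of the operation that this sketch signs.
type SignableOperation = { type: string; path: string; input: unknown };

/** Returns a hex-encoded HMAC-SHA256 over the operation's type, path and input. */
function signOperation(op: SignableOperation): string {
  const payload = JSON.stringify([op.type, op.path, op.input ?? null]);
  return createHmac('sha256', SIGNING_SECRET).update(payload).digest('hex');
}

/** Async wrapper so it can be awaited inside eventSourceOptions. */
async function getSignature(op: SignableOperation): Promise<string> {
  return signOperation(op);
}
```

Embedding a signing secret is only appropriate where the client can keep it private, for example in server-to-server calls.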

Updating configuration on an active connection

httpSubscriptionLink leverages SSE through EventSource, which automatically retries connections that encounter errors such as network failures or bad response codes. However, EventSource does not re-execute the eventSourceOptions() or url() options to update its configuration, which matters when authentication has expired since the last connection.

To address this limitation, you can use a retryLink in conjunction with httpSubscriptionLink. This approach ensures that the connection is re-established with the latest configuration, including any updated authentication details.

Caution

Please note that restarting the connection recreates the EventSource from scratch, which means any previously tracked events will be lost.

tsx
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  retryLink,
  splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
      true: [
        retryLink({
          retry: (opts) => {
            opts.op.type;
            //       ^? will always be 'subscription' since we're in a splitLink
            const code = opts.error.data?.code;
            if (!code) {
              // This shouldn't happen, as httpSubscriptionLink retries internally
              // when it receives a non-parsable response
              console.error('No error code found, retrying', opts);
              return true;
            }
            if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
              console.log('Retrying due to 401/403 error');
              return true;
            }
            return false;
          },
        }),
        httpSubscriptionLink({
          url: async () => {
            // calculate the latest URL if needed...
            // (getAuthenticatedUri is a placeholder for your own helper)
            return getAuthenticatedUri();
          },
          // ponyfill EventSource
          EventSource: EventSourcePolyfill,
          eventSourceOptions: async () => {
            // ...or maybe renew an access token
            // (auth is a placeholder for your own auth client)
            const token = await auth.getOrRenewToken();
            return {
              headers: {
                authorization: `Bearer ${token}`,
              },
            };
          },
        }),
      ],
    }),
  ],
});
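
The decision made inside the retry callback above can be pulled out into a pure function, which makes it easy to unit test and to cap the number of attempts. This is a sketch under assumptions: `shouldRetry` and `MAX_ATTEMPTS` are hypothetical names, and you should check whether your version of retryLink passes an attempt counter to the callback before wiring it in.

```typescript
// Error codes that suggest stale credentials, where a fresh connection may help.
const RETRYABLE_CODES = new Set(['UNAUTHORIZED', 'FORBIDDEN']);

// Hypothetical cap so a permanently rejected client does not retry forever.
const MAX_ATTEMPTS = 5;

/**
 * Decides whether a failed subscription should be restarted.
 * `code` is the tRPC error code, if one could be parsed from the response.
 * `attempts` is the number of attempts made so far.
 */
function shouldRetry(code: string | undefined, attempts: number): boolean {
  if (attempts > MAX_ATTEMPTS) {
    return false;
  }
  if (!code) {
    // No parsable error code: retry, mirroring the example above
    return true;
  }
  return RETRYABLE_CODES.has(code);
}
```

Inside retryLink, the callback would then reduce to a single call such as `shouldRetry(opts.error.data?.code, attempts)`, with `attempts` taken from wherever your setup tracks it.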

Connection params

To authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. These are sent as part of the URL, which is why other methods are preferred.

server/context.ts
ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';

export const createContext = async (opts: CreateHTTPContextOptions) => {
  // token is typed as string | undefined
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        connectionParams: async () => {
          // Will be serialized as part of the URL
          return {
            token: 'supersecret',
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
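
To see why sending a token in the URL is the less preferred option, it helps to look at what a serialized URL looks like. The sketch below is an illustration only: the query-parameter name, the JSON encoding and the path `/onMessage` are assumptions, not a description of tRPC's actual wire format. The point is that the value ends up in the URL, where access logs and proxies can record it.

```typescript
/**
 * Appends the given params to the URL as a single JSON-encoded query parameter.
 * The parameter name 'connectionParams' is an assumption for this illustration.
 */
function withConnectionParams(
  baseUrl: string,
  params: Record<string, string>,
): string {
  const url = new URL(baseUrl);
  url.searchParams.set('connectionParams', JSON.stringify(params));
  return url.toString();
}

const exampleUrl = withConnectionParams('http://localhost:3000/onMessage', {
  token: 'supersecret',
});

// The token is readable by anything that sees the request URL
console.log(exampleUrl);
```

Headers set through eventSourceOptions, or cookies on the same domain, keep the credential out of the URL.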

Timeout configuration

httpSubscriptionLink supports configuring an inactivity timeout through the reconnectAfterInactivityMs option. If no messages (including ping messages) are received within the specified timeout period, the connection is marked as "connecting" and automatically attempts to reconnect.

The timeout configuration is set on the server side when initializing tRPC:

server/trpc.ts
ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    client: {
      reconnectAfterInactivityMs: 3_000,
    },
  },
});
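
The inactivity rule described above can be pictured as a small watchdog: every received message (including pings) resets a timestamp, and the connection counts as stale once the silence exceeds the timeout. The class below is a simplified sketch of that idea, not tRPC's implementation; `InactivityWatchdog` and its injectable clock are hypothetical and exist only to make the behavior easy to follow and test.

```typescript
/**
 * Tracks the time of the last received message and reports when the
 * connection has been silent for longer than the configured timeout.
 */
class InactivityWatchdog {
  private readonly timeoutMs: number;
  private readonly now: () => number;
  private lastMessageAt: number;

  constructor(timeoutMs: number, now: () => number = () => Date.now()) {
    this.timeoutMs = timeoutMs;
    this.now = now;
    this.lastMessageAt = now();
  }

  /** Call for every message received, including pings. */
  onMessage(): void {
    this.lastMessageAt = this.now();
  }

  /** True once no message has arrived within the timeout window. */
  shouldReconnect(): boolean {
    return this.now() - this.lastMessageAt > this.timeoutMs;
  }
}
```

With a 3,000 ms timeout, a message at 2,000 ms keeps the connection healthy at 4,500 ms, but continued silence until 8,000 ms would trigger a reconnect.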

Server ping configuration

The server can be configured to send periodic ping messages to keep the connection alive and prevent timeout disconnections. This is particularly useful when combined with the reconnectAfterInactivityMs option.

server/trpc.ts
ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    // Maximum duration of a single SSE connection in milliseconds
    // maxDurationMs: 60_000,
    ping: {
      // Enable periodic ping messages to keep connection alive
      enabled: true,
      // Send ping message every 2s
      intervalMs: 2_000,
    },
    // client: {
    //   reconnectAfterInactivityMs: 3_000
    // }
  },
});
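
Because pings count as messages for the inactivity timeout, the ping interval needs to be shorter than reconnectAfterInactivityMs; otherwise a quiet subscription reconnects before the next ping arrives. The helper below is a hypothetical sanity check you could run against your own settings. `checkSseTiming` is not a tRPC API, and the 1,000 ms fallback mirrors the default documented for intervalMs in the server options.

```typescript
type SseTimingOptions = {
  ping?: { enabled: boolean; intervalMs?: number };
  client?: { reconnectAfterInactivityMs?: number };
};

// Default ping interval as documented in the server-side SSE options.
const DEFAULT_PING_INTERVAL_MS = 1_000;

/** Returns a warning message when the timing is inconsistent, otherwise null. */
function checkSseTiming(options: SseTimingOptions): string | null {
  const timeoutMs = options.client?.reconnectAfterInactivityMs;
  if (timeoutMs === undefined) {
    return null; // no inactivity timeout configured, nothing to check
  }
  const ping = options.ping;
  if (!ping || !ping.enabled) {
    return 'reconnectAfterInactivityMs is set but ping is disabled';
  }
  const intervalMs = ping.intervalMs ?? DEFAULT_PING_INTERVAL_MS;
  if (intervalMs >= timeoutMs) {
    return `ping.intervalMs (${intervalMs}) should be smaller than reconnectAfterInactivityMs (${timeoutMs})`;
  }
  return null;
}
```

The combination shown in the example above (a 2,000 ms ping with a 3,000 ms timeout) passes this check.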

Compatibility (React Native)

httpSubscriptionLink makes use of the EventSource API, the Streams API, and AsyncIterators. These are not natively supported by React Native and have to be ponyfilled.

To ponyfill EventSource, we recommend a polyfill that uses the networking library exposed by React Native over one that uses the XMLHttpRequest API. Libraries that polyfill EventSource using XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.

The Streams API can be ponyfilled using the web-streams-polyfill package.

AsyncIterators can be polyfilled using the @azure/core-asynciterator-polyfill package.

Installation

Install the required polyfills:

npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill

Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):

utils/api.tsx
ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;

Once the ponyfills are added, you can continue setting up the httpSubscriptionLink as described in the setup section.

httpSubscriptionLink Options

ts
type HTTPSubscriptionLinkOptions<
  TRoot extends AnyClientTypes,
  TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
  /**
   * EventSource ponyfill
   */
  EventSource?: TEventSource;
  /**
   * EventSource options or a callback that returns them
   */
  eventSourceOptions?:
    | EventSourceLike.InitDictOf<TEventSource>
    | ((opts: {
        op: Operation;
      }) =>
        | EventSourceLike.InitDictOf<TEventSource>
        | Promise<EventSourceLike.InitDictOf<TEventSource>>);
};

SSE Options on the server

ts
export interface SSEStreamProducerOptions<TValue = unknown> {
  ping?: {
    /**
     * Enable ping comments sent from the server
     * @default false
     */
    enabled: boolean;
    /**
     * Interval in milliseconds
     * @default 1000
     */
    intervalMs?: number;
  };
  /**
   * Maximum duration in milliseconds for the request before ending the stream
   * @default undefined
   */
  maxDurationMs?: number;
  /**
   * End the request immediately after data is sent
   * Only useful for serverless runtimes that do not support streaming responses
   * @default false
   */
  emitAndEndImmediately?: boolean;
  /**
   * Client-specific options - these will be sent to the client as part of the first message
   * @default {}
   */
  client?: {
    /**
     * Timeout and reconnect after inactivity in milliseconds
     * @default undefined
     */
    reconnectAfterInactivityMs?: number;
  };
}