Publisher 
The Publisher is a helper for publishing events and delivering them to subscribers. Combined with the Event Iterator, it lets you build streaming responses, real-time updates, and server-sent events with minimal setup.
Installation 
npm install @orpc/experimental-publisher@latest
yarn add @orpc/experimental-publisher@latest
pnpm add @orpc/experimental-publisher@latest
bun add @orpc/experimental-publisher@latest
deno add npm:@orpc/experimental-publisher@latest
Basic Usage
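import { os } from '@orpc/server'
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'
import * as z from 'zod'
const publisher = new MemoryPublisher<{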
  'something-updated': {
    id: string
  }
}>()
const live = os
  .handler(async function* ({ input, signal }) {
    const iterator = publisher.subscribe('something-updated', { signal })
    for await (const payload of iterator) {
      // Handle payload here or yield directly to client
      yield payload
    }
  })
const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    await publisher.publish('something-updated', { id: input.id })
  })
TIP
The publisher supports both static and dynamic event names.
const publisher = new MemoryPublisher<Record<string, { message: string }>>()
Resume Feature
The resume feature uses lastEventId to determine where to resume from after a disconnection.
WARNING
Most adapters disable this feature by default. See each adapter's section for how to enable it.
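To make the idea concrete, here is a minimal, self-contained sketch of ID-based resume. It is an illustration only, not the library's implementation: the `ResumeBuffer` class, its methods, and the numeric ID scheme are all hypothetical. The point it shows is that the publisher assigns each event an ordered ID, and a reconnecting subscriber that reports the last ID it saw receives only the events published after it.

```typescript
// Hypothetical sketch of ID-based resume; not the library's implementation.
interface StoredEvent<T> {
  id: string // assigned by the buffer, monotonically increasing
  payload: T
}

class ResumeBuffer<T> {
  private events: StoredEvent<T>[] = []
  private nextId = 1

  // Store a payload under a freshly assigned ID and return that ID.
  publish(payload: T): string {
    const id = String(this.nextId++)
    this.events.push({ id, payload })
    return id
  }

  // Return every stored event published after `lastEventId`.
  // Without a lastEventId there is nothing to replay.
  replayAfter(lastEventId: string | undefined): StoredEvent<T>[] {
    if (lastEventId === undefined) return []
    const last = Number(lastEventId)
    return this.events.filter(event => Number(event.id) > last)
  }
}

const buffer = new ResumeBuffer<string>()
buffer.publish('first') // event ID "1"
buffer.publish('second') // event ID "2"
buffer.publish('third') // event ID "3"

// A client that last saw event "1" reconnects and is sent events "2" and "3".
const missed = buffer.replayAfter('1')
console.log(missed.map(event => event.id)) // → ['2', '3']
```

Because the buffer assigns the IDs itself, the ordering is guaranteed to be consistent, which is one plausible reason for a publisher to ignore IDs supplied by callers.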
Server Implementation 
When subscribing, you must forward the lastEventId to the publisher to enable resuming:
const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      yield payload
    }
  })
Event ID Management
The publisher automatically manages event IDs when resume is enabled. This means:
- Event IDs you provide when publishing are ignored
- When subscribing, you must forward the event ID when yielding custom payloads
import { getEventMeta, withEventMeta } from '@orpc/server'
const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      // Preserve event id when yielding custom data
      yield withEventMeta({ custom: 'value' }, { ...getEventMeta(payload) })
    }
  })
const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    // The event id 'this-will-be-ignored' will be replaced by the publisher
    await publisher.publish('something-updated', withEventMeta({ id: input.id }, { id: 'this-will-be-ignored' }))
  })
Client Implementation
On the client, you can use the Client Retry Plugin, which automatically tracks lastEventId and passes it to the server when reconnecting. Alternatively, you can manage lastEventId manually:
import { getEventMeta } from '@orpc/client'
let lastEventId: string | undefined
while (true) {
  try {
    const iterator = await client.live('input', { lastEventId })
    for await (const payload of iterator) {
      lastEventId = getEventMeta(payload)?.id // Update lastEventId
      console.log(payload)
    }
  }
  catch {
    await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
  }
}
Available Adapters
| Name | Resume Support | Description | 
|---|---|---|
| MemoryPublisher | ✅ | A simple in-memory publisher | 
| IORedisPublisher | ✅ | Adapter for ioredis | 
| UpstashRedisPublisher | ✅ | Adapter for Upstash Redis | 
INFO
If you'd like to add a new publisher adapter, please open an issue.
Memory Publisher 
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'
const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>({
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
})
INFO
Resume support is disabled by default in MemoryPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.
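As a rough illustration of what a retention window implies, the sketch below keeps events only for a fixed number of seconds and drops older ones. It is not the MemoryPublisher implementation: the `RetainedEvents` class, its methods, and the injectable `now` parameter are hypothetical and exist only so the behaviour can be checked without waiting.

```typescript
// Hypothetical sketch of time-based retention; not the library's implementation.
interface TimedEvent<T> {
  id: string
  payload: T
  publishedAt: number // milliseconds
}

class RetainedEvents<T> {
  private events: TimedEvent<T>[] = []
  private nextId = 1
  private retentionMs: number

  constructor(retentionSeconds: number) {
    this.retentionMs = retentionSeconds * 1000
  }

  // `now` is injectable so the example does not depend on the real clock.
  publish(payload: T, now: number = Date.now()): string {
    this.prune(now)
    const id = String(this.nextId++)
    this.events.push({ id, payload, publishedAt: now })
    return id
  }

  // Events still available for replay at time `now`.
  available(now: number = Date.now()): TimedEvent<T>[] {
    this.prune(now)
    return [...this.events]
  }

  // Drop events older than the retention window.
  private prune(now: number): void {
    this.events = this.events.filter(event => now - event.publishedAt <= this.retentionMs)
  }
}

const store = new RetainedEvents<string>(60 * 2) // 2-minute window
store.publish('first', 0)
store.publish('second', 90_000) // published 90 seconds later

// At 130 seconds, 'first' is older than 120 seconds and has been dropped.
const stillAvailable = store.available(130_000).map(event => event.payload)
console.log(stillAvailable) // → ['second']
```

Under this model, a client that reconnects after the window has elapsed cannot recover the events it missed, so the retention value should cover the longest disconnection you want to tolerate.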
IORedis Publisher 
import { Redis } from 'ioredis'
import { IORedisPublisher } from '@orpc/experimental-publisher/ioredis'
const publisher = new IORedisPublisher<{
  'something-updated': {
    id: string
  }
}>({
  commander: new Redis(), // For executing short-lived commands
  subscriber: new Redis(), // For subscribing to events
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // avoid conflict with other keys
})
This adapter requires two Redis instances: one for executing short-lived commands and another for subscribing to events.
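With ioredis, a connection that has entered subscriber mode generally cannot run regular commands, which is the usual reason a second connection is needed. If both connections should share the same configuration, one option is to create the second with ioredis's `duplicate()` method. The fragment below is a sketch under assumptions: the `REDIS_URL` environment variable and the localhost fallback are hypothetical placeholders.

```typescript
import { Redis } from 'ioredis'
import { IORedisPublisher } from '@orpc/experimental-publisher/ioredis'

// REDIS_URL is an assumed environment variable; the fallback is a placeholder.
const commander = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379')
// duplicate() opens a second connection with the same options.
const subscriber = commander.duplicate()

const publisher = new IORedisPublisher<{
  'something-updated': {
    id: string
  }
}>({
  commander,
  subscriber,
  resumeRetentionSeconds: 60 * 2,
  prefix: 'orpc:publisher:',
})
```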
INFO
Resume support is disabled by default in IORedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.
Upstash Redis Publisher 
import { Redis } from '@upstash/redis'
import { UpstashRedisPublisher } from '@orpc/experimental-publisher/upstash-redis'
const redis = Redis.fromEnv()
const publisher = new UpstashRedisPublisher<{
  'something-updated': {
    id: string
  }
}>(redis, {
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // avoid conflict with other keys
})
INFO
Resume support is disabled by default in UpstashRedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.
