Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/rpc-stream-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"effect-app": minor
"@effect-app/infra": minor
---

Add Effect RPC `Stream` support to the wrapper.

- New `Stream` request constructor on `TaggedRequestFor` parallel to `Query`/`Command`. Emits resources with `type: "stream"`.
- Server router (`@effect-app/infra` `routing.ts`) accepts stream resources whose handlers return a `Stream.Stream<A, E, R>` (or a function from input to one). Forwards `stream: true` to `Rpc.make` so `RpcSchema.Stream` wrapping is applied. Streams bypass `applyRequestTypeInterruptibility` and the `Effect.withSpan` wrapping (the RPC server adds its own span).
- Client (`apiClientFactory.ts`) detects stream resources, forwards `stream: true` when constructing `RpcGroup`, and exposes the per-request `handler` as a `Stream.Stream` (via `Stream.unwrap` over the `ManagedRuntime` context) instead of an `Effect`. `Invalidation.CommandResponseWithMetaData` continues to apply only to commands.
- New `RequestStreamHandler` / `RequestStreamHandlerWithInput` shapes in `clientFor.ts`; `RequestHandlers` dispatches on `type: "stream"`.
86 changes: 51 additions & 35 deletions packages/effect-app/src/client/apiClientFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as ManagedRuntime from "effect/ManagedRuntime"
import * as Option from "effect/Option"
import * as Predicate from "effect/Predicate"
import * as Schema from "effect/Schema"
import * as Stream from "effect/Stream"
import * as Struct from "effect/Struct"
import { Rpc, RpcClient, RpcGroup, RpcSerialization } from "effect/unstable/rpc"
import * as Config from "../Config.js"
Expand Down Expand Up @@ -41,7 +42,7 @@ export type Req = S.Top & {
config?: Record<string, any>
readonly id: string
readonly moduleName: string
readonly type: "command" | "query"
readonly type: "command" | "query" | "stream"
readonly "~decodingServices"?: unknown
}

Expand Down Expand Up @@ -122,12 +123,15 @@ export const makeRpcGroupFromRequestsAndModuleName = <M extends RequestsAny, con
const rpcs = RpcGroup
.make(
...typedValuesOf(filtered).map((_) => {
return Rpc.make((_ as any)._tag, {
payload: _ as any,
success: (_ as any).type === "command"
? Invalidation.CommandResponseWithMetaData((_ as any).success)
: (_ as any).success,
error: (_ as any).error
const r = _ as any
const isStream = r.type === "stream"
return Rpc.make(r._tag, {
payload: r,
success: r.type === "command"
? Invalidation.CommandResponseWithMetaData(r.success)
: r.success,
error: r.error,
...isStream ? { stream: true as const } : {}
})
})
)
Expand Down Expand Up @@ -224,40 +228,52 @@ const makeApiClientFactory = Effect
const fields = Struct.omit(Request.fields, ["_tag"] as const)
const requestAttr = `${meta.moduleName}.${h._tag}`
const isCommand = h.type === "command"
const isStream = h.type === "stream"

const buildEffect = (input: any) =>
mr.contextEffect.pipe(
Effect.flatMap((svcs) => {
const rpcEffect = TheClient
.use((client) =>
(client as any)[requestAttr]!(Request.make(input)) as Effect.Effect<any, any, never>
)
.pipe(
Effect.provide(layers),
Effect.provide(svcs)
)
return isCommand ? unwrapCommand(rpcEffect) : rpcEffect
})
)

const buildStream = (input: any) =>
Stream.unwrap(
mr.contextEffect.pipe(
Effect.flatMap((svcs) =>
TheClient
.useSync((client) => {
const rpcStream = (client as any)[requestAttr]!(
Request.make(input)
) as Stream.Stream<any, any, any>
return rpcStream.pipe(
Stream.provide(layers),
Stream.provide(svcs)
)
})
.pipe(Effect.provide(svcs))
)
)
)

// @ts-expect-error doc
prev[cur] = Object.keys(fields).length === 0
? {
handler: mr.contextEffect.pipe(
Effect.flatMap((svcs) => {
const rpcEffect = TheClient
.use((client) =>
(client as any)[requestAttr]!(Request.make({})) as Effect.Effect<any, any, never>
)
.pipe(
Effect.provide(layers),
Effect.provide(svcs)
)
return isCommand ? unwrapCommand(rpcEffect) : rpcEffect
})
),
handler: isStream ? buildStream({}) : buildEffect({}),
...requestMeta
}
: {
handler: (req: any) =>
mr.contextEffect.pipe(
Effect.flatMap((svcs) => {
const rpcEffect = TheClient
.use((client) =>
(client as any)[requestAttr]!(Request.make(req)) as Effect.Effect<any, any, never>
)
.pipe(
Effect.provide(layers),
Effect.provide(svcs)
)
return isCommand ? unwrapCommand(rpcEffect) : rpcEffect
})
),

handler: isStream
? (req: any) => buildStream(req)
: (req: any) => buildEffect(req),
...requestMeta
}

Expand Down
63 changes: 49 additions & 14 deletions packages/effect-app/src/client/clientFor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import * as Record from "effect/Record"
import type * as Stream from "effect/Stream"
import type { Path } from "path-parser"
import qs from "query-string"
import type * as Effect from "../Effect.js"
Expand Down Expand Up @@ -98,6 +99,20 @@ export interface RequestHandlerWithInput<I, A, E, R, Request extends Req, Id ext
Request: Request
}

export interface RequestStreamHandler<A, E, R, Request extends Req, Id extends string> {
handler: Stream.Stream<A, E, R>
id: Id
options?: ClientForOptions
Request: Request
}

export interface RequestStreamHandlerWithInput<I, A, E, R, Request extends Req, Id extends string> {
handler: (i: I) => Stream.Stream<A, E, R>
id: Id
options?: ClientForOptions
Request: Request
}

// make sure this is exported or d.ts of apiClientFactory breaks?!
type ReqDecodingServices<M> = M extends { readonly "~decodingServices": infer DS } ? DS : never

Expand Down Expand Up @@ -126,20 +141,40 @@ type RequestInput<I extends { readonly make: (...args: any[]) => any }> = Normal
RequestInputFromMake<I>
>

export type RequestHandlers<R, E, M extends RequestsAny, ModuleName extends string> = {
[K in keyof M as M[K] extends Req ? K : never]: IsTagOnly<RequestInputFromMake<M[K]>> extends true ? RequestHandler<
S.Schema.Type<M[K]["success"]>,
S.Schema.Type<M[K]["error"]> | E,
R | ReqDecodingServices<M[K]>,
M[K],
`${ModuleName}.${K & string}`
type RequestHandlerFor<R, E, T extends Req, Id extends string> = T["type"] extends "stream"
? IsTagOnly<RequestInputFromMake<T>> extends true ? RequestStreamHandler<
S.Schema.Type<T["success"]>,
S.Schema.Type<T["error"]> | E,
R | ReqDecodingServices<T>,
T,
Id
>
: RequestHandlerWithInput<
RequestInput<M[K]>,
S.Schema.Type<M[K]["success"]>,
S.Schema.Type<M[K]["error"]> | E,
R | ReqDecodingServices<M[K]>,
M[K],
`${ModuleName}.${K & string}`
: RequestStreamHandlerWithInput<
RequestInput<T>,
S.Schema.Type<T["success"]>,
S.Schema.Type<T["error"]> | E,
R | ReqDecodingServices<T>,
T,
Id
>
: IsTagOnly<RequestInputFromMake<T>> extends true ? RequestHandler<
S.Schema.Type<T["success"]>,
S.Schema.Type<T["error"]> | E,
R | ReqDecodingServices<T>,
T,
Id
>
: RequestHandlerWithInput<
RequestInput<T>,
S.Schema.Type<T["success"]>,
S.Schema.Type<T["error"]> | E,
R | ReqDecodingServices<T>,
T,
Id
>

export type RequestHandlers<R, E, M extends RequestsAny, ModuleName extends string> = {
[K in keyof M as M[K] extends Req ? K : never]: Extract<M[K], Req> extends infer T extends Req
? RequestHandlerFor<R, E, T, `${ModuleName}.${K & string}`>
: never
}
13 changes: 10 additions & 3 deletions packages/effect-app/src/client/makeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type TaggedRequestForResult<
Error extends S.Top,
Config,
ModuleName extends string,
Type extends "command" | "query",
Type extends "command" | "query" | "stream",
Resources = never
> =
& S.EnhancedClass<Self, TaggedRequestSchema<Tag, Payload>, {}>
Expand Down Expand Up @@ -150,7 +150,7 @@ export const makeRpcClient = <
return RequestClass
}

function makeTaggedRequestWithMeta<ModuleName extends string, Type extends "command" | "query">(
function makeTaggedRequestWithMeta<ModuleName extends string, Type extends "command" | "query" | "stream">(
moduleName: ModuleName,
type: Type
) {
Expand Down Expand Up @@ -344,6 +344,7 @@ export const makeRpcClient = <
function TaggedRequestFor<ModuleName extends string>(moduleName: ModuleName) {
const Query = makeTaggedRequestWithMeta(moduleName, "query")
const Command = makeTaggedRequestWithMeta(moduleName, "command")
const Stream = makeTaggedRequestWithMeta(moduleName, "stream")

return {
moduleName,
Expand All @@ -356,7 +357,13 @@ export const makeRpcClient = <
* Create command request classes for this module.
* Commands mutate state and should avoid returning complex read models.
*/
Command
Command,
/**
* Create stream request classes for this module.
* Streams produce a Stream of `success` values, may also fail with `error`.
* Handlers must return an `Effect`-compatible Stream rather than an Effect.
*/
Stream
} as const
}

Expand Down
Loading
Loading