Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/vue-stream-query.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-app/vue": minor
---

Add `streamQuery` support for stream-type Rpc handlers. When an Rpc is of type `"stream"`, the client now exposes a `.streamQuery` property (and `...StreamQuery` in helpers) that uses `streamedQuery` from `@tanstack/query-core` to accumulate chunks reactively as an `AsyncResult<A[], E>`.
4 changes: 2 additions & 2 deletions packages/vue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

```ts
useMutation(startExportCommand, {
select: (result) => pollUntilDone(result.jobId),
});
select: (result) => pollUntilDone(result.jobId)
})
```

### Patch Changes
Expand Down
81 changes: 71 additions & 10 deletions packages/vue/src/makeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { type InvalidateOptions, type InvalidateQueryFilters, isCancelledError,
import { camelCase } from "change-case"
import { type Context, Effect, Exit, Hash, type Layer, type ManagedRuntime, S, Struct } from "effect-app"
import { type ApiClientFactory, type Req } from "effect-app/client"
import type { ExtractModuleName, RequestHandler, RequestHandlers, RequestHandlerWithInput, RequestInputFromMake, RequestsAny } from "effect-app/client/clientFor"
import type { ExtractModuleName, RequestHandler, RequestHandlers, RequestHandlerWithInput, RequestInputFromMake, RequestsAny, RequestStreamHandler, RequestStreamHandlerWithInput } from "effect-app/client/clientFor"
import type { InvalidationCallback } from "effect-app/client/makeClient"
import type * as ExitResult from "effect/Exit"
import { type Fiber } from "effect/Fiber"
Expand All @@ -13,7 +13,7 @@ import { type Commander, CommanderStatic } from "./commander.js"
import { type I18n } from "./intl.js"
import { type CommanderResolved, makeUseCommand } from "./makeUseCommand.js"
import { makeMutation, type MutationOptionsBase, useMakeMutation } from "./mutate.js"
import { type CustomUndefinedInitialQueryOptions, makeQuery } from "./query.js"
import { type CustomUndefinedInitialQueryOptions, makeQuery, makeStreamQuery } from "./query.js"
import { makeRunPromise } from "./runtime.js"
import { type Toast } from "./toast.js"

Expand Down Expand Up @@ -125,6 +125,13 @@ type CommandHandler<Req> = Req extends
? Request["type"] extends "command" ? RequestHandler<A, E, R, Request, Id> : never
: never

type StreamHandler<Req> = Req extends
RequestStreamHandlerWithInput<infer I, infer A, infer E, infer R, infer Request, infer Id>
? RequestStreamHandlerWithInput<I, A, E, R, Request, Id>
: Req extends RequestStreamHandler<infer A, infer E, infer R, infer Request, infer Id>
? RequestStreamHandler<A, E, R, Request, Id>
: never

export interface MutationExtensions<RT, Id extends string, I, A, E, R> {
/** Defines a Command based on this mutation, taking the `id` of the mutation as the `id` of the Command.
* The Mutation function will be taken as the first member of the Command, the Command required input will be the Mutation input.
Expand Down Expand Up @@ -222,6 +229,8 @@ export type MutationWithExtensions<RT, Req> = Req extends
declare const useQuery_: QueryImpl<any>["useQuery"]
// eslint-disable-next-line unused-imports/no-unused-vars
declare const useSuspenseQuery_: QueryImpl<any>["useSuspenseQuery"]
// eslint-disable-next-line unused-imports/no-unused-vars
declare const useStreamQuery_: QueryImpl<any>["useStreamQuery"]

export interface ProjectResult<RT, I, B, E, R, Request extends Req, Id extends string> {
request: (i: I) => Effect.Effect<B, E, R>
Expand Down Expand Up @@ -312,6 +321,32 @@ export type Queries<RT, Req> = Req extends
: never
: never

export interface StreamQueriesWithInput<Request extends Req, Id extends string, I, A, E> {
/**
* Stream helper for stream requests.
* Runs as a tracked Vue Query and returns reactive state with accumulated chunks.
* Data is an array of all chunks received so far.
*/
streamQuery: ReturnType<typeof useStreamQuery_<I, E, A, Request, Id>>
}
export interface StreamQueriesWithoutInput<Request extends Req, Id extends string, A, E> {
/**
* Stream helper for stream requests.
* Runs as a tracked Vue Query and returns reactive state with accumulated chunks.
* Data is an array of all chunks received so far.
*/
streamQuery: ReturnType<typeof useStreamQuery_<E, A, Request, Id>>
}

export type StreamQueries<RT, HandlerReq> = HandlerReq extends
RequestStreamHandlerWithInput<infer I, infer A, infer E, infer R, infer Request, infer Id>
? Exclude<R, RT> extends never ? StreamQueriesWithInput<Request, Id, I, A, E>
: { streamQuery: MissingDependencies<RT, R> & {} }
: HandlerReq extends RequestStreamHandler<infer A, infer E, infer R, infer Request, infer Id>
? Exclude<R, RT> extends never ? StreamQueriesWithoutInput<Request, Id, A, E>
: { streamQuery: MissingDependencies<RT, R> & {} }
: never

const _useMutation = makeMutation()

const wrapWithSpan = (self: { id: string; handler: any }, mut: any) => {
Expand Down Expand Up @@ -370,13 +405,20 @@ export type ClientFrom<M extends RequestsAny> = RequestHandlers<never, never, M,
export class QueryImpl<R> {
constructor(readonly getRuntime: () => Context.Context<R>) {
this.useQuery = makeQuery(this.getRuntime)
this.useStreamQuery = makeStreamQuery(this.getRuntime)
}
/**
* Effect results are passed to the caller, including errors.
* @deprecated use client helpers instead (.query())
*/
readonly useQuery: ReturnType<typeof makeQuery<R>>

/**
* Stream results are accumulated as an array of chunks and returned as reactive state.
* @deprecated use client helpers instead (.streamQuery())
*/
readonly useStreamQuery: ReturnType<typeof makeStreamQuery<R>>

/**
* The difference with useQuery is that this function will return a Promise you can await in the Setup,
* which ensures that either there always is a latest value, or an error occurs on load.
Expand Down Expand Up @@ -574,6 +616,7 @@ export const makeClient = <RT_, RTHooks>(
const query = new QueryImpl(getBaseRt)
const useQuery = query.useQuery
const useSuspenseQuery = query.useSuspenseQuery
const useStreamQuery = query.useStreamQuery

const mergeInvalidation = (
a?: MutationOptionsBase["queryInvalidation"],
Expand Down Expand Up @@ -617,15 +660,19 @@ export const makeClient = <RT_, RTHooks>(
) => {
const queries = Struct.keys(client).reduce(
(acc, key) => {
if (client[key].Request.type !== "query") {
return acc
const requestType = client[key].Request.type
if (requestType === "query") {
;(acc as any)[camelCase(key) + "Query"] = Object.assign(useQuery(client[key] as any), {
id: client[key].id
})
;(acc as any)[camelCase(key) + "SuspenseQuery"] = Object.assign(useSuspenseQuery(client[key] as any), {
id: client[key].id
})
} else if (requestType === "stream") {
;(acc as any)[camelCase(key) + "StreamQuery"] = Object.assign(useStreamQuery(client[key] as any), {
id: client[key].id
})
}
;(acc as any)[camelCase(key) + "Query"] = Object.assign(useQuery(client[key] as any), {
id: client[key].id
})
;(acc as any)[camelCase(key) + "SuspenseQuery"] = Object.assign(useSuspenseQuery(client[key] as any), {
id: client[key].id
})
return acc
},
{} as
Expand All @@ -647,6 +694,12 @@ export const makeClient = <RT_, RTHooks>(
QueryHandler<typeof client[Key]>
>["suspense"]
}
& {
[
Key in keyof typeof client as StreamHandler<typeof client[Key]> extends never ? never
: `${ToCamel<string & Key>}StreamQuery`
]: StreamQueries<RT, StreamHandler<typeof client[Key]>>["streamQuery"]
}
)
return queries
}
Expand Down Expand Up @@ -786,6 +839,12 @@ export const makeClient = <RT_, RTHooks>(
}
}
}
: requestType === "stream"
? {
...client[key],
request: h_,
streamQuery: useStreamQuery(client[key] as any)
}
: {
mutate: ((handler: any) => {
const fromRequestConfig = client[key].Request.config?.["invalidatesQueries"] as
Expand Down Expand Up @@ -840,6 +899,8 @@ export const makeClient = <RT_, RTHooks>(
& QueryRequestWithExtensions<QueryHandler<typeof client[Key]>>
& Queries<RT, QueryHandler<typeof client[Key]>>
& QueryProjection<RT, QueryHandler<typeof client[Key]>>)
& (StreamHandler<typeof client[Key]> extends never ? {}
: StreamQueries<RT, StreamHandler<typeof client[Key]>>)
& (CommandHandler<typeof client[Key]> extends never ? {}
: CommandRequestWithExtensions<RT | RTHooks, CommandHandler<typeof client[Key]>>)
& (CommandHandler<typeof client[Key]> extends never ? {}
Expand Down
172 changes: 148 additions & 24 deletions packages/vue/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { type DefaultError, type Enabled, type InitialDataFunction, type NonUndefinedGuard, type PlaceholderDataFunction, type QueryKey, type QueryObserverOptions, type QueryObserverResult, type RefetchOptions, useQuery as useTanstackQuery, useQueryClient, type UseQueryDefinedReturnType, type UseQueryReturnType } from "@tanstack/vue-query"
import { Array, Cause, type Context, Effect, Option, S } from "effect-app"
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { type DefaultError, type Enabled, experimental_streamedQuery as streamedQuery, type InitialDataFunction, type NonUndefinedGuard, type PlaceholderDataFunction, type QueryKey, type QueryObserverOptions, type QueryObserverResult, type RefetchOptions, useQuery as useTanstackQuery, useQueryClient, type UseQueryDefinedReturnType, type UseQueryReturnType } from "@tanstack/vue-query"
import { Array, Cause, type Context, Effect, Exit, Option, S } from "effect-app"
import { makeQueryKey, type Req } from "effect-app/client"
import type { RequestHandler, RequestHandlerWithInput } from "effect-app/client/clientFor"
import type { RequestHandler, RequestHandlerWithInput, RequestStreamHandler, RequestStreamHandlerWithInput } from "effect-app/client/clientFor"
import { CauseException, ServiceUnavailableError } from "effect-app/client/errors"
import * as Channel from "effect/Channel"
import * as Pull from "effect/Pull"
import * as Scope from "effect/Scope"
import type * as Stream from "effect/Stream"
import { type Span } from "effect/Tracer"
import { isHttpClientError } from "effect/unstable/http/HttpClientError"
import * as AsyncResult from "effect/unstable/reactivity/AsyncResult"
Expand Down Expand Up @@ -75,6 +80,64 @@ export interface CustomDefinedPlaceholderQueryOptions<
| PlaceholderDataFunction<NonFunctionGuard<TQueryData>, TError, NonFunctionGuard<TQueryData>, TQueryKey>
}

function swrToQuery<E, A>(r: {
error: CauseException<E> | undefined
data: A | undefined
isValidating: boolean
}): AsyncResult.AsyncResult<A, E> {
if (r.error !== undefined) {
return AsyncResult.failureWithPrevious(
r.error.originalCause,
{
previous: r.data === undefined ? Option.none() : Option.some(AsyncResult.success(r.data)),
waiting: r.isValidating
}
)
}
if (r.data !== undefined) {
return AsyncResult.success<A, E>(r.data, { waiting: r.isValidating })
}

return AsyncResult.initial(r.isValidating)
}

function streamToAsyncIterableWithCauseException<A, E, R>(
self: Stream.Stream<A, E, R>,
context: Context.Context<R>,
id: string
): AsyncIterable<A> {
return {
[Symbol.asyncIterator]() {
const runPromise = Effect.runPromiseWith(context)
const runPromiseExit = Effect.runPromiseExitWith(context)
const scope = Scope.makeUnsafe()
let pull: any
let currentIter: Iterator<A> | undefined
return {
async next(): Promise<IteratorResult<A>> {
if (currentIter) {
const next = currentIter.next()
if (!next.done) return next
currentIter = undefined
}
pull ??= await runPromise(Channel.toPullScoped((self as any).channel, scope))
const exit = await runPromiseExit(pull)
if (Exit.isSuccess(exit)) {
currentIter = (exit.value as any)[Symbol.iterator]()
return currentIter!.next()
} else if (Pull.isDoneCause((exit as any).cause)) {
return { done: true, value: undefined }
}
throw new CauseException((exit as any).cause, id)
},
return(_) {
return runPromise(Effect.as(Scope.close(scope, Exit.void), { done: true, value: undefined }) as any)
}
}
}
}
}

export const makeQuery = <R>(getRuntime: () => Context.Context<R>) => {
const useQuery_: {
<I, A, E, Request extends Req, Name extends string>(
Expand Down Expand Up @@ -228,27 +291,6 @@ export const makeQuery = <R>(getRuntime: () => Context.Context<R>) => {
] as any
}

function swrToQuery<E, A>(r: {
error: CauseException<E> | undefined
data: A | undefined
isValidating: boolean
}): AsyncResult.AsyncResult<A, E> {
if (r.error !== undefined) {
return AsyncResult.failureWithPrevious(
r.error.originalCause,
{
previous: r.data === undefined ? Option.none() : Option.some(AsyncResult.success(r.data)),
waiting: r.isValidating
}
)
}
if (r.data !== undefined) {
return AsyncResult.success<A, E>(r.data, { waiting: r.isValidating })
}

return AsyncResult.initial(r.isValidating)
}

const useQuery: {
/**
* Effect results are passed to the caller, including errors.
Expand Down Expand Up @@ -351,6 +393,88 @@ export const makeQuery = <R>(getRuntime: () => Context.Context<R>) => {
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface MakeQuery2<R> extends ReturnType<typeof makeQuery<R>> {}

type StreamQueryResult<A, E> = readonly [
ComputedRef<AsyncResult.AsyncResult<A[], E>>,
ComputedRef<A[] | undefined>,
(options?: RefetchOptions) => Effect.Effect<QueryObserverResult<A[], CauseException<E>>, never, never>,
UseQueryReturnType<any, any>
]

export const makeStreamQuery = <R>(getRuntime: () => Context.Context<R>) => {
const streamQuery_: {
<E, A, Request extends Req, Name extends string>(
q: RequestStreamHandler<A, E, R, Request, Name>
): () => StreamQueryResult<A, E>
<Arg, E, A, Request extends Req, Name extends string>(
q: RequestStreamHandlerWithInput<Arg, A, E, R, Request, Name>
): (arg: Arg | WatchSource<Arg>) => StreamQueryResult<A, E>
} = (q: any) => (arg?: any) => {
const context = getRuntime()
const arr = arg
const req: { value: any } = !arg
? undefined as any
: typeof arr === "function"
? ({
get value() {
return arr()
}
})
: ref(arg)
const queryKey = makeQueryKey(q)
const handler = q.handler
const isWithInput = typeof handler === "function"

const r = useTanstackQuery<any[], CauseException<any>, any[]>(
{
throwOnError: false,
retry: (retryCount: number, error: unknown) => {
if (error instanceof CauseException) {
if (!isHttpClientError(error.cause) && !S.is(ServiceUnavailableError)(error.cause)) {
return false
}
}
return retryCount < 5
},
queryKey: isWithInput ? [...queryKey, req] : queryKey,
queryFn: streamedQuery({
streamFn: () => {
const stream = isWithInput
? handler(req.value)
: handler
return streamToAsyncIterableWithCauseException(stream, context, q.id)
}
})
}
)

const latestSuccess = shallowRef<any[]>()
const result = computed((): AsyncResult.AsyncResult<any[], any> =>
swrToQuery({
error: r.error.value ?? undefined,
data: r.data.value === undefined ? latestSuccess.value : r.data.value,
isValidating: r.isFetching.value
})
)
watch(result, (value) => latestSuccess.value = Option.getOrUndefined(AsyncResult.value(value)), { immediate: true })

return [
result,
computed(() => latestSuccess.value),
(options?: RefetchOptions) =>
Effect.currentSpan.pipe(
Effect.orElseSucceed(() => null),
Effect.flatMap((span) => Effect.promise(() => r.refetch({ ...options, updateMeta: { span } })))
),
r
] as any
}

return streamQuery_
}

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface MakeStreamQuery2<R> extends ReturnType<typeof makeStreamQuery<R>> {}

function orPrevious<E, A>(result: AsyncResult.AsyncResult<A, E>) {
return AsyncResult.isFailure(result) && Option.isSome(result.previousSuccess)
? AsyncResult.success(result.previousSuccess.value, { waiting: result.waiting })
Expand Down
Loading