effect-io-ai

Package: @effect/workflow
Module: DurableQueue

DurableQueue.make

A DurableQueue wraps a PersistedQueue, providing a way to wait for items to finish processing using a DurableDeferred.

import { DurableQueue, Workflow } from "@effect/workflow"
import { Effect, Schema } from "effect"

// A DurableQueue definition. Workers are derived from it and workflows offer
// items to it for processing.
const ApiQueue = DurableQueue.make({
  name: "ApiQueue",
  payload: {
    id: Schema.String
  },
  success: Schema.Void,
  error: Schema.Never,
  // The payload's `id` is used as the idempotency key for each item.
  idempotencyKey: ({ id }) => id
})

// A workflow whose payload carries a single string `id`; that same `id` is
// returned as the workflow's idempotency key.
const MyWorkflow = Workflow.make({
  name: "MyWorkflow",
  payload: {
    id: Schema.String
  },
  idempotencyKey(payload) {
    return payload.id
  }
})

// Layer providing the implementation of MyWorkflow. The handler takes no
// arguments here, so the workflow payload is not read; the queued item uses a
// fixed id instead.
const MyWorkflowLayer = MyWorkflow.toLayer(
  Effect.fn(function*() {
    // Add an item to the DurableQueue defined above.
    //
    // When the worker has finished processing the item, the workflow will
    // resume, so the log line below only runs after processing is done.
    yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })

    yield* Effect.log("Workflow succeeded!")
  })
)

// A worker layer for ApiQueue. The handler is given the payload of an item
// from the queue and logs its `id`.
const ApiWorker = DurableQueue.worker(
  ApiQueue,
  Effect.fn(function*(payload) {
    yield* Effect.log(`Worker processing API call with id: ${payload.id}`)
  }),
  // Process up to 5 items concurrently
  { concurrency: 5 }
)

Signature

/**
 * Builds a `DurableQueue` definition from the given options.
 *
 * - `name`: the queue's name.
 * - `payload`: either a schema or a record of struct fields; struct fields
 *   are represented as `Schema.Struct<Payload>` in the resulting type.
 * - `idempotencyKey`: maps the payload's decoded `Type` to a string key.
 * - `success` / `error`: optional schemas; at the type level they default to
 *   `typeof Schema.Void` and `typeof Schema.Never` respectively.
 */
declare const make: <
  Payload extends Schema.Schema.Any | Schema.Struct.Fields,
  Success extends Schema.Schema.Any = typeof Schema.Void,
  Error extends Schema.Schema.All = typeof Schema.Never
>(
  options: {
    readonly name: string
    readonly payload: Payload
    readonly idempotencyKey: (
      payload: Payload extends Schema.Struct.Fields ? Schema.Struct<Payload>["Type"] : Payload["Type"]
    ) => string
    readonly success?: Success | undefined
    readonly error?: Error | undefined
  }
) => DurableQueue<
  Payload extends Schema.Struct.Fields ? Schema.Struct<Payload> : Payload,
  Success,
  Error
>

Source

Since v1.0.0