Class Flow<I,O,S>

java.lang.Object
com.google.genkit.core.Flow<I,O,S>
Type Parameters:
I - The input type for the flow
O - The output type for the flow
S - The streaming chunk type (use Void for non-streaming flows)
All Implemented Interfaces:
Action<I,O,S>, Registerable

public class Flow<I,O,S> extends Object implements Action<I,O,S>
A Flow is a user-defined Action. It represents a function from input I to output O. The Stream parameter S is for flows that support streaming their results incrementally.

Flows are the primary way to organize AI application logic in Genkit. They provide:

  • Observability through automatic tracing
  • Integration with Genkit developer tools
  • Easy deployment as API endpoints
  • Built-in streaming support
  • Method Details

    • define

      public static <I, O> Flow<I,O,Void> define(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext,I,O> fn)
      Defines a new non-streaming flow and registers it.
      Type Parameters:
      I - input type
      O - output type
      Parameters:
      registry - the registry to register with
      name - the flow name
      inputClass - the input type class
      outputClass - the output type class
      fn - the flow function
      Returns:
      the created flow
    • define

      public static <I, O> Flow<I,O,Void> define(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext,I,O> fn, List<Middleware<I,O>> middleware)
      Defines a new non-streaming flow with middleware and registers it.
      Type Parameters:
      I - input type
      O - output type
      Parameters:
      registry - the registry to register with
      name - the flow name
      inputClass - the input type class
      outputClass - the output type class
      fn - the flow function
      middleware - the middleware to apply
      Returns:
      the created flow
    • defineStreaming

      public static <I, O, S> Flow<I,O,S> defineStreaming(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, ActionDef.StreamingFunction<I,O,S> fn)
      Defines a new streaming flow and registers it.
      Type Parameters:
      I - input type
      O - output type
      S - stream chunk type
      Parameters:
      registry - the registry to register with
      name - the flow name
      inputClass - the input type class
      outputClass - the output type class
      fn - the streaming flow function
      Returns:
      the created flow
    • run

      public static <T> T run(ActionContext ctx, String name, Function<Void,T> fn) throws GenkitException
      Runs a named step within the current flow. Each call to run results in a new step with its own trace span.
      Type Parameters:
      T - the step output type
      Parameters:
      ctx - the action context (must be a flow context)
      name - the step name
      fn - the step function
      Returns:
      the step result
      Throws:
      GenkitException - if not called from within a flow
    • getName

      public String getName()
      Description copied from interface: Action
      Returns the name of the action.
      Specified by:
      getName in interface Action<I,O,S>
      Returns:
      the action name
    • getType

      public ActionType getType()
      Description copied from interface: Action
      Returns the type of the action.
      Specified by:
      getType in interface Action<I,O,S>
      Returns:
      the action type
    • getDesc

      public ActionDesc getDesc()
      Description copied from interface: Action
      Returns the descriptor of the action containing metadata, schemas, etc.
      Specified by:
      getDesc in interface Action<I,O,S>
      Returns:
      the action descriptor
    • run

      public O run(ActionContext ctx, I input) throws GenkitException
      Description copied from interface: Action
      Runs the action with the given input.
      Specified by:
      run in interface Action<I,O,S>
      Parameters:
      ctx - the action context
      input - the input to the action
      Returns:
      the output of the action
      Throws:
      GenkitException - if the action fails
    • run

      public O run(ActionContext ctx, I input, Consumer<S> streamCallback) throws GenkitException
      Description copied from interface: Action
      Runs the action with the given input and streaming callback.
      Specified by:
      run in interface Action<I,O,S>
      Parameters:
      ctx - the action context
      input - the input to the action
      streamCallback - callback for receiving streaming chunks, may be null
      Returns:
      the output of the action
      Throws:
      GenkitException - if the action fails
    • runJson

      public com.fasterxml.jackson.databind.JsonNode runJson(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) throws GenkitException
      Description copied from interface: Action
      Runs the action with JSON input and returns JSON output.
      Specified by:
      runJson in interface Action<I,O,S>
      Parameters:
      ctx - the action context
      input - the JSON input
      streamCallback - callback for receiving streaming JSON chunks, may be null
      Returns:
      the JSON output
      Throws:
      GenkitException - if the action fails
    • runJsonWithTelemetry

      public ActionRunResult<com.fasterxml.jackson.databind.JsonNode> runJsonWithTelemetry(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) throws GenkitException
      Description copied from interface: Action
      Runs the action with JSON input and returns the result with telemetry information.
      Specified by:
      runJsonWithTelemetry in interface Action<I,O,S>
      Parameters:
      ctx - the action context
      input - the JSON input
      streamCallback - callback for receiving streaming JSON chunks, may be null
      Returns:
      the action result including telemetry data
      Throws:
      GenkitException - if the action fails
    • getInputSchema

      public Map<String,Object> getInputSchema()
      Description copied from interface: Action
      Returns the JSON schema for the action's input type.
      Specified by:
      getInputSchema in interface Action<I,O,S>
      Returns:
      the input schema as a map, or null if not defined
    • getOutputSchema

      public Map<String,Object> getOutputSchema()
      Description copied from interface: Action
      Returns the JSON schema for the action's output type.
      Specified by:
      getOutputSchema in interface Action<I,O,S>
      Returns:
      the output schema as a map, or null if not defined
    • getMetadata

      public Map<String,Object> getMetadata()
      Description copied from interface: Action
      Returns additional metadata for the action.
      Specified by:
      getMetadata in interface Action<I,O,S>
      Returns:
      the metadata map
    • register

      public void register(Registry registry)
      Description copied from interface: Registerable
      Registers this primitive with the given registry.
      Specified by:
      register in interface Registerable
      Parameters:
      registry - the registry to register with
    • getMiddlewareChain

      public MiddlewareChain<I,O> getMiddlewareChain()
      Returns the middleware chain for this flow.
      Returns:
      the middleware chain
    • withMiddleware

      public Flow<I,O,S> withMiddleware(Middleware<I,O> middleware)
      Creates a copy of this flow with additional middleware.
      Parameters:
      middleware - the middleware to add
      Returns:
      a new flow with the middleware added
    • withMiddleware

      public Flow<I,O,S> withMiddleware(List<Middleware<I,O>> middlewareList)
      Creates a copy of this flow with additional middleware.
      Parameters:
      middlewareList - the middleware to add
      Returns:
      a new flow with the middleware added
    • stream

      public void stream(ActionContext ctx, I input, Consumer<Flow.StreamingFlowValue<O,S>> consumer)
      Streams the flow output with the given input. Returns a consumer that can be used with a yield-style iteration pattern.
      Parameters:
      ctx - the action context
      input - the flow input
      consumer - the consumer for streaming values