Package com.google.genkit.core
Class Flow<I,O,S>
java.lang.Object
com.google.genkit.core.Flow<I,O,S>
- Type Parameters:
I- The input type for the flowO- The output type for the flowS- The streaming chunk type (use Void for non-streaming flows)
- All Implemented Interfaces:
Action<I,,O, S> Registerable
A Flow is a user-defined Action. It represents a function from input I to
output O. The Stream parameter S is for flows that support streaming their
results incrementally.
Flows are the primary way to organize AI application logic in Genkit. They provide:
- Observability through automatic tracing
- Integration with Genkit developer tools
- Easy deployment as API endpoints
- Built-in streaming support
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classStreamingFlowValue represents either a streamed chunk or the final output of a flow. -
Method Summary
Modifier and TypeMethodDescriptiondefine(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext, I, O> fn) Defines a new non-streaming flow and registers it.define(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext, I, O> fn, List<Middleware<I, O>> middleware) Defines a new non-streaming flow with middleware and registers it.static <I,O, S> Flow <I, O, S> defineStreaming(Registry registry, String name, Class<I> inputClass, Class<O> outputClass, ActionDef.StreamingFunction<I, O, S> fn) Defines a new streaming flow and registers it.getDesc()Returns the descriptor of the action containing metadata, schemas, etc.Returns the JSON schema for the action's input type.Returns additional metadata for the action.Returns the middleware chain for this flow.getName()Returns the name of the action.Returns the JSON schema for the action's output type.getType()Returns the type of the action.voidRegisters this primitive with the given registry.run(ActionContext ctx, I input) Runs the action with the given input.run(ActionContext ctx, I input, Consumer<S> streamCallback) Runs the action with the given input and streaming callback.static <T> Trun(ActionContext ctx, String name, Function<Void, T> fn) Runs a named step within the current flow.com.fasterxml.jackson.databind.JsonNoderunJson(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) Runs the action with JSON input and returns JSON output.ActionRunResult<com.fasterxml.jackson.databind.JsonNode> runJsonWithTelemetry(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) Runs the action with JSON input and returns the result with telemetry information.voidstream(ActionContext ctx, I input, Consumer<Flow.StreamingFlowValue<O, S>> consumer) Streams the flow output with the given input.withMiddleware(Middleware<I, O> middleware) Creates a copy of this flow with additional middleware.withMiddleware(List<Middleware<I, O>> middlewareList) Creates a copy of this flow with additional middleware.
-
Method Details
-
define
public static <I,O> Flow<I,O, defineVoid> (Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext, I, O> fn) Defines a new non-streaming flow and registers it.- Type Parameters:
I- input typeO- output type- Parameters:
registry- the registry to register withname- the flow nameinputClass- the input type classoutputClass- the output type classfn- the flow function- Returns:
- the created flow
-
define
public static <I,O> Flow<I,O, defineVoid> (Registry registry, String name, Class<I> inputClass, Class<O> outputClass, BiFunction<ActionContext, I, O> fn, List<Middleware<I, O>> middleware) Defines a new non-streaming flow with middleware and registers it.- Type Parameters:
I- input typeO- output type- Parameters:
registry- the registry to register withname- the flow nameinputClass- the input type classoutputClass- the output type classfn- the flow functionmiddleware- the middleware to apply- Returns:
- the created flow
-
defineStreaming
public static <I,O, Flow<I,S> O, defineStreamingS> (Registry registry, String name, Class<I> inputClass, Class<O> outputClass, ActionDef.StreamingFunction<I, O, S> fn) Defines a new streaming flow and registers it.- Type Parameters:
I- input typeO- output typeS- stream chunk type- Parameters:
registry- the registry to register withname- the flow nameinputClass- the input type classoutputClass- the output type classfn- the streaming flow function- Returns:
- the created flow
-
run
Runs a named step within the current flow. Each call to run results in a new step with its own trace span.- Type Parameters:
T- the step output type- Parameters:
ctx- the action context (must be a flow context)name- the step namefn- the step function- Returns:
- the step result
- Throws:
GenkitException- if not called from within a flow
-
getName
Description copied from interface:ActionReturns the name of the action. -
getType
Description copied from interface:ActionReturns the type of the action. -
getDesc
Description copied from interface:ActionReturns the descriptor of the action containing metadata, schemas, etc. -
run
Description copied from interface:ActionRuns the action with the given input. -
run
Description copied from interface:ActionRuns the action with the given input and streaming callback. -
runJson
public com.fasterxml.jackson.databind.JsonNode runJson(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) throws GenkitException Description copied from interface:ActionRuns the action with JSON input and returns JSON output. -
runJsonWithTelemetry
public ActionRunResult<com.fasterxml.jackson.databind.JsonNode> runJsonWithTelemetry(ActionContext ctx, com.fasterxml.jackson.databind.JsonNode input, Consumer<com.fasterxml.jackson.databind.JsonNode> streamCallback) throws GenkitException Description copied from interface:ActionRuns the action with JSON input and returns the result with telemetry information.- Specified by:
runJsonWithTelemetryin interfaceAction<I,O, S> - Parameters:
ctx- the action contextinput- the JSON inputstreamCallback- callback for receiving streaming JSON chunks, may be null- Returns:
- the action result including telemetry data
- Throws:
GenkitException- if the action fails
-
getInputSchema
Description copied from interface:ActionReturns the JSON schema for the action's input type.- Specified by:
getInputSchemain interfaceAction<I,O, S> - Returns:
- the input schema as a map, or null if not defined
-
getOutputSchema
Description copied from interface:ActionReturns the JSON schema for the action's output type.- Specified by:
getOutputSchemain interfaceAction<I,O, S> - Returns:
- the output schema as a map, or null if not defined
-
getMetadata
Description copied from interface:ActionReturns additional metadata for the action.- Specified by:
getMetadatain interfaceAction<I,O, S> - Returns:
- the metadata map
-
register
Description copied from interface:RegisterableRegisters this primitive with the given registry.- Specified by:
registerin interfaceRegisterable- Parameters:
registry- the registry to register with
-
getMiddlewareChain
Returns the middleware chain for this flow.- Returns:
- the middleware chain
-
withMiddleware
Creates a copy of this flow with additional middleware.- Parameters:
middleware- the middleware to add- Returns:
- a new flow with the middleware added
-
withMiddleware
Creates a copy of this flow with additional middleware.- Parameters:
middlewareList- the middleware to add- Returns:
- a new flow with the middleware added
-
stream
Streams the flow output with the given input. Returns a consumer that can be used with a yield-style iteration pattern.- Parameters:
ctx- the action contextinput- the flow inputconsumer- the consumer for streaming values
-