Pipelines

A pipeline is a manifest of topics and processing steps.

The example manifest below defines a pipeline with 1 step. It receives messages via "input-topic", processes the messages with the ai-chat-completions agent, and sends the processed messages to "output-topic".

topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
  - name: "output-topic"
    creation-mode: create-if-not-exists
errors:
  on-failure: skip
pipeline:
  - name: "ai-chat-completions"
    type: "ai-chat-completions"
    input: "input-topic"
    configuration:
      model: "gpt-3.5-turbo"
      completion-field: "value"
      messages:
        - role: user
          content: "What can you tell me about {{ value}} ?"

Pipeline configuration

PropertyDescriptionType

module

String

The module reference for this pipeline. If not specified, a default module will be used.

id

String

Unique id of the pipeline. If not specified, it will be computed automatically.

name

String

The name of the pipeline.

topics

object[]

A collection of topics that will be bound to the application lifecycle, and used to transport data between steps. See Topics for more details

errors

String

Behavior followed when errors occur. See Error Handling.

assets

object[]

A collection of assets that will be bound to the application lifecycle. See Assets for more details

resources

object

Resources configuration for the pipeline agents.

pipeline

object[] (Required)

Pipeline agents configuration.

Pipeline agents configuration

Inside the pipeline property, you must specify a list of agents.

Each agent can be configured with the following properties.

NameTypeDescription

name

String (required)

Agent name

id

String (required)

type

String (required)

The type name of processing to be run. See AI Actions for supported types.

input

String

Reference to the topic name

output

String

Reference to the topic name

configuration

object

Given the chosen type, these are the config values used. Refer to the configuration area of each type for more info.

Agent resources

When deploying a pipeline, some agents might require additional CPU or memory to run properly. Depending on the traffic load and the agent architecture, you might need to scale the pipeline vertically or horizontally, or both.

The resources property allows you to specify the size (scale vertically) and the parallelism (scale horizontally) for the pipeline.

topics:
  ...

resources:
  parallelism: 3
  size: 2
  
pipeline:
  ...

Setting parallelism to 3 will deploy the pipeline with 3 different replicas. Each replica will use the configured size for CPU and memory.

The size parameter describes both CPU and memory. The value is a multiplier factor that is computed at runtime, starting from a base cpu/memory value. For example, if the default value for the memory - for a single replica - is 512 MB, specifying size: 2 will make the pipeline to use 1024MB per-replica.

The requested resources are implemented by using Kubernetes limits. Be aware that if an agent tries to use more memory than the requested, the pipeline will fail with OOMKilled error.

Multiple manifests in one application

When you place multiple yaml manifests in the same "application" folder, LangStream will create a single application with multiple modules. Each module is a pipeline of agent steps.

|- project-folder
    |- application
        |- pipeline.yaml
        |- second-pipeline.yaml
        |- gateways.yaml
        |- configuration.yaml

In this structure, both pipeline.yaml and second-pipeline.yaml will be created as a single application, with a separate module for each pipeline.

See the webcrawler pipeline agent and its source code for an example.

Last updated