Agents
LangStream Version: 0.6.2
Compute chat completions (ai-chat-completions)
Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field.

| Description | Type | Required | Default Value |
|---|---|---|---|
| In case of multiple AI services configured, specify the id of the AI service to use. | string | | |
| Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. | string | | |
| Whether this step can be composed with other steps. | boolean | | true |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts. | string | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | object | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | integer | | |
| Messages to use for chat completions. You can use the Mustache syntax. | | ✓ | |
| Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value. | integer | | 20 |
| The model to use for chat completions. The model must be available in the AI Service. | string | ✓ | |
| Additional options for the model configuration. The structure depends on the model and AI provider. | object | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | array of string | | |
| Enable streaming of the results. Use in conjunction with the stream-to-topic parameter. | boolean | | true |
| Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. | string | | |
| Topic to stream the results to. If set, the results are streamed to this topic in small chunks; the complete message is still sent to the output topic. | string | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | string | | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Compute chat completions (ai-chat-completions).messages

| Description | Type | Required | Default Value |
|---|---|---|---|
| Role of the message. The role is used to identify the speaker in the chat. | string | | |
| Content of the message. You can use the Mustache syntax. | string | ✓ | |
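Putting the pieces together, a pipeline step using this agent might look like the sketch below. The topic names, field names, and model are illustrative; check the parameter names against the LangStream version you are running:

```yaml
- name: "ai-chat-completions"
  type: "ai-chat-completions"
  input: "questions-topic"   # illustrative topic name
  output: "answers-topic"    # illustrative topic name
  configuration:
    model: "gpt-3.5-turbo"
    completion-field: "value.answer"
    log-field: "value.prompt-log"
    messages:
      - role: user
        content: "What can you tell me about {{{ value.question }}}?"
```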
Compute text completions (ai-text-completions)
Sends the text to the AI Service to compute text completions. The result is stored in the specified field.

| Description | Type | Required | Default Value |
|---|---|---|---|
| In case of multiple AI services configured, specify the id of the AI service to use. | string | | |
| Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. | string | | |
| Whether this step can be composed with other steps. | boolean | | true |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. The log contains useful information for debugging the completion prompts. | string | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | object | | |
| Logprobs parameter (only valid for OpenAI). | string | | |
| Log probabilities to a field. | string | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | integer | | |
| Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value. | integer | | 20 |
| The model to use for text completions. The model must be available in the AI Service. | string | ✓ | |
| Additional options for the model configuration. The structure depends on the model and AI provider. | object | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Prompt to use for text completions. You can use the Mustache syntax. | array of string | ✓ | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | array of string | | |
| Enable streaming of the results. Use in conjunction with the stream-to-topic parameter. | boolean | | true |
| Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. | string | | |
| Topic to stream the results to. If set, the results are streamed to this topic in small chunks; the complete message is still sent to the output topic. | string | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | number | | |
| Parameter for the completion request. The parameters are passed to the AI Service as is. | string | | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
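A minimal step using this agent might look like the following sketch. The model name and field names are illustrative assumptions:

```yaml
- name: "ai-text-completions"
  type: "ai-text-completions"
  output: "completions-topic"   # illustrative topic name
  configuration:
    model: "gpt-3.5-turbo-instruct"
    completion-field: "value.completion"
    prompt:
      - "Translate the following text to French: {{{ value.text }}}"
```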
Azure Blob Storage Source (azure-blob-storage-source)
Reads data from Azure blobs. There are three supported ways to authenticate:
- SAS token
- Storage account name and key
- Storage account connection string

| Description | Type | Required | Default Value |
|---|---|---|---|
| The name of the Azure container to read from. | string | | langstream-azure-source |
| Endpoint to connect to. Usually it's https://<storage-account>.blob.core.windows.net. | string | ✓ | |
| Comma separated list of file extensions to filter by. | string | | pdf,docx,html,htm,md,txt |
| Time in seconds to sleep after polling for new files. | integer | | 5 |
| Azure SAS token. If not provided, storage account name and key must be provided. | string | | |
| Azure storage account connection string. If not provided, SAS token must be provided. | string | | |
| Azure storage account key. If not provided, SAS token must be provided. | string | | |
| Azure storage account name. If not provided, SAS token must be provided. | string | | |
Apache Camel Source (camel-source)
Use Apache Camel components as a Source.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Additional parameters to pass to the Camel component, in query string format. The values are automatically encoded. | object | | |
| The Camel URI of the component to use as Source. | string | ✓ | |
| Header to use as the key of the record. | string | | |
| Maximum number of records to buffer. | integer | | 100 |
Cast record to another schema (cast)
Transforms the data to a target compatible schema. Some step operations like cast or compute involve conversions from one type to another. When this happens the rules are:
- timestamp, date and time related object conversions assume UTC time zone if it is not explicit.
- date and time related object conversions to/from STRING use the RFC3339 format.
- timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).
- date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).
- time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. | string | | |
| The target schema type. | string | ✓ | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
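For instance, a conditional cast step might be written as follows (parameter names such as `schema-type` and `part` are assumptions to be checked against your LangStream version):

```yaml
- name: "cast-to-string"
  type: "cast"
  configuration:
    schema-type: "string"
    part: "value"
    when: "value.first == 'f1'"
```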
Compute values from the record (compute)
Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| An array of objects describing how to calculate the field values. | | ✓ | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Compute values from the record (compute).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| It is evaluated at runtime and the result of the evaluation is assigned to the field. Refer to the language expression documentation for more information on the expression syntax. | string | ✓ | |
| The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message. In addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.]. Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0. | string | ✓ | |
| If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. | boolean | | false |
| The type of the computed field. This will translate to the schema type of the new field in the transformed message. The following types are currently supported: STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT. The type field is not required for the message headers [destinationTopic, messageKey, properties.], for which STRING will be used. For the value and key, if it is not provided, the type will be inferred from the result of the expression evaluation. | string | | |
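A compute step combining these fields might look like the sketch below. The `fn:concat` function and the field names are assumptions based on the expression language commonly shipped with transform agents:

```yaml
- name: "compute-full-name"
  type: "compute"
  configuration:
    fields:
      - name: "value.full_name"
        expression: "fn:concat(value.first, value.last)"
        type: STRING
      - name: "properties.language"
        expression: "'en'"
```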
Compute embeddings of the record (compute-ai-embeddings)
Compute embeddings of the record. The embeddings are stored in the record under a specific field.

| Description | Type | Required | Default Value |
|---|---|---|---|
| In case of multiple AI services configured, specify the id of the AI service to use. | string | | |
| Additional arguments to pass to the AI Service. (HuggingFace only) | object | | |
| Batch size for submitting the embeddings requests. | integer | | 10 |
| Whether this step can be composed with other steps. | boolean | | true |
| Max number of concurrent requests to the AI Service. | integer | | 4 |
| Field where to store the embeddings. | string | ✓ | |
| Flushing is disabled by default in order to avoid latency spikes. You should enable this feature in the case of background processing. | integer | | 0 |
| Execute the agent over a list of documents. | string | | |
| Model to use for the embeddings. The model must be available in the configured AI Service. | string | | text-embedding-ada-002 |
| URL of the model to use. (HuggingFace only). The default is computed from the model: "djl://ai.djl.huggingface.pytorch/{model}" | string | | |
| Additional options to pass to the AI Service. (HuggingFace only) | object | | |
| Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example: text: "{{{ value.field1 }}} {{{ value.field2 }}}" | string | ✓ | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
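A typical embeddings step might be sketched as follows; the parameter names `embeddings-field`, `batch-size` and `concurrency` are assumptions to verify against your LangStream version:

```yaml
- name: "compute-embeddings"
  type: "compute-ai-embeddings"
  configuration:
    model: "text-embedding-ada-002"
    embeddings-field: "value.embeddings"
    text: "{{{ value.title }}} {{{ value.body }}}"
    batch-size: 10
    concurrency: 4
```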
Dispatch agent (dispatch)
Dispatches messages to different destinations based on conditions.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Routes. | | | |
Dispatch agent (dispatch).routes

| Description | Type | Required | Default Value |
|---|---|---|---|
| Destination of the message. | string | | |
| Action on the message. Possible values are "dispatch" or "drop". | string | | dispatch |
| Condition to activate the route. This is a standard EL expression. | string | | |
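Combining the route options above, a dispatch step might look like this sketch (topic names and the `properties.language` header are illustrative):

```yaml
- name: "dispatch-by-language"
  type: "dispatch"
  configuration:
    routes:
      - when: properties.language == 'en'
        destination: english-topic
      - when: properties.language == 'none'
        action: drop
```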
Document to JSON (document-to-json)
Convert raw text document to JSON. The result will be a JSON object with the text content in the specified field.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether to copy the message properties/headers in the output message. | boolean | | true |
| Field name to write the text content to. | string | | text |
Drop the record (drop)
Drops the record from further processing. Use in conjunction with when to selectively drop records.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Drop fields (drop-fields)
Drops the record fields.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Fields to drop from the input record. | array of string | ✓ | |
| Part to drop (value or key). | string | | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
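For example, dropping sensitive fields from the value part could be sketched as (field names are illustrative):

```yaml
- name: "drop-sensitive-fields"
  type: "drop-fields"
  configuration:
    fields:
      - "password"
      - "ssn"
    part: "value"
```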
Flare Controller (flare-controller)
Applies the Flare pattern to enhance the quality of text completion results.

| Description | Type | Required | Default Value |
|---|---|---|---|
| The field that contains the logprobs of tokens returned by the ai-text-completion agent. | string | ✓ | |
| Name of the topic to forward the message in case of requesting more documents. | string | ✓ | |
| Name of the field to set in order to request the retrieval of more documents. | string | ✓ | |
| The field that contains the list of tokens returned by the ai-text-completion agent. | string | ✓ | |
Flatten record fields (flatten)
Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| The delimiter to use when concatenating the field names. | string | | _ |
| When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. | string | | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Http Request (http-request)
Agent for enriching data with an HTTP request.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether or not to follow redirects. | boolean | | true |
| Body to send with the request. You can use the Mustache syntax to inject value from the context. | string | | |
| Whether or not to handle cookies during the redirects. | boolean | | true |
| Headers to send with the request. You can use the Mustache syntax to inject value from the context. | object | | |
| Http method to use for the request. | string | | GET |
| The field that will hold the results; it can be the same as "field" to override it. | string | ✓ | |
| Query string to append to the url. You can use the Mustache syntax to inject value from the context. Note that the values will be automatically escaped. | object | | |
| Url to send the request to. For adding query string parameters, use the `query-string` field. | string | ✓ | |
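An enrichment step using this agent might be sketched as below; the URL, field names and `output-field` parameter name are illustrative assumptions:

```yaml
- name: "enrich-via-http"
  type: "http-request"
  configuration:
    url: "https://example.com/api/lookup"
    method: "GET"
    query-string:
      customerId: "{{{ value.customer_id }}}"
    output-field: "value.customer_details"
```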
Identity function (identity)
Simple agent to move data from the input to the output. Could be used for testing or sample applications.
Invoke LangServe (langserve-invoke)
Agent for invoking LangServe-based applications.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether or not to follow redirects. | boolean | | true |
| Field in the response that will be used as the content of the record. | string | | content |
| Field in the response that will be used as the content of the record. | boolean | | |
| Fields of the generated records. | | | |
| Whether or not to handle cookies during the redirects. | boolean | | true |
| Headers to send with the request. You can use the Mustache syntax to inject value from the context. | object | | |
| Http method to use for the request. | string | | POST |
| Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the min-chunks-per-message value. | integer | | 20 |
| The field that will hold the results; it can be the same as "field" to override it. | string | ✓ | value |
| Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value.<field>" to write the result in a specific field. | string | | |
| Topic to stream the results to. If set, the results are streamed to this topic in small chunks; the complete message is still sent to the output topic. | string | | |
| Url to send the request to. For adding query string parameters, use the `query-string` field. | string | ✓ | |
Invoke LangServe (langserve-invoke).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| Expression to compute the value of the field. This is a standard EL expression. | string | ✓ | |
| Name of the field like value.xx, key.xxx, properties.xxx | string | ✓ | |
Language detector (language-detector)
Detect the language of a message's data and limit further processing based on language codes.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Define a list of allowed language codes. If the message language is not in this list, the message is dropped. | array of string | | |
| The name of the message header to write the language code to. | string | | language |
Log an event (log-event)
Log a line in the agent logs when a record is received.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Fields to log. | | | |
| Template for a log message to print (Mustache). | string | | |
| Condition to trigger the operation. This is a standard EL expression. | string | | true |
Log an event (log-event).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| Expression to compute the value of the field. This is a standard EL expression. | string | ✓ | |
| Name of the field like value.xx, key.xxx, properties.xxx | string | ✓ | |
Merge key-value format (merge-key-value)
Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Python custom processor (python-function)
Run your own Python processor. All the configuration properties are available in the class init method.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Python class name to instantiate. This class must be present in the application's "python" files. | string | ✓ | |
Python custom processor (python-processor)
Run your own Python processor. All the configuration properties are available in the class init method.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Python class name to instantiate. This class must be present in the application's "python" files. | string | ✓ | |
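The configuration above maps to a Python class that receives each record and returns a list of output records. The sketch below shows the shape of such a processor; `SimpleRecord` here is a minimal stand-in written only to keep the snippet self-contained (the real record class is provided by the LangStream Python SDK), and the `init`/`process` method names follow the SDK's processor contract:

```python
class SimpleRecord:
    """Minimal stand-in for the record type provided by the LangStream
    Python SDK, shown here only to make the snippet self-contained."""
    def __init__(self, value, key=None):
        self._value = value
        self._key = key

    def value(self):
        return self._value


class Exclamation:
    """Example processor: the agent's configuration properties are
    passed to init(); process() returns a list of output records."""
    def init(self, config):
        self.suffix = config.get("suffix", "!!")

    def process(self, record):
        # Emit one output record per input record.
        return [SimpleRecord(record.value() + self.suffix)]


agent = Exclamation()
agent.init({"suffix": "!"})
print(agent.process(SimpleRecord("hello"))[0].value())  # prints "hello!"
```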
Python custom service (python-service)
Run your own Python service. All the configuration properties are available in the class init method.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Python class name to instantiate. This class must be present in the application's "python" files. | string | ✓ | |
Python custom sink (python-sink)
Run your own Python sink. All the configuration properties are available in the class init method.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Python class name to instantiate. This class must be present in the application's "python" files. | string | ✓ | |
Python custom source (python-source)
Run your own Python source. All the configuration properties are available in the class init method.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Python class name to instantiate. This class must be present in the application's "python" files. | string | ✓ | |
Query (query)
Perform a vector search or simple query against a datasource.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Reference to a datasource id configured in the application. | string | ✓ | |
| Fields of the record to use as input parameters for the query. | array of string | | |
| List of fields to use as generated keys. The generated keys are returned in the output, depending on the database. | array of string | | |
| Loop over a list of items taken from the record. For instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field". | string | | |
| Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database). | string | | query |
| If true, only the first result of the query is stored in the output field. | boolean | | false |
| The name of the field to use to store the query result. | string | ✓ | |
| The query to use to extract the data. | string | ✓ | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Query a vector database (query-vector-db)
Query a vector database using Vector Search capabilities.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Reference to a datasource id configured in the application. | string | ✓ | |
| Fields of the record to use as input parameters for the query. | array of string | | |
| List of fields to use as generated keys. The generated keys are returned in the output, depending on the database. | array of string | | |
| Loop over a list of items taken from the record. For instance value.documents. It must refer to a list of maps. In this case the output-field parameter must be like "record.fieldname" in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using "record.field". | string | | |
| Execution mode: query or execute. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database). | string | | query |
| If true, only the first result of the query is stored in the output field. | boolean | | false |
| The name of the field to use to store the query result. | string | ✓ | |
| The query to use to extract the data. | string | ✓ | |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
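A vector search step might be sketched as follows. The datasource id, table, and query syntax are illustrative (the exact vector-search SQL depends on the database behind the datasource):

```yaml
- name: "lookup-related-documents"
  type: "query-vector-db"
  configuration:
    datasource: "JdbcDatasource"   # illustrative datasource id
    query: "SELECT text FROM documents ORDER BY cosine_similarity(embeddings, CAST(? as FLOAT ARRAY)) DESC LIMIT 5"
    fields:
      - "value.question_embeddings"
    output-field: "value.related_documents"
```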
Re-rank (re-rank)
Agent for re-ranking documents based on a query.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Algorithm to use for re-ranking. 'none' or 'MMR'. | string | | none |
| Parameter for the BM25 algorithm. | number | | 0.75 |
| Result field for the embeddings. | string | | |
| The field that contains the documents to sort. | string | ✓ | |
| Parameter for the BM25 algorithm. | number | | 1.5 |
| Parameter for the MMR algorithm. | number | | 0.5 |
| Maximum number of documents to keep. | integer | | 100 |
| The field that will hold the results; it can be the same as "field" to override it. | string | ✓ | |
| Field that contains the embeddings of the documents to sort. | string | | |
| Field that already contains the text that has been embedded. | string | | |
| Result field for the text. | string | | |
S3 Source (s3-source)
Reads data from an S3 bucket.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Access key for the S3 server. | string | | minioadmin |
| The name of the bucket to read from. | string | | langstream-source |
| The endpoint of the S3 server. | string | | http://minio-endpoint.-not-set:9090 |
| Comma separated list of file extensions to filter by. | string | | pdf,docx,html,htm,md,txt |
| Time in seconds to sleep after polling for new files. | integer | | 5 |
| Region for the S3 server. | string | | |
| Secret key for the S3 server. | string | | minioadmin |
Kafka Connect Sink agent (sink)
Run any Kafka Connect Sink. All the configuration properties are passed to the Kafka Connect Sink.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Java main class for the Kafka Sink connector. | string | ✓ | |
Kafka Connect Source agent (source)
Run any Kafka Connect Source. All the configuration properties are passed to the Kafka Connect Source.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Java main class for the Kafka Source connector. | string | ✓ | |
Text extractor (text-extractor)
Extracts text content from different document formats like PDF, JSON, XML, ODF, HTML and many others.
Text normaliser (text-normaliser)
Apply normalisation to the text.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether to make the text lowercase. | boolean | | true |
| Whether to trim spaces from the text. | boolean | | true |
Text splitter (text-splitter)
Splits message content into chunks.

| Description | Type | Required | Default Value |
|---|---|---|---|
| RecursiveCharacterTextSplitter splitter option. Chunk overlap with the previous chunk. Check out https://github.com/knuddelsgmbh/jtokkit for more details. | integer | | 100 |
| RecursiveCharacterTextSplitter splitter option. Chunk size of each message. Check out https://github.com/knuddelsgmbh/jtokkit for more details. | integer | | 200 |
| RecursiveCharacterTextSplitter splitter option. Whether or not to keep separators. Check out https://github.com/knuddelsgmbh/jtokkit for more details. | boolean | | false |
| RecursiveCharacterTextSplitter splitter option. Options are: r50k_base, p50k_base, p50k_edit and cl100k_base. Check out https://github.com/knuddelsgmbh/jtokkit for more details. | string | | cl100k_base |
| RecursiveCharacterTextSplitter splitter option. The separator to use for splitting. Check out https://github.com/knuddelsgmbh/jtokkit for more details. | array of string | | "\n\n", "\n", " ", "" |
| Splitter implementation to use. Currently supported: RecursiveCharacterTextSplitter. | string | | RecursiveCharacterTextSplitter |
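A splitter step combining these options might be sketched as follows; the underscore-style parameter names are assumptions to verify against your LangStream version:

```yaml
- name: "split-document"
  type: "text-splitter"
  configuration:
    splitter_type: "RecursiveCharacterTextSplitter"
    chunk_size: 400
    chunk_overlap: 100
    keep_separator: false
    length_function: "cl100k_base"
    separators: ["\n\n", "\n", " ", ""]
```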
Timer source (timer-source)
Periodically emits records to trigger the execution of pipelines.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Fields of the generated records. | | | |
| Period of the timer in seconds. | integer | | 60 |
Timer source (timer-source).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| Expression to compute the value of the field. This is a standard EL expression. | string | ✓ | |
| Name of the field like value.xx, key.xxx, properties.xxx | string | ✓ | |
Trigger event (trigger-event)
Emits a record on a side destination when a record is received.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether to continue processing the record downstream after emitting the event. If the when condition is false, the record is passed downstream anyway. This flag allows you to stop processing system events and trigger a different pipeline. | boolean | | true |
| Destination of the message. | string | ✓ | |
| Fields of the generated records. | | | |
| Condition to trigger the event. This is a standard EL expression. | string | | true |
Trigger event (trigger-event).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| Expression to compute the value of the field. This is a standard EL expression. | string | ✓ | |
| Name of the field like value.xx, key.xxx, properties.xxx | string | ✓ | |
Unwrap key-value format (unwrap-key-value)
If the record value is in KeyValue format, extracts the KeyValue's key or value and makes it the record value.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Whether this step can be composed with other steps. | boolean | | true |
| Whether to unwrap the key instead of the value. | boolean | | false |
| Execute the step only when the condition is met. You can use the expression language to reference the message. Example: when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'" | string | | |
Astra (`vector-db-sink`)
Writes data to the DataStax Astra service. All the options from the DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

| Description | Type | Required | Default Value |
|---|---|---|---|
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'. | string | ✓ | |
| The keyspace of the table to write to. | string | | |
| Comma-separated list of mappings between table columns and record fields, e.g. my_column_id=key, my_column_name=value.name. | string | ✓ | |
| The name of the table to write to. The table must already exist. | string | ✓ | |
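A sketch of an Astra sink configuration. The parameter names (`datasource`, `keyspace`, `table-name`, `mapping`) are assumptions from the reference above; the datasource id, topic, and column names are hypothetical:

```yaml
pipeline:
  - name: "Write to Astra"
    type: "vector-db-sink"
    input: "chunks-topic"                 # hypothetical input topic
    configuration:
      datasource: "AstraDatasource"       # resource with service: 'astra'
      keyspace: "documents"
      table-name: "pages"                 # must already exist
      mapping: "id=value.id, name=value.name, embedding=value.embeddings_vector"
```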
Astra Vector DB (`vector-db-sink`)
Writes data to the DataStax Astra Vector DB service.

| Description | Type | Required | Default Value |
|---|---|---|---|
| The name of the collection to write to. The collection must already exist. | string | ✓ | |
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra-vector-db'. | string | ✓ | |
| Fields of the collection to write to. | | ✓ | |
Astra Vector DB (`vector-db-sink`).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| JSTL Expression for computing the field value. | string | ✓ | |
| Field name | string | ✓ | |
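A sketch of an Astra Vector DB sink. The parameter names (`datasource`, `collection-name`, `fields` with `name`/`expression`) are assumptions from the reference above; the field and topic names are hypothetical:

```yaml
pipeline:
  - name: "Write to Astra Vector DB"
    type: "vector-db-sink"
    input: "chunks-topic"                 # hypothetical input topic
    configuration:
      datasource: "AstraVectorDB"         # resource with service: 'astra-vector-db'
      collection-name: "documents"        # must already exist
      fields:
        - name: "id"
          expression: "value.id"          # JSTL expression
        - name: "vector"
          expression: "value.embeddings_vector"
```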
Cassandra (`vector-db-sink`)
Writes data to Apache Cassandra. All the options from the DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

| Description | Type | Required | Default Value |
|---|---|---|---|
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'. | string | ✓ | |
| The keyspace of the table to write to. | string | | |
| Comma-separated list of mappings between table columns and record fields, e.g. my_column_id=key, my_column_name=value.name. | string | ✓ | |
| The name of the table to write to. The table must already exist. | string | ✓ | |
JDBC (`vector-db-sink`)
Writes data to any JDBC-compatible database.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'. | string | ✓ | |
| Fields of the table to write to. | | ✓ | |
| The name of the table to write to. The table must already exist. | string | ✓ | |
JDBC (`vector-db-sink`).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| JSTL Expression for computing the field value. | string | ✓ | |
| Field name | string | ✓ | |
| Is this field part of the primary key? | boolean | | false |
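A sketch of a JDBC sink with a primary-key field. The parameter names (`datasource`, `table-name`, `fields` with `name`/`expression`/`primary-key`) are assumptions from the reference above; the table, column, and topic names are hypothetical:

```yaml
pipeline:
  - name: "Write to a JDBC database"
    type: "vector-db-sink"
    input: "chunks-topic"                 # hypothetical input topic
    configuration:
      datasource: "JdbcDatasource"        # resource with service: 'jdbc'
      table-name: "documents"             # must already exist
      fields:
        - name: "id"
          expression: "value.id"
          primary-key: true               # part of the primary key
        - name: "embedding"
          expression: "value.embeddings_vector"
```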
Milvus (`vector-db-sink`)
Writes data to the Milvus/Zilliz service.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Collection name | string | | |
| Database name | string | | |
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'. | string | ✓ | |
| Fields definition. | | ✓ | |
Milvus (`vector-db-sink`).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| JSTL Expression for computing the field value. | string | ✓ | |
| Field name | string | ✓ | |
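A sketch of a Milvus sink. The parameter names (`datasource`, `collection-name`, `database-name`, `fields`) are assumptions from the reference above and may differ in your version; the values are hypothetical:

```yaml
pipeline:
  - name: "Write to Milvus"
    type: "vector-db-sink"
    input: "chunks-topic"                 # hypothetical input topic
    configuration:
      datasource: "MilvusDatasource"      # resource with service: 'milvus'
      collection-name: "documents"
      database-name: "default"            # assumed parameter name
      fields:
        - name: "id"
          expression: "value.id"          # JSTL expression
        - name: "vector"
          expression: "value.embeddings_vector"
```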
OpenSearch (`vector-db-sink`)
Writes data to OpenSearch or AWS OpenSearch Serverless.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Batch size for bulk operations. Hitting the batch size will trigger a flush. | integer | | 10 |
| OpenSearch bulk URL parameters. | object | | |
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'. | string | ✓ | |
| Index fields definition. | | ✓ | |
| Flush interval in milliseconds. | integer | | 1000 |
| JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field. | string | | |
OpenSearch (`vector-db-sink`).bulk-parameters

| Description | Type | Required | Default Value |
|---|---|---|---|
| The pipeline ID for preprocessing documents. Refer to the OpenSearch documentation for more details. | string | | |
| Routes the request to the specified shard. Refer to the OpenSearch documentation for more details. | string | | |
| Set to true to require that all actions target an index alias rather than an index. Refer to the OpenSearch documentation for more details. | boolean | | |
| Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn't suffer. Note that AWS OpenSearch supports only false. Refer to the OpenSearch documentation for more details. | string | | |
| Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed. Refer to the OpenSearch documentation for more details. | string | | |
| How long to wait for the request to return. Refer to the OpenSearch documentation for more details. | string | | |
OpenSearch (`vector-db-sink`).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| JSTL Expression for computing the field value. | string | ✓ | |
| Field name | string | ✓ | |
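A sketch of an OpenSearch sink combining the batching, `_id`, and bulk-parameter options above. The parameter names (`datasource`, `batch-size`, `flush-interval`, `id`, `bulk-parameters`, `fields`) are assumptions from the reference above; topics and field names are hypothetical:

```yaml
pipeline:
  - name: "Index into OpenSearch"
    type: "vector-db-sink"
    input: "chunks-topic"                 # hypothetical input topic
    configuration:
      datasource: "OpenSearchDatasource"  # resource with service: 'opensearch'
      batch-size: 25                      # flush after 25 buffered records...
      flush-interval: 2000                # ...or after 2 seconds, whichever comes first
      id: "value.doc_id"                  # JSTL expression; omit to auto-generate _id
      bulk-parameters:
        refresh: "wait_for"               # note: AWS OpenSearch supports only false
      fields:
        - name: "content"
          expression: "value.content"
        - name: "embeddings"
          expression: "value.embeddings_vector"
```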
Pinecone (`vector-db-sink`)
Writes data to the Pinecone service. To add metadata fields you can add vector.metadata.my-field: "value.my-field". The value is a JSTL Expression to compute the actual value.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'. | string | ✓ | |
| JSTL Expression to compute the id. | string | | |
| Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value. | object | | |
| JSTL Expression to compute the namespace. | string | | |
| JSTL Expression to compute the vector. | string | | |
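A sketch of a Pinecone sink, following the vector.metadata.my-field pattern described above. The parameter names (`datasource`, `vector.id`, `vector.vector`, `vector.metadata.*`) are assumptions from the reference; `filename` and the topic are hypothetical:

```yaml
pipeline:
  - name: "Write to Pinecone"
    type: "vector-db-sink"
    input: "chunks-topic"                       # hypothetical input topic
    configuration:
      datasource: "PineconeDatasource"          # resource with service: 'pinecone'
      vector.id: "value.id"                     # JSTL expressions
      vector.vector: "value.embeddings_vector"
      vector.metadata.filename: "value.filename"  # appended as metadata field "filename"
```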
Apache Solr (`vector-db-sink`)
Writes data to the Apache Solr service. The collection-name is configured at the datasource level.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Commit within option | integer | | 1000 |
| Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'. | string | ✓ | |
| Fields definition. | | ✓ | |
Apache Solr (`vector-db-sink`).fields

| Description | Type | Required | Default Value |
|---|---|---|---|
| JSTL Expression for computing the field value. | string | ✓ | |
| Field name | string | ✓ | |
Web crawler source (`webcrawler-source`)
Crawls a website and extracts the content of its pages.

| Description | Type | Required | Default Value |
|---|---|---|---|
| Configuration for handling the agent status. Access key for the S3 server. | string | | minioadmin |
| Whether to emit non-HTML documents to the pipeline (e.g. PDF files). | boolean | | false |
| Domains that the crawler is allowed to access. | array of string | | |
| Configuration for handling the agent status. The name of the bucket. | string | | langstream-source |
| Configuration for handling the agent status. The S3 endpoint. | string | | http://minio-endpoint.-not-set:9090 |
| Paths that the crawler is not allowed to access. | array of string | | |
| Whether to handle cookies. | boolean | | true |
| Whether to scan HTML documents to find links to other pages. | boolean | | true |
| Timeout for HTTP requests, in milliseconds. | integer | | 10000 |
| Maximum depth of the crawl. | integer | | 50 |
| Maximum number of errors allowed before stopping. | integer | | 5 |
| Maximum number of unflushed pages before the agent persists the crawl data. | integer | | 100 |
| Maximum number of URLs that can be crawled. | integer | | 1000 |
| Minimum time between two requests to the same domain, in milliseconds. | integer | | 500 |
| Configuration for handling the agent status. Region for the S3 server. | string | | |
| Time interval between reindexing of the pages, in seconds. | integer | | 86400 |
| Whether to scan HTML documents for links to other sites. | boolean | | true |
| Configuration for handling the agent status. Secret key for the S3 server. | string | | minioadmin |
| The starting URLs for the crawl. | array of string | | |
| State storage configuration: "s3" or "disk". | string | | s3 |
| User agent to use for the requests. | string | | Mozilla/5.0 (compatible; LangStream.ai/0.1; +https://langstream.ai) |
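A sketch of a web crawler source. The parameter names (`seed-urls`, `allowed-domains`, `max-urls`, `min-time-between-requests`, `state-storage`) are assumptions from the reference above; the site and topic are hypothetical:

```yaml
pipeline:
  - name: "Crawl the docs site"
    type: "webcrawler-source"
    output: "crawled-pages"               # hypothetical output topic
    configuration:
      seed-urls:
        - "https://example.com/docs/"
      allowed-domains:
        - "https://example.com"
      max-urls: 500
      min-time-between-requests: 1000     # be polite: 1s between requests per domain
      state-storage: "disk"               # keep crawl status locally instead of S3
```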