Assets

Assets are external resources handled by LangStream at deployment time. You don't want to deal with creating Kafka topics or vector database tables every time you change your application, so LangStream allows you to automate the behavior of assets at deployment, update, and deletion.

The LangStream deployer creates resources during the app-setup job. If the job completes, the deployer creates another job called runtime-deployer which creates the agent CRDs. When runtime-deployer completes successfully, the application status changes to DEPLOYED. (If you get the ERROR_DEPLOYING status instead, check the logs with langstream apps logs <my-app-id>).

For example, when you initialize an Astra DB database, you might want to also create a keyspace and a table to save time. To do this, add the astra-keyspace and cassandra-table as assets to your pipeline.yaml:

name: "Write to Astra DB"
topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
assets:
  - name: "langstream-keyspace"
    asset-type: "astra-keyspace"
    creation-mode: create-if-not-exists
    config:
      keyspace: "langstream"
      datasource: "AstraDatasource"
  - name: "products-table"
    asset-type: "cassandra-table"
    creation-mode: create-if-not-exists
    config:
      table-name: "products"
      keyspace: "langstream"
      datasource: "AstraDatasource"
      create-statements:
        - "CREATE TABLE IF NOT EXISTS langstream.products (id int PRIMARY KEY,name TEXT,description TEXT);"
pipeline:
  - name: "Write to Astra"
    type: "vector-db-sink"
    input: "input-topic"
    configuration:
      datasource: "AstraDatasource"
      table-name: "products"
      keyspace: "langstream"
      mapping: "id=value.id,description=value.description,name=value.name"
      

Notice this pipeline is using datasource type AstraDatasource. The assets behavior described here is currently only available for AstraDatasource and CassandraDataSource.

Other commands available for assets include creating indexes within your tables, and truncating tables, as below.

    config:
      table-name: "products"
      keyspace: "langstream"
      datasource: "AstraDatasource"
      create-statements:
        - "CREATE TABLE IF NOT EXISTS langstream.products (id int PRIMARY KEY,name TEXT,description TEXT, embeddings VECTOR<FLOAT,1536>);"
        - "CREATE CUSTOM INDEX IF NOT EXISTS documents_ann_index ON documents.documents(embeddings) USING 'StorageAttachedIndex';"
      delete-statements:
        - "TRUNCATE TABLE langstream.products;"

For more on how AstraDatasource, CassandraDataSource, and LangStream assets interact at application deployment, see Data storage.

Configuration

Asset TypeFieldTypeDescription

cassandra-table

table-name

String

Name of the table.

keyspace

String

Name of the keyspace.

create-statements

List

List of statements to create the table.

delete-statements

List

List of statements to delete the table. Required only if deletion-mode is set to "delete".

cassandra-keyspace

keyspace

String

Name of the keyspace.

create-statements

List

List of statements to create the keyspace.

delete-statements

List

List of statements to delete the keyspace. Required only if deletion-mode is set to "delete".

secureBundle

N/A

If present, an exception is thrown indicating that "astra-keyspace" should be used.

database

N/A

If present, an exception is thrown indicating that "astra-keyspace" should be used.

astra-keyspace

keyspace

String

Name of the keyspace.

token

String

Token for accessing AstraDB.

database

String

Name of the database in AstraDB.

All Asset Types

datasource

Map

Nested configuration containing a configuration field with datasource-specific configurations.

Last updated