LangStream Documentation

OpenSearch

Last updated 1 year ago

LangStream supports using OpenSearch as a vector database.

Learn more about performing vector search with OpenSearch in the official documentation.

Only OpenSearch 2.x is officially supported.

Connecting to OpenSearch

Create a vector-database resource in your configuration.yaml file. A single resource is bound to a single index.

resources:
    - type: "vector-database"
      name: "OpenSearch"
      configuration:
        service: "opensearch"
        username: "${secrets.opensearch.username}"
        password: "${secrets.opensearch.password}"
        host: "${secrets.opensearch.host}"
        port: "${secrets.opensearch.port}"
        index-name: "my-index-000"
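
The ${secrets.opensearch.*} references above resolve against the application's secrets file. A minimal sketch of a matching entry follows, assuming the values come from environment variables; the variable names and fallback values are hypothetical and should be adapted to your deployment.

```yaml
# secrets.yaml (sketch; environment variable names and defaults are assumptions)
secrets:
  - id: opensearch
    data:
      username: "${OPENSEARCH_USERNAME:-}"
      password: "${OPENSEARCH_PASSWORD:-}"
      host: "${OPENSEARCH_HOST:-localhost}"
      port: "${OPENSEARCH_PORT:-9200}"
```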

Connecting to AWS OpenSearch service

resources:
    - type: "vector-database"
      name: "OpenSearch"
      configuration:
        service: "opensearch"
        username: "${secrets.opensearch.username}"
        password: "${secrets.opensearch.password}"
        host: "${secrets.opensearch.host}"
        region: "${secrets.opensearch.region}"
        index-name: "my-index-000"
  • username is the AWS Access Key

  • password is the AWS Secret Key

  • host is the endpoint provided by AWS. For example, for AWS OpenSearch Serverless it looks like this: xxxx.<region>.aoss.amazonaws.com

  • region is the AWS region. It must match the region used in the endpoint

Declare an index as an asset

To have the application create the OpenSearch index at startup, declare an asset of type opensearch-index.

You can configure settings and mappings as you prefer. Other configuration fields are not supported.

This is an example mixing normal fields with vector fields. The knn plugin is required in the target OpenSearch instance.

- name: "os-index"
  asset-type: "opensearch-index"
  creation-mode: create-if-not-exists
  config:
    datasource: "OpenSearch"
    settings: |
      {
        "index": {
          "knn": true,
          "knn.algo_param.ef_search": 100
        }
      }
    mappings: |
      {
        "properties": {
          "content": {
            "type": "text"
          },
          "embeddings": {
            "type": "knn_vector",
            "dimension": 1536
          }
        }
      }

Search

Use the query-vector-db agent with the following parameters to perform searches on the index created above:

  - name: "lookup-related-documents"
    type: "query-vector-db"
    configuration:
      datasource: "OpenSearch"
      query: |
        {
          "size": 1,
          "query": {
            "knn": {
              "embeddings": {
                "vector": ?,
                "k": 1
              }
            }
          }
        }
      fields:
        - "value.question_embeddings"
      output-field: "value.related_documents"

Use the '?' symbol in the query as a placeholder for the values listed in fields.
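
As a sketch of how several placeholders might be combined, the query below adds a filter on a second field. It assumes that placeholders are filled in the order in which the fields are listed and that each value is inserted as JSON. The agent name, the language field, and value.language are hypothetical and are not part of the index declared above.

```yaml
  - name: "lookup-filtered-documents"   # hypothetical agent name
    type: "query-vector-db"
    configuration:
      datasource: "OpenSearch"
      query: |
        {
          "size": 3,
          "query": {
            "bool": {
              "must": [
                { "knn": { "embeddings": { "vector": ?, "k": 3 } } }
              ],
              "filter": [
                { "term": { "language": ? } }
              ]
            }
          }
        }
      fields:
        # Assumed to map to the placeholders in order of appearance
        - "value.question_embeddings"
        - "value.language"
      output-field: "value.related_documents"
```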

The output-field will contain the query result. The result is an array with the following elements:

  • id: the document ID

  • document: the document source

  • score: the document score

  • index: the index name
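
For orientation, a single-hit result stored in value.related_documents might look roughly like the structure below, shown here as YAML. All values are made-up placeholders, and the fields under document depend on your own mappings.

```yaml
# Illustrative shape only; values are placeholders, not real output
- id: "<document ID>"
  index: "my-index-000"
  score: 1.0              # placeholder score
  document:
    content: "<indexed text>"
```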

For example, if you want to keep only one relevant field from the first result, use the compute agent after the search:

  - name: "lookup-related-documents"
    type: "query-vector-db"
    configuration:
      datasource: "OpenSearch"
      query: |
        {
          "size": 1,
          "query": {
            "match_all": {}
          }
        }
      output-field: "value.related_documents"
      only-first: true
  - name: "Format response"
    type: compute
    configuration:
      fields:
        - name: "value"
          type: STRING
          expression: "value.related_documents.document.content"

Indexing

Use the vector-db-sink agent to index data, with the following parameters:

  - name: "Write to OpenSearch"
    type: "vector-db-sink"
    input: chunks-topic
    configuration:
      datasource: "OpenSearch"
      bulk-parameters:
        timeout: 2m
      fields:
        - name: "id"
          expression: "fn:concat(value.filename, value.chunk_id)"
        - name: "embeddings"
          expression: "fn:toListOfFloat(value.embeddings_vector)"
        - name: "text"
          expression: "value.text"

Requests are buffered and flushed according to the flush-interval and batch-size parameters.
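
A minimal sketch of tuning the flush behavior, assuming flush-interval and batch-size are set directly in the agent configuration next to bulk-parameters. The values are illustrative, and the unit of flush-interval (assumed here to be milliseconds) should be checked against the API Reference.

```yaml
  - name: "Write to OpenSearch"
    type: "vector-db-sink"
    input: chunks-topic
    configuration:
      datasource: "OpenSearch"
      # Assumed placement and units; verify against the API Reference
      batch-size: 50         # flush once this many records are buffered
      flush-interval: 1000   # or after this interval (assumed milliseconds)
      bulk-parameters:
        timeout: 2m
      fields:
        - name: "id"
          expression: "fn:concat(value.filename, value.chunk_id)"
        - name: "embeddings"
          expression: "fn:toListOfFloat(value.embeddings_vector)"
```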

Configuration

Refer to the settings documentation for the settings field. Refer to the mappings documentation for the mappings field.

The query is the body sent to OpenSearch. Refer to the documentation to learn which parameters are supported. Note that the query is executed on the configured index. Multi-index queries are not supported, but you can declare multiple datasources and query different indexes in the same application.

All indexing is performed using the Bulk operation. You can customize the bulk parameters with the bulk-parameters property.

Check out the full configuration properties in the API Reference page.
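
Because each resource is bound to a single index, querying two indexes in one application means declaring two resources. The sketch below assumes a second index named my-index-001; the resource names, that index name, the agent names, and the output fields are all hypothetical.

```yaml
# configuration.yaml (sketch): one resource per index
resources:
    - type: "vector-database"
      name: "OpenSearchDocs"
      configuration:
        service: "opensearch"
        username: "${secrets.opensearch.username}"
        password: "${secrets.opensearch.password}"
        host: "${secrets.opensearch.host}"
        port: "${secrets.opensearch.port}"
        index-name: "my-index-000"
    - type: "vector-database"
      name: "OpenSearchFaq"
      configuration:
        service: "opensearch"
        username: "${secrets.opensearch.username}"
        password: "${secrets.opensearch.password}"
        host: "${secrets.opensearch.host}"
        port: "${secrets.opensearch.port}"
        index-name: "my-index-001"
```

Each query-vector-db agent then selects its index through the datasource name:

```yaml
# pipeline (sketch): each agent queries the index bound to its datasource
  - name: "lookup-docs"
    type: "query-vector-db"
    configuration:
      datasource: "OpenSearchDocs"
      query: |
        { "size": 1, "query": { "match_all": {} } }
      output-field: "value.docs"
  - name: "lookup-faq"
    type: "query-vector-db"
    configuration:
      datasource: "OpenSearchFaq"
      query: |
        { "size": 1, "query": { "match_all": {} } }
      output-field: "value.faq"
```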