sink

The sink agent will attempt to write the input message data to the specified connector. LangStream does not pre-load any sink connectors. However it is compatible with Kafka connect. You can specify a Kafka sink connector (jar) as an application dependency and then use this agent to implement the connector.

Prerequisites

You will need the (public) URL of the sink connector’s jar. Here are resources to discover different Kafka connectors:

Once you have the jar, you’ll need its sha512 hash. Here are resources to calculate this:

Finally, you will need the connector’s configuration properties. If you are using an individually distributed jar, the properties should be available in the project’s documentation. Confluent connector hub offers a searchable database of connectors, which link to the documentation.

Example

This is an example using the Apache Cassandra sink for Kafka. Include the URL, sha512, and type as an application dependency in configuration.yaml

configuration:
  dependencies:
    - name: "Kafka Connect Sink for Apache Cassandra from DataStax"
      url: "https://github.com/datastax/kafka-sink/releases/download/1.5.0/kafka-connect-cassandra-sink-1.5.0.jar"
      sha512sum: "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
      type: "java-library"

Then the agent would implement the sink connector with appropriate values

- name: "Write to Cassandra"
  type: "sink"
  input: "input-topic" # optional
  configuration:
    name: cassandra-sink
    "connector.class": com.datastax.oss.kafka.sink.CassandraSinkConnector
    "key.converter": org.apache.kafka.connect.storage.StringConverter
    "value.converter": org.apache.kafka.connect.storage.StringConverter
    "cloud.secureConnectBundle": "{{ secrets.cassandra.secure-connect-bundle }}"
    "auth.username": "{{ secrets.cassandra.username }}"
    "auth.password": "{{ secrets.cassandra.password }}"
    "topic.input-topic.vsearch.products.mapping": "id=value.id,description=value.description,name=value.name"

Topics

Input

  • Structured and unstructured text ?

  • Implicit topic ?

  • Templating ?

Output

  • None, it’s a sink.

Configuration

Checkout the full configuration properties in the API Reference page.

Last updated