Python sink

Along with the included pre-made agents, you can provide your own custom sink agent as a Python application.

The Python application must follow a specific directory structure for the agent to run successfully.

Within the “application” directory, create a directory named “python”.

Within that directory, place the .py file with the class that will be the entry point.

The directory will look like this:

|- Project directory
    |- application
        |- python
            |- application.py
        |- pipeline.yaml
        |- configuration.yaml
    |- (optional) secrets.yaml
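As a quick sanity check of this layout before deploying, the following is a minimal sketch that lists the candidate entry-point files. The helper name find_entry_point_files is hypothetical and not part of any LangStream tooling; it only assumes the "application/python" directory described above.

```python
from pathlib import Path


def find_entry_point_files(project_dir):
    """Return the names of the .py files directly inside application/python.

    Hypothetical helper for local checks only; it is not a LangStream API.
    """
    python_dir = Path(project_dir) / "application" / "python"
    if not python_dir.is_dir():
        # The agent expects this directory, so fail loudly if it is absent.
        raise FileNotFoundError(f"missing directory: {python_dir}")
    return sorted(path.name for path in python_dir.glob("*.py"))
```

Calling find_entry_point_files(".") from the project directory should list the file that holds the sink class; an empty list means the entry point has not been placed yet.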

For more on developing custom Python sink agents, see the Agent Developer Guide.

Python sink example

from langstream import Sink, Record

class ExampleSink(Sink):

    def start(self):
        # Initialize any necessary resources or connections here
        pass

    def write(self, record: Record):
        # Process the record before it is written to the sink
        # Perform your processing logic here
        processed_data = self.process_data(record.value())

        # Print or log the processed data
        print("Processed Data:", processed_data)

        # Implement any necessary logic for storing the processed data

    def process_data(self, data):
        # Placeholder for your data processing logic
        # Modify this function according to your actual processing needs
        processed_data = data.upper()  # Example: Convert text to uppercase
        return processed_data

    def close(self):
        # Clean up any resources or connections here
        pass
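To try the processing logic locally without a running pipeline, a sketch along the following lines may help. Everything in it is an assumption for illustration: FakeRecord is a hypothetical stand-in that only models a value() accessor, LocalExampleSink copies the shape of the sink above without the langstream base class, and run_lifecycle is a made-up driver, not how the runtime invokes an agent.

```python
import io
from contextlib import redirect_stdout


class FakeRecord:
    """Hypothetical stand-in for a runtime record; only value() is modeled."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class LocalExampleSink:
    """Stand-alone copy of the example sink, without the langstream base class."""

    def start(self):
        self.written = []  # stand-in for a real connection or client

    def write(self, record):
        processed_data = self.process_data(record.value())
        print("Processed Data:", processed_data)
        self.written.append(processed_data)

    def process_data(self, data):
        return data.upper()

    def close(self):
        self.written.clear()


def run_lifecycle(values):
    """Call start, write once per value, then close; return the printed lines."""
    sink = LocalExampleSink()
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        sink.start()
        for value in values:
            sink.write(FakeRecord(value))
        sink.close()
    return buffer.getvalue().splitlines()
```

Keeping process_data free of runtime types, as the example does, is what makes this kind of local check straightforward.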

Configure the agent to use the Python class in pipeline.yaml:

  - name: "A custom Python sink"
    type: "python-sink"
    input: "input-topic"
    configuration:
      className: application.ExampleSink



Input

  • Structured and unstructured text

  • Implicit topic

  • Templating

Output

  • None, it’s a sink.




Configuration

  • className: String (required). A combination of the file name and the class name. Example: for the file my-python-app.py that has class MySink, the value would be my-python-app.MySink.

  • Any other key: additional configuration properties specific to the application.
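To make the "file name plus class name" convention for className concrete, here is a minimal sketch of how such a value could be split and resolved with the standard library. The functions split_class_name and load_class are hypothetical, and the LangStream runtime may resolve the class differently; this only illustrates the naming rule.

```python
import importlib
import sys


def split_class_name(class_name):
    """Split 'my-python-app.MySink' into ('my-python-app', 'MySink')."""
    module_name, _, class_part = class_name.rpartition(".")
    if not module_name or not class_part:
        raise ValueError(f"expected '<file name>.<class name>', got {class_name!r}")
    return module_name, class_part


def load_class(class_name, python_dir):
    """Import <python_dir>/<file name>.py and return the named class.

    Illustrative only: an assumption about resolution, not the runtime's code.
    """
    module_name, class_part = split_class_name(class_name)
    sys.path.insert(0, str(python_dir))
    try:
        importlib.invalidate_caches()  # pick up files created after startup
        module = importlib.import_module(module_name)
    finally:
        sys.path.remove(str(python_dir))
    return getattr(module, class_part)
```

With the pipeline example above, split_class_name("application.ExampleSink") yields the file name "application" (that is, application.py) and the class name "ExampleSink".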
