Python sink

Along with the included pre-made agents, you can provide your own custom sink agent as a Python application.

The Python application needs to follow a specific directory structure for this agent to successfully run.

Within the “application” directory, create a directory named “python”.

Within that directory, place the .py file with the class that will be the entry point.

The directory will look like this:

|- Project directory
    |- application
        |- python
            |- application.py
    |- pipeline.yaml
    |- configuration.yaml
|- (optional) secrets.yaml

For more on developing custom Python sink agents, see the Agent Developer Guide.

Python sink example

from typing import List
from langstream import Sink, Record

class ExampleSink(Sink):

    def start(self):
        # Initialize any necessary resources or connections here
        pass

    def write(self, record: Record):
        # Process the records to the sink
        # Perform your processing logic here
        processed_data = self.process_data(record.value)

        # Print or log the processed data
        print("Processed Data:", processed_data)

        # Implement any necessary logic for storing the processed data

    def process_data(self, data):
        # Placeholder for your data processing logic
        # Modify this function according to your actual processing needs
        processed_data = data.upper()  # Example: Convert text to uppercase
        return processed_data

    def close(self):
        # Clean up any resources or connections here
        pass

Configure the agent to use the python class in pipeline.yaml:

pipeline:
  - name: "A custom Python sink"
    type: "python-sink"
    input: "input-topic"
    output: "output-topic"
    configuration:
      className: application.ExampleSink

Topics

Input

  • Structured and unstructured text ?

  • Implicit topic ?

  • Templating ?

Output

  • None, it’s a sink.

Configuration

Last updated