webcrawler-source

The webcrawler-source agent crawls a website and outputs the site's URL and an HTML document. Crawling a website is an ideal first step in a text embeddings pipeline.

This agent keeps the status of the crawl in persistent storage. The storage does not contain a copy of the crawled data, only a single JSON file whose name is computed from the agent name and the id of the LangStream application.

By default, this requires an S3-compatible bucket, defined with the bucketName, endpoint, access-key, secret-key, and region properties. Alternatively, the status can be stored on a persistent disk provided by LangStream by setting state-storage: disk.
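
For example, here is a minimal sketch of the S3 setup, assuming the bucket properties are set in the agent's configuration; the bucket name, endpoint, and secret references are placeholders for your own values (see Secrets):

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      bucketName: "my-crawler-state"                         # placeholder bucket name
      endpoint: "https://s3.eu-west-1.amazonaws.com"         # placeholder S3-compatible endpoint
      access-key: "${ secrets.s3-credentials.access-key }"   # assumed secret reference
      secret-key: "${ secrets.s3-credentials.secret-key }"   # assumed secret reference
      region: "eu-west-1"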

Example

Example webcrawler agent in a pipeline:

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

Multiple URLs in pipeline

Multiple seed-urls and allowed-domains can be specified.

To add them to your pipeline, use this syntax:

seed-urls:
  - "http://example1.com"
  - "http://example2.com"
allowed-domains:
  - "http://example1.com"
  - "http://example2.com"  

Topics

Input

  • None, it’s a source

Output

  • Structured text (JSON)

  • Implicit topic
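
To route the crawled documents to a named topic instead of the implicit one, declare a topic and set it as the agent's output. A minimal sketch (the topic name "crawler-output-topic" is illustrative):

topics:
  - name: "crawler-output-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    output: "crawler-output-topic"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]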

Configuration

Check out the full configuration properties on the API Reference page.

Webcrawler Status

The status of the crawl is kept in the state storage described above: a single JSON file whose name is computed from the agent name and the id of the LangStream application, stored either in an S3-compatible bucket or on a LangStream-provided disk.

Example webcrawler workflow

Using the webcrawler source agent as a starting point, this workflow will crawl a website, get a page's raw HTML, and process that information into chunks for embedding in a vector database. The complete example code is available in the LangStream repository.

  1. Topics necessary for the application are declared: we will later pass the chunked embeddings to a vector database via the Kafka "chunks-topic".

name: "Crawl a website"
topics:
  - name: "chunks-topic"
    creation-mode: create-if-not-exists

  2. The webcrawler source agent configuration is declared. For help finding your credential information, see Secrets. For help configuring the webcrawler-source, see Configuration.

pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk

The webcrawler itself uses the Jsoup library to parse HTML according to the WHATWG HTML specification. It explores the web starting from the list of seed URLs and follows links within pages to discover more content.

For each seed-url, the webcrawler:

  • Sets up a connection using Jsoup.

  • Catches errors and retries or skips the URL based on the exception type (HttpStatusException for HTTP status errors, UnsupportedMimeTypeException for non-HTML content types).

  • If the content is HTML, extracts the page's links (hrefs) and adds each one that passes the allowed-domains rules in the pipeline configuration to the list of URLs to crawl.

  • Gets the page's HTML and creates a document with the site URL as a header and the raw HTML as the output.

The webcrawler then passes the document on to the next agent.
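
Conceptually, each emitted record looks something like the sketch below; the exact header and field names are the agent's internal choice, so treat this as illustrative only:

# Illustrative shape of a crawled-page record (field names are assumptions)
value: "<!DOCTYPE html><html> ... raw HTML of the crawled page ... </html>"
headers:
  url: "https://docs.langstream.ai/"   # the URL of the page this document came from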

  3. The text-extractor agent extracts metadata and text from records using Apache Tika.

  - name: "Extract text"
    type: "text-extractor"

  4. The text-normaliser agent forces the text into lower case and removes leading and trailing spaces.

  - name: "Normalise text"
    type: "text-normaliser"
    configuration:
      make-lowercase: true
      trim-spaces: true

  5. The language detector agent identifies a record’s language. In this case, non-English records are skipped, and English records continue to the next step in the pipeline.

  - name: "Detect language"
    type: "language-detector"
    configuration:
       allowedLanguages: ["en"]
       property: "language"

  6. The text-splitter agent splits the document into chunks of text. This is an important part of the vectorization pipeline to understand, because it requires balancing performance against accuracy. chunk_size sets the maximum number of characters in each chunk, and chunk_overlap controls the amount of overlap between consecutive chunks. A little overlap keeps results more consistent. chunk_size defaults to 1000 characters, and chunk_overlap defaults to 200 characters.

  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"

  7. The document-to-json agent converts the unstructured data to structured JSON.

  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
        text-field: text
        copy-properties: true

  8. The compute agent structures the JSON output into values the remaining steps in the pipeline can work with.

  - name: "prepare-structure"
    type: "compute"
    configuration:
      fields:
         - name: "value.filename"
           expression: "properties.name"
           type: STRING
         - name: "value.chunk_id"
           expression: "properties.chunk_id"
           type: STRING
         - name: "value.language"
           expression: "properties.language"
           type: STRING
         - name: "value.chunk_num_tokens"
           expression: "properties.chunk_num_tokens"
           type: STRING

  9. Now that the text is processed and structured, the compute-ai-embeddings agent computes embeddings and sends them to the Kafka "chunks-topic".

  - name: "compute-embeddings"
    id: "step1"
    type: "compute-ai-embeddings"
    output: "chunks-topic"
    configuration:
      model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model
      embeddings-field: "value.embeddings_vector"
      text: "{{ value.text }}"
      batch-size: 10
      flush-interval: 500
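
The model named above must be provided by an embeddings resource declared in the application's configuration.yaml. A minimal sketch, assuming an OpenAI-compatible provider; the resource name, provider value, and secret references are assumptions to adapt to your setup:

configuration:
  resources:
    - type: "open-ai-configuration"
      name: "OpenAI configuration"
      configuration:
        url: "${ secrets.open-ai.url }"                # needed for Azure OpenAI deployments (assumption)
        access-key: "${ secrets.open-ai.access-key }"  # assumed secret reference, see Secrets
        provider: "azure"                              # or "openai", depending on your provider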

10. Where to next? If you've got an Astra vector database, use the vector-db-sink agent to sink the vectorized embeddings via the Kafka "chunks-topic" to your database. From there, you can query your vector data, or ask questions with a chatbot. It's up to you!

  - name: "Write to Astra"
    type: "vector-db-sink"
    input: "chunks-topic"
    resources:
      size: 2
    configuration:
      datasource: "AstraDatasource"
      table-name: "documents"
      keyspace: "documents"
      mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"
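
The "AstraDatasource" referenced above must be declared as a datasource resource in configuration.yaml. A hypothetical sketch, assuming an Astra datasource; the exact property and secret names may differ, so check the datasource documentation and your Astra credentials:

configuration:
  resources:
    - type: "datasource"
      name: "AstraDatasource"
      configuration:
        service: "astra"                           # assumed service identifier
        clientId: "${ secrets.astra.clientId }"    # assumed secret references
        secret: "${ secrets.astra.secret }"
        token: "${ secrets.astra.token }"
        database: "${ secrets.astra.database }"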
