wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

Streaming CouchDB data


I'm a confessing fan of CouchDB, stream programming and the official CouchDB NodeJS library. Nano supports returning data as NodeJS Stream, so you can pipe it away. Most examples use file streams or process.stdout, while my goal was to process individual documents that are part of the stream

You can't walk into the same stream a second time

This old Buddhist saying holds true for NodeJS streams too. So any processing needs to happen in the chain of the stream. Let's start with the simple example of reading all documents from a couchDB:

const Nano = require("nano");
const nano = Nano(couchDBURL);
nano.listAsStream({ include_docs: true }).pipe(process.stdout);

This little snippet will read out all documents in your couchDB. You need to supply the couchDBURL value, e.g. http://localhost:5984/test. On a closer look, we see that the data returned arrives in continious buffers that don't match JSON document boundaries, so processing one document after the other needs extra work.

A blog entry in the StrongLoop blog provides the first clue what to do. To process CouchDB stream data we need both a Transform stream to chop incoming data into line by line and a writable stream for our results.

Our code, finally will look like this:

const Nano = require("nano");
const { Writable, Transform } = require("stream");

const streamOneDb = (couchDBURL, resultCallback) => {
  const nano = Nano(couchDBURL);
  nano
    .listAsStream({ include_docs: true })
    .on("error", (e) => console.error("error", e))
    .pipe(lineSplitter())
    .pipe(jsonMaker())
    .pipe(documentWriter(resultCallback));
};

Let's have a closer look at the new functions, the first two implement transform, the last one writable:

  • lineSplitter, as the name implies, cuts the buffer into separate lines for processing. As far as I could tell, CouchDB documents always returned on one line
  • jsonMaker, extracts the documents and discards the wrapper with document count that surrounds them
  • documentWriter, writing out the JSON object using a callback

Splitting lines

The little special: a chunk might end in the middle of a line, so we keep the last line around for the next iteration until it gets flushed out. The callback() triggers the next step in the pipe.

const lineSplitter = () =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      let raw = Buffer.from(chunk, encoding).toString();
      if (this._leftOver) {
        raw = this._leftOver + raw;
      }
      let lines = raw.split("\n");
      this._leftOver = lines.splice(lines.length - 1, 1)[0];
      for (var i in lines) {
        this.push(lines[i]);
      }
      callback();
    },
    flush(callback) {
      if (this._leftOver) {
        this.push(this._leftOver);
      }
      this._leftOver = null;
      callback();
    },
  });

Nota bene: the call to lineSplitter() returns a new instance, not executes the split, since each instance can only be used for one stream at all. I tried to just define the function and found NodeJS terminating without a trace when the same instance was used on a second stream.

Extracting documents

You might want to adjust this function when you are interested in other data.

const jsonMaker = () =>
  new Transform({
    objectMode: true,
    transform(rawLine, encoding, callback) {
      // remove the comma at the end of the line - CouchDB sent an array
      let line = rawLine.toString().replace(/,$/m, "").trim();
      if (line.startsWith('{"id":') && line.endsWith("}")) {
        try {
          let j = JSON.parse(line);
          // We only want the document
          if (j.doc) {
            this.push(JSON.stringify(j.doc));
          }
        } catch (e) {
          console.error(e.message);
        }
      }
      callback();
    },
  });

Nota bene: Streams process buffers or strings, not JSON Objects, so we need to stringify/parse between the modules

Writing out

In the last step we take the document and process it with whatever we need to do.

const documentWriter = (resultCallback) =>
  new Writable({
    write(chunk, encoding, callback) {
      let json = JSON.parse(Buffer.from(chunk, encoding).toString());
      // Process the code
      resultCallback(json);
      // Tell that we are done
      callback();
    },
  });

So far we have tried it on databases with 500k small documents and databases with documents exceeding 1MB JSON. Works like a charm.
I wonder if that would make a good addition to the nano library? The source is available as gist

As usual YMMV


Posted by on 16 October 2021 | Comments (1) | categories: CouchDB NodeJS

Comments

  1. posted by Karsten Lehmann on Wednesday 20 October 2021 AD:

    When I read streaming and CouchDB, I think of RXDB (https://rxdb.info/). But I haven't used that yet.