
Node.js Streams and Backpressure: Readable, Writable, Transform, and the Pipeline API

Master Node.js streams for production: understand backpressure mechanics, implement readable and writable streams correctly, build transform pipelines for data processing, use the pipeline() API for error handling, and process large files without memory exhaustion.

Viprasol Tech Team
October 12, 2026
13 min read

Node.js streams let you process data without loading it all into memory. A 10GB CSV file can be parsed, transformed, and inserted into a database using a constant ~50MB of memory — if you handle backpressure correctly.

Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. When you ignore it (calling write() without checking its return value, or push() without respecting its return value), buffers grow unbounded until the process is killed for running out of memory. The pipeline() API, available since Node.js v10, handles backpressure automatically and propagates errors correctly.


The Backpressure Problem

import fs from "node:fs";

// BAD: No backpressure — reader runs at full speed regardless of writer
const readable = fs.createReadStream("large-file.csv");
const writable = fs.createWriteStream("output.csv");

// readable.pipe(writable) handles backpressure
// But if you do this manually:
readable.on("data", (chunk) => {
  const canContinue = writable.write(chunk);
  // canContinue = false means writable buffer is full
  // If you ignore this, writable buffers grow unbounded → OOM
  if (!canContinue) {
    readable.pause(); // Backpressure: pause the producer
  }
});

writable.on("drain", () => {
  readable.resume(); // Buffer drained — producer can continue
});
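The write()/'drain' handshake can be observed directly with a deliberately slow in-memory writable. This is a self-contained sketch using only node:stream — the tiny highWaterMark is artificial, chosen so backpressure triggers on the second write:

```typescript
import { Writable } from "node:stream";

// A deliberately slow writable with a tiny highWaterMark, to show
// write() signalling backpressure and 'drain' firing once it catches up.
const slow = new Writable({
  highWaterMark: 4, // bytes — tiny on purpose
  write(_chunk, _enc, cb) {
    setTimeout(cb, 10); // simulate a slow consumer
  },
});

const ok = slow.write("ab");              // 2 buffered bytes < 4 → true
const overloaded = !slow.write("cdefgh"); // 8 buffered bytes ≥ 4 → write() returns false
console.log(ok, overloaded); // true true

// Wait for the internal buffer to empty before writing more.
await new Promise<void>((resolve) => slow.once("drain", resolve));
// 'drain' fired — safe to resume the producer
```

In the manual example above, readable.pause() / readable.resume() plays exactly this role around the false return and the 'drain' event.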

The pipeline() API manages this automatically:

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

// pipeline() handles backpressure, cleans up on error, returns a Promise
await pipeline(
  createReadStream("large-file.log"),
  createGzip(),
  createWriteStream("large-file.log.gz")
);
// pipeline auto-destroys all streams on error — no leaks
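pipeline() also accepts an AbortSignal as its final options argument, which is the supported way to cancel a running pipeline. A minimal in-memory sketch — the source and sink here are stand-ins, not the file streams from the example above:

```typescript
import { pipeline } from "node:stream/promises";
import { Readable, Writable } from "node:stream";

const controller = new AbortController();
const sink = new Writable({
  objectMode: true,
  write(_chunk, _enc, cb) {
    controller.abort(); // cancel mid-flight, after the first chunk
    cb();
  },
});

let aborted = false;
try {
  await pipeline(Readable.from(["a", "b", "c"]), sink, {
    signal: controller.signal,
  });
} catch (err) {
  // pipeline() destroys every stream and rejects with an AbortError
  aborted = (err as Error).name === "AbortError";
}
console.log(aborted); // true
```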

Custom Readable Stream

// src/streams/database-readable.ts
import { Readable, ReadableOptions } from "node:stream";
import { Pool } from "pg";

interface DatabaseStreamOptions extends ReadableOptions {
  pool: Pool;
  query: string;
  params?: unknown[];
  batchSize?: number;
}

// Stream rows from PostgreSQL without loading all into memory
export class DatabaseReadableStream extends Readable {
  private readonly pool: Pool;
  private readonly query: string;
  private readonly params: unknown[];
  private readonly batchSize: number;
  private offset = 0;
  private done = false;

  constructor(options: DatabaseStreamOptions) {
    super({
      objectMode: true,           // Emit objects, not Buffers
      highWaterMark: options.batchSize ?? 100, // Buffer up to N objects
      ...options,
    });
    this.pool = options.pool;
    this.query = options.query;
    this.params = options.params ?? [];
    this.batchSize = options.batchSize ?? 100;
  }

  // Called by the stream machinery when it wants more data
  // Only called when the consumer is ready (backpressure respected)
  async _read() {
    if (this.done) {
      this.push(null); // Signal end of stream
      return;
    }

    try {
      const { rows } = await this.pool.query(
        `${this.query} LIMIT $${this.params.length + 1} OFFSET $${this.params.length + 2}`,
        [...this.params, this.batchSize, this.offset]
      );

      if (rows.length === 0) {
        this.done = true;
        this.push(null);
        return;
      }

      this.offset += rows.length;

      // Push the whole fetched batch. If push() returns false the internal
      // buffer is over highWaterMark, but the rows are still buffered and
      // _read() won't be called again until the consumer drains them.
      // (Returning early on a false push() would silently drop the rest of
      // the batch, since offset has already advanced past it.)
      for (const row of rows) {
        this.push(row);
      }

      if (rows.length < this.batchSize) {
        // Fewer rows than batch size โ€” we've reached the end
        this.done = true;
        this.push(null);
      }
    } catch (error) {
      this.destroy(error as Error); // Propagate errors through the pipeline
    }
  }
}
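The same _read()/push() contract can be exercised without a database. Below is a stand-in that paginates an in-memory array in batches — hypothetical, but shaped like DatabaseReadableStream — consumed with for await...of, which every Readable supports:

```typescript
import { Readable } from "node:stream";

// Stand-in for DatabaseReadableStream: paginates an in-memory array
// in batches, the same shape the pg-backed stream would produce.
const rows = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));

class ArrayBatchReadable extends Readable {
  private offset = 0;
  constructor(private readonly batchSize: number) {
    super({ objectMode: true });
  }
  _read() {
    const batch = rows.slice(this.offset, this.offset + this.batchSize);
    if (batch.length === 0) {
      this.push(null); // end of stream
      return;
    }
    this.offset += batch.length;
    for (const row of batch) {
      this.push(row); // push the whole fetched batch
    }
  }
}

const ids: number[] = [];
for await (const row of new ArrayBatchReadable(3)) {
  ids.push((row as { id: number }).id);
}
console.log(ids); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

for await pulls one object at a time, so _read() is only invoked when the consumer is ready — backpressure for free.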

๐ŸŒ Looking for a Dev Team That Actually Delivers?

Most agencies sell you a project manager and assign juniors. Viprasol is different โ€” senior engineers only, direct Slack access, and a 5.0โ˜… Upwork record across 100+ projects.

  • React, Next.js, Node.js, TypeScript โ€” production-grade stack
  • Fixed-price contracts โ€” no surprise invoices
  • Full source code ownership from day one
  • 90-day post-launch support included

Custom Transform Stream

// src/streams/csv-parser.transform.ts
import { Transform, TransformOptions } from "node:stream";

interface CSVRow {
  [key: string]: string;
}

export class CSVParserTransform extends Transform {
  private headers: string[] | null = null;
  private buffer = "";
  private lineCount = 0;

  constructor(options?: TransformOptions) {
    super({
      readableObjectMode: true,  // Output: objects
      writableObjectMode: false, // Input: Buffer/string
      ...options,
    });
  }

  _transform(
    chunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");

    // Keep the last incomplete line in the buffer
    this.buffer = lines.pop() ?? "";

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;

      if (this.headers === null) {
        this.headers = this.parseLine(trimmed);
        continue;
      }

      const values = this.parseLine(trimmed);
      const row: CSVRow = {};

      for (let i = 0; i < this.headers.length; i++) {
        row[this.headers[i]] = values[i] ?? "";
      }

      this.lineCount++;
      this.push(row); // Push parsed object downstream
    }

    callback(); // Signal that we're ready for more data
  }

  _flush(callback: (error?: Error | null) => void): void {
    // Process any remaining data in the buffer
    if (this.buffer.trim() && this.headers) {
      const values = this.parseLine(this.buffer.trim());
      const row: CSVRow = {};
      for (let i = 0; i < this.headers.length; i++) {
        row[this.headers[i]] = values[i] ?? "";
      }
      this.push(row);
    }

    callback();
  }

  private parseLine(line: string): string[] {
    // Handle quoted fields with embedded commas
    // (escaped quotes "" are not handled — this is a sketch, not full RFC 4180)
    const result: string[] = [];
    let current = "";
    let inQuotes = false;

    for (let i = 0; i < line.length; i++) {
      const char = line[i];

      if (char === '"') {
        inQuotes = !inQuotes;
      } else if (char === "," && !inQuotes) {
        result.push(current.trim());
        current = "";
      } else {
        current += char;
      }
    }

    result.push(current.trim());
    return result;
  }

  get rowsProcessed() {
    return this.lineCount;
  }
}
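The chunk-boundary buffering is the load-bearing trick in _transform(): chunks arriving from the file system can end mid-line. Here is a minimal standalone line splitter (assumed names, same technique) fed input deliberately split at awkward boundaries to prove the carry-over works:

```typescript
import { Transform, Readable } from "node:stream";

// Minimal line splitter using the same buffering trick as CSVParserTransform:
// chunks may end mid-line, so the incomplete tail is carried to the next chunk.
class LineSplitter extends Transform {
  private buffer = "";
  constructor() {
    super({ readableObjectMode: true });
  }
  _transform(
    chunk: Buffer | string,
    _enc: BufferEncoding,
    cb: (error?: Error | null) => void
  ): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // keep the incomplete tail
    for (const line of lines) this.push(line);
    cb();
  }
  _flush(cb: (error?: Error | null) => void): void {
    if (this.buffer) this.push(this.buffer); // last line without trailing \n
    cb();
  }
}

// Feed input split mid-line — each array element is one "chunk".
const out: string[] = [];
const splitter = new LineSplitter();
splitter.on("data", (line: string) => out.push(line));
Readable.from(["id,na", "me\n1,Al", "ice\n2,Bob"]).pipe(splitter);
await new Promise((resolve) => splitter.on("end", resolve));
console.log(out); // ["id,name", "1,Alice", "2,Bob"]
```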

Custom Writable Stream

// src/streams/batch-insert.writable.ts
import { Writable, WritableOptions } from "node:stream";
import { Pool } from "pg";

interface BatchInsertOptions extends WritableOptions {
  pool: Pool;
  tableName: string;
  columns: string[];
  batchSize?: number;
}

export class BatchInsertWritable extends Writable {
  private readonly pool: Pool;
  private readonly tableName: string;
  private readonly columns: string[];
  private readonly batchSize: number;
  private batch: Record<string, unknown>[] = [];
  private insertedCount = 0;

  constructor(options: BatchInsertOptions) {
    super({
      objectMode: true,
      highWaterMark: options.batchSize ?? 500,
      ...options,
    });
    this.pool = options.pool;
    this.tableName = options.tableName;
    this.columns = options.columns;
    this.batchSize = options.batchSize ?? 500;
  }

  _write(
    row: Record<string, unknown>,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    this.batch.push(row);

    if (this.batch.length >= this.batchSize) {
      this.flushBatch()
        .then(() => callback())
        .catch((err) => callback(err));
    } else {
      callback(); // Ready for next write
    }
  }

  _final(callback: (error?: Error | null) => void): void {
    // Flush remaining rows when stream ends
    this.flushBatch()
      .then(() => callback())
      .catch((err) => callback(err));
  }

  private async flushBatch(): Promise<void> {
    if (this.batch.length === 0) return;

    const batchToInsert = this.batch.splice(0); // Drain batch

    // Build parameterized bulk insert
    const placeholders = batchToInsert.map((_, rowIndex) =>
      `(${this.columns.map((_, colIndex) =>
        `$${rowIndex * this.columns.length + colIndex + 1}`
      ).join(", ")})`
    ).join(", ");

    const values = batchToInsert.flatMap((row) =>
      this.columns.map((col) => row[col])
    );

    await this.pool.query(
      `INSERT INTO ${this.tableName} (${this.columns.join(", ")})
       VALUES ${placeholders}
       ON CONFLICT DO NOTHING`,
      values
    );

    this.insertedCount += batchToInsert.length;
  }

  get totalInserted() {
    return this.insertedCount;
  }
}
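The $n placeholder arithmetic in flushBatch() is easy to get off by one, and it can be checked in isolation. A hypothetical two-row, two-column example:

```typescript
// Check the placeholder arithmetic from flushBatch() in isolation:
// rowIndex * columns.length + colIndex + 1 yields a flat, 1-based sequence,
// matching the order produced by flatMap over the same rows and columns.
const columns = ["email", "name"];
const batch = [
  { email: "a@x.com", name: "A" },
  { email: "b@x.com", name: "B" },
];

const placeholders = batch
  .map((_, rowIndex) =>
    `(${columns
      .map((_, colIndex) => `$${rowIndex * columns.length + colIndex + 1}`)
      .join(", ")})`
  )
  .join(", ");

const values = batch.flatMap((row) =>
  columns.map((col) => row[col as keyof typeof row])
);

console.log(placeholders); // ($1, $2), ($3, $4)
console.log(values);       // ["a@x.com", "A", "b@x.com", "B"]
```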


Complete Pipeline Example

// src/scripts/import-users.ts
// Process a 2GB CSV, parse it, validate rows, insert into DB
// Memory usage: ~50MB constant regardless of file size

import { pipeline } from "node:stream/promises";
import { createReadStream } from "node:fs";
import { Transform } from "node:stream";
import { CSVParserTransform } from "../streams/csv-parser.transform";
import { BatchInsertWritable } from "../streams/batch-insert.writable";
import { pool } from "../db";

async function importUsers(csvPath: string): Promise<void> {
  const parser = new CSVParserTransform();

  // Validation + transformation step
  const validator = new Transform({
    objectMode: true,
    transform(row: Record<string, string>, _encoding, callback) {
      // Validate required fields
      if (!row.email || !row.email.includes("@")) {
        // Skip invalid rows (log but don't fail)
        console.warn(`Skipping invalid row: ${JSON.stringify(row)}`);
        callback(); // Don't push โ€” skip this row
        return;
      }

      // Transform to database shape
      this.push({
        email: row.email.toLowerCase().trim(),
        name: row.name?.trim() || null,
        imported_at: new Date().toISOString(),
        source: "csv_import",
      });

      callback();
    },
  });

  const inserter = new BatchInsertWritable({
    pool,
    tableName: "users",
    columns: ["email", "name", "imported_at", "source"],
    batchSize: 1000,
  });

  console.log("Starting import...");
  const startTime = Date.now();

  // pipeline() manages backpressure and error propagation
  await pipeline(
    createReadStream(csvPath, { encoding: "utf8" }),
    parser,
    validator,
    inserter
  );

  const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
  console.log(
    `Import complete: ${inserter.totalInserted} users in ${elapsed}s ` +
    `(${parser.rowsProcessed} rows processed)`
  );
}

// Error handling: pipeline rejects on any stream error
importUsers("users.csv").catch((error) => {
  console.error("Import failed:", error);
  process.exit(1);
});

Stream vs Buffer: When to Use Each

Scenario                       | Use Streams                | Use Buffer
-------------------------------|----------------------------|-------------------------
Files > 100MB                  | ✅ Always                  | ❌ OOM risk
Database exports (> 10K rows)  | ✅ One batch at a time     | ❌ Whole array in memory
HTTP response body < 1MB       | Fine either way            | ✅ Simpler
File compression/decompression | ✅ Built-in zlib streams   | ❌
Video/audio processing         | ✅ Must stream             | ❌
JSON with known small size     | Fine either way            | ✅ Simpler
Real-time event processing     | ✅ Transform streams       | ❌
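To see the constant-memory claim concretely, here is a small self-contained sketch (the temp file and 1 MiB size are illustrative stand-ins for a multi-GB file, where fs.readFile would hold everything in memory at once):

```typescript
import { createReadStream } from "node:fs";
import { writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Illustrative demo file — in production this would be a file too large
// to read with fs.readFile without risking OOM.
const path = join(tmpdir(), "stream-demo.txt");
await writeFile(path, "x".repeat(1024 * 1024)); // 1 MiB sample

// createReadStream reads in chunks (default highWaterMark is 64 KiB),
// so resident memory stays flat regardless of total file size.
let bytes = 0;
for await (const chunk of createReadStream(path)) {
  bytes += (chunk as Buffer).length;
}
console.log(bytes); // 1048576
```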

Working With Viprasol

Large-scale data processing — imports, exports, ETL pipelines, report generation — requires correct stream handling to avoid memory exhaustion and botched error recovery. Our Node.js engineers build production stream pipelines with proper backpressure, batch processing, validation, and error handling for workloads from megabytes to terabytes.

Backend engineering → | Talk to our engineers →

About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
