Node.js Streams and Backpressure: Readable, Writable, Transform, and the Pipeline API
Master Node.js streams for production: understand backpressure mechanics, implement readable and writable streams correctly, build transform pipelines for data processing, use the pipeline() API for error handling, and process large files without memory exhaustion.
Node.js streams let you process data without loading it all into memory. A 10GB CSV file can be parsed, transformed, and inserted into a database using a constant ~50MB of memory, provided you handle backpressure correctly.
Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. When you ignore it (calling write() without checking its return value, or push() without respecting its return value), internal buffers grow unbounded until the process is killed for exhausting memory. The pipeline() API, available since Node.js v10, handles backpressure automatically and propagates errors correctly.
The Backpressure Problem
import fs from "node:fs";

// readable.pipe(writable) handles backpressure for you.
// But if you wire the streams together manually, you must do it yourself:
const readable = fs.createReadStream("large-file.csv");
const writable = fs.createWriteStream("output.csv");

readable.on("data", (chunk) => {
  const canContinue = writable.write(chunk);
  // canContinue === false means the writable's buffer is full.
  // Ignore it and the buffer grows unbounded -> out of memory.
  if (!canContinue) {
    readable.pause(); // Backpressure: pause the producer
  }
});

writable.on("drain", () => {
  readable.resume(); // Buffer drained -> producer can continue
});
The pipeline() API manages this automatically:
import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

// pipeline() handles backpressure, cleans up on error, returns a Promise
await pipeline(
  createReadStream("large-file.log"),
  createGzip(),
  createWriteStream("large-file.log.gz")
);
// pipeline auto-destroys all streams on error -> no leaks
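pipeline() also accepts an AbortSignal in a trailing options object, which gives you cancellation and timeouts for free. A sketch using an in-memory source (the function name is illustrative):

```typescript
import { pipeline } from "node:stream/promises";
import { Readable, Writable } from "node:stream";

// Passing { signal } makes pipeline() destroy every stream in the
// chain and reject with an AbortError when the signal fires.
export async function collectWithSignal(
  source: Iterable<string>,
  signal: AbortSignal
): Promise<string[]> {
  const out: string[] = [];
  await pipeline(
    Readable.from(source),
    new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        out.push(String(chunk));
        callback();
      },
    }),
    { signal }
  );
  return out;
}
```

In practice you would create an AbortController, call controller.abort() from a timeout or shutdown hook, and pass controller.signal here.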
Custom Readable Stream
// src/streams/database-readable.ts
import { Readable, ReadableOptions } from "node:stream";
import { Pool } from "pg";

interface DatabaseStreamOptions extends ReadableOptions {
  pool: Pool;
  query: string;
  params?: unknown[];
  batchSize?: number;
}

// Stream rows from PostgreSQL without loading them all into memory
export class DatabaseReadableStream extends Readable {
  private readonly pool: Pool;
  private readonly query: string;
  private readonly params: unknown[];
  private readonly batchSize: number;
  private offset = 0;
  private done = false;

  constructor(options: DatabaseStreamOptions) {
    super({
      objectMode: true, // Emit objects, not Buffers
      highWaterMark: options.batchSize ?? 100, // Buffer up to N objects
      ...options,
    });
    this.pool = options.pool;
    this.query = options.query;
    this.params = options.params ?? [];
    this.batchSize = options.batchSize ?? 100;
  }

  // Called by the stream machinery when it wants more data.
  // Only called when the consumer is ready (backpressure respected).
  async _read() {
    if (this.done) {
      this.push(null); // Signal end of stream
      return;
    }
    try {
      const { rows } = await this.pool.query(
        `${this.query} LIMIT $${this.params.length + 1} OFFSET $${this.params.length + 2}`,
        [...this.params, this.batchSize, this.offset]
      );
      if (rows.length === 0) {
        this.done = true;
        this.push(null);
        return;
      }
      this.offset += rows.length;
      // Push each row; _read won't be called again until the consumer reads these
      for (const row of rows) {
        const canContinue = this.push(row);
        if (!canContinue) {
          // Internal buffer full; the stream machinery will call _read again when ready
          return;
        }
      }
      if (rows.length < this.batchSize) {
        // Fewer rows than the batch size means we've reached the end
        this.done = true;
        this.push(null);
      }
    } catch (error) {
      this.destroy(error as Error); // Propagate errors through the pipeline
    }
  }
}
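Consuming a stream like this is simplest with for await, since every Readable is an async iterable. A sketch using an in-memory stand-in (a live pg Pool isn't assumed here; any object-mode Readable behaves the same):

```typescript
import { Readable } from "node:stream";

interface UserRow {
  id: number;
  email: string;
}

// Stand-in for DatabaseReadableStream: Readable.from() builds an
// object-mode stream from any iterable.
export function fakeRowStream(): Readable {
  const rows: UserRow[] = [
    { id: 1, email: "a@example.com" },
    { id: 2, email: "b@example.com" },
  ];
  return Readable.from(rows);
}

// Readables are async iterables: the consumer pulls rows one at a
// time, so backpressure falls out of the iteration protocol itself.
export async function collectEmails(stream: Readable): Promise<string[]> {
  const emails: string[] = [];
  for await (const row of stream as AsyncIterable<UserRow>) {
    emails.push(row.email);
  }
  return emails;
}
```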
Custom Transform Stream
// src/streams/csv-parser.transform.ts
import { Transform, TransformOptions } from "node:stream";

interface CSVRow {
  [key: string]: string;
}

export class CSVParserTransform extends Transform {
  private headers: string[] | null = null;
  private buffer = "";
  private lineCount = 0;

  constructor(options?: TransformOptions) {
    super({
      readableObjectMode: true, // Output: objects
      writableObjectMode: false, // Input: Buffer/string
      ...options,
    });
  }

  _transform(
    chunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    // Keep the last incomplete line in the buffer
    this.buffer = lines.pop() ?? "";
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      if (this.headers === null) {
        this.headers = this.parseLine(trimmed);
        continue;
      }
      const values = this.parseLine(trimmed);
      const row: CSVRow = {};
      for (let i = 0; i < this.headers.length; i++) {
        row[this.headers[i]] = values[i] ?? "";
      }
      this.lineCount++;
      this.push(row); // Push the parsed object downstream
    }
    callback(); // Signal that we're ready for more data
  }

  _flush(callback: (error?: Error | null) => void): void {
    // Process any remaining data in the buffer
    if (this.buffer.trim() && this.headers) {
      const values = this.parseLine(this.buffer.trim());
      const row: CSVRow = {};
      for (let i = 0; i < this.headers.length; i++) {
        row[this.headers[i]] = values[i] ?? "";
      }
      this.push(row);
    }
    callback();
  }

  private parseLine(line: string): string[] {
    // Handle quoted fields containing commas.
    // Note: this simple parser does not handle escaped quotes ("") or
    // newlines inside quoted fields; use a full CSV library for untrusted input.
    const result: string[] = [];
    let current = "";
    let inQuotes = false;
    for (let i = 0; i < line.length; i++) {
      const char = line[i];
      if (char === '"') {
        inQuotes = !inQuotes;
      } else if (char === "," && !inQuotes) {
        result.push(current.trim());
        current = "";
      } else {
        current += char;
      }
    }
    result.push(current.trim());
    return result;
  }

  get rowsProcessed() {
    return this.lineCount;
  }
}
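The subtle part of this transform is the chunk-boundary buffering in _transform/_flush: a line can be split across two chunks, so the trailing fragment must be held until the next chunk (or end of stream) completes it. The pattern can be exercised in isolation with a stripped-down line splitter (a hypothetical class, same buffering logic as above):

```typescript
import { Transform } from "node:stream";

// Minimal line splitter using the same buffering pattern as the CSV
// parser: hold the trailing partial line until the next chunk (or
// _flush at end of stream) completes it.
export class LineSplitter extends Transform {
  private buffer = "";

  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(
    chunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // keep the incomplete tail
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback: (error?: Error | null) => void): void {
    if (this.buffer) this.push(this.buffer); // emit the final line
    callback();
  }
}
```

Feeding it "ab\nc" followed by "d\ne" emits "ab", then "cd" once the second chunk completes the split line, then "e" from _flush.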
Custom Writable Stream
// src/streams/batch-insert.writable.ts
import { Writable, WritableOptions } from "node:stream";
import { Pool } from "pg";

interface BatchInsertOptions extends WritableOptions {
  pool: Pool;
  tableName: string;
  columns: string[];
  batchSize?: number;
}

export class BatchInsertWritable extends Writable {
  private readonly pool: Pool;
  private readonly tableName: string;
  private readonly columns: string[];
  private readonly batchSize: number;
  private batch: Record<string, unknown>[] = [];
  private insertedCount = 0;

  constructor(options: BatchInsertOptions) {
    super({
      objectMode: true,
      highWaterMark: options.batchSize ?? 500,
      ...options,
    });
    this.pool = options.pool;
    this.tableName = options.tableName;
    this.columns = options.columns;
    this.batchSize = options.batchSize ?? 500;
  }

  _write(
    row: Record<string, unknown>,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    this.batch.push(row);
    if (this.batch.length >= this.batchSize) {
      this.flushBatch()
        .then(() => callback())
        .catch((err) => callback(err));
    } else {
      callback(); // Ready for the next write
    }
  }

  _final(callback: (error?: Error | null) => void): void {
    // Flush remaining rows when the stream ends
    this.flushBatch()
      .then(() => callback())
      .catch((err) => callback(err));
  }

  private async flushBatch(): Promise<void> {
    if (this.batch.length === 0) return;
    const batchToInsert = this.batch.splice(0); // Drain the batch

    // Build a parameterized bulk insert. Values are parameterized, but
    // tableName and columns are interpolated into the SQL string: they
    // must come from trusted code, never from user input.
    const placeholders = batchToInsert
      .map(
        (_, rowIndex) =>
          `(${this.columns
            .map((_, colIndex) => `$${rowIndex * this.columns.length + colIndex + 1}`)
            .join(", ")})`
      )
      .join(", ");
    const values = batchToInsert.flatMap((row) =>
      this.columns.map((col) => row[col])
    );

    await this.pool.query(
      `INSERT INTO ${this.tableName} (${this.columns.join(", ")})
       VALUES ${placeholders}
       ON CONFLICT DO NOTHING`,
      values
    );
    this.insertedCount += batchToInsert.length;
  }

  get totalInserted() {
    return this.insertedCount;
  }
}
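The $n placeholder arithmetic in flushBatch() is the easiest place to slip off by one. Extracted as a hypothetical standalone helper (same formula as above), it can be unit-tested without a database:

```typescript
// Hypothetical standalone version of the placeholder builder used in
// flushBatch(): row r, column c maps to $(r * numCols + c + 1).
export function buildPlaceholders(
  rowCount: number,
  columns: string[]
): string {
  return Array.from({ length: rowCount }, (_, rowIndex) =>
    `(${columns
      .map((_, colIndex) => `$${rowIndex * columns.length + colIndex + 1}`)
      .join(", ")})`
  ).join(", ");
}
```

For two rows of (email, name) this yields "($1, $2), ($3, $4)", matching the flattened values array produced by flatMap.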
Complete Pipeline Example
// src/scripts/import-users.ts
// Process a 2GB CSV, parse it, validate rows, insert into the DB.
// Memory usage: ~50MB constant regardless of file size.
import { pipeline } from "node:stream/promises";
import { createReadStream } from "node:fs";
import { Transform } from "node:stream";
import { CSVParserTransform } from "../streams/csv-parser.transform";
import { BatchInsertWritable } from "../streams/batch-insert.writable";
import { pool } from "../db";

async function importUsers(csvPath: string): Promise<void> {
  const parser = new CSVParserTransform();

  // Validation + transformation step
  const validator = new Transform({
    objectMode: true,
    transform(row: Record<string, string>, _encoding, callback) {
      // Validate required fields
      if (!row.email || !row.email.includes("@")) {
        // Skip invalid rows (log but don't fail)
        console.warn(`Skipping invalid row: ${JSON.stringify(row)}`);
        callback(); // Don't push: skip this row
        return;
      }
      // Transform to the database shape
      this.push({
        email: row.email.toLowerCase().trim(),
        name: row.name?.trim() || null,
        imported_at: new Date().toISOString(),
        source: "csv_import",
      });
      callback();
    },
  });

  const inserter = new BatchInsertWritable({
    pool,
    tableName: "users",
    columns: ["email", "name", "imported_at", "source"],
    batchSize: 1000,
  });

  console.log("Starting import...");
  const startTime = Date.now();

  // pipeline() manages backpressure and error propagation
  await pipeline(
    createReadStream(csvPath, { encoding: "utf8" }),
    parser,
    validator,
    inserter
  );

  const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
  console.log(
    `Import complete: ${inserter.totalInserted} users in ${elapsed}s ` +
      `(${parser.rowsProcessed} rows processed)`
  );
}

// Error handling: pipeline rejects on any stream error
importUsers("users.csv").catch((error) => {
  console.error("Import failed:", error);
  process.exit(1);
});
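A common addition to a pipeline like this is a pass-through stage for progress reporting, so long imports aren't silent. A sketch (the function name, interval, and callback are illustrative):

```typescript
import { Transform } from "node:stream";

// Object-mode pass-through that forwards every row unchanged and
// invokes onProgress every `interval` rows; drop it between any two
// object-mode stages of a pipeline.
export function progressCounter(
  interval: number,
  onProgress: (count: number) => void
): Transform {
  let count = 0;
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      count++;
      if (count % interval === 0) onProgress(count);
      callback(null, row); // pass the row through unchanged
    },
  });
}
```

In the import above it would slot in between validator and inserter, e.g. progressCounter(100_000, (n) => console.log(`${n} rows...`)).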
Stream vs Buffer: When to Use Each
| Scenario | Use Streams | Use Buffer |
|---|---|---|
| Files > 100MB | ✅ Always | ❌ OOM risk |
| Database exports (> 10K rows) | ✅ Rows one batch at a time | ❌ Whole array in memory |
| HTTP response body < 1MB | Fine either way | ✅ Simpler |
| File compression/decompression | ✅ Built-in zlib streams | ❌ |
| Video/audio processing | ✅ Must stream | ❌ |
| JSON with known small size | Fine either way | ✅ Simpler |
| Real-time event processing | ✅ Transform streams | ❌ |
See Also
- Background Jobs Architecture: async job processing
- PostgreSQL JSONB Patterns: bulk data operations
- Data Pipeline Architecture: ETL pipelines
- TypeScript Advanced Patterns: generic stream types
Working With Viprasol
Large-scale data processing (imports, exports, ETL pipelines, report generation) requires correct stream handling to avoid memory exhaustion and broken error recovery. Our Node.js engineers build production stream pipelines with proper backpressure, batch processing, validation, and error handling for workloads from megabytes to terabytes.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.
Need a Modern Web Application?
From landing pages to complex SaaS platforms, we build it all with Next.js and React.
Free consultation • No commitment • Response within 24 hours