SaaS CSV Import in 2026: Parsing, Validation, Background Jobs, and Progress Streaming
Build a production SaaS CSV import pipeline: streaming CSV parse, per-row validation, background job processing, real-time progress via SSE, error reporting, and duplicate detection.
CSV import is one of the most underestimated features in SaaS. Every customer eventually needs to bulk-import contacts, products, transactions, or users. Done naively (upload → parse → insert in one HTTP request), it times out, loses progress on errors, and gives the user no feedback. Done properly, it becomes a competitive advantage.
This post builds a production CSV import pipeline: streaming parse with Papa Parse, per-row Zod validation, job queuing to a background worker, real-time progress via Server-Sent Events, and a detailed error report the user can download.
Architecture Overview
User uploads CSV:
1. Upload to S3 (not held in memory)
2. Create ImportJob record (status: pending)
3. Enqueue background job
4. Return jobId to client
Client subscribes to progress via SSE.
Background worker:
5. Download CSV from S3
6. Stream-parse rows
7. Validate each row (Zod)
8. Batch-insert valid rows
9. Collect error rows
10. Update job progress
11. Mark complete, store error report
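Steps 7–9 hinge on batching: valid rows are buffered and flushed in fixed-size chunks so a 100,000-row file becomes a few hundred inserts rather than 100,000. The buffering logic is worth seeing in isolation; `BatchCollector` below is an illustrative, dependency-free sketch of the same pattern the worker uses inline, not a name from the worker code.

```typescript
// Buffers rows and hands them to a flush callback in fixed-size batches.
type FlushFn<T> = (batch: T[]) => Promise<void>;

class BatchCollector<T> {
  private buffer: T[] = [];

  constructor(private batchSize: number, private flush: FlushFn<T>) {}

  // Add one row; flush automatically when the buffer fills up.
  async push(row: T): Promise<void> {
    this.buffer.push(row);
    if (this.buffer.length >= this.batchSize) {
      await this.drain();
    }
  }

  // Flush whatever is left (call once after the last row).
  async drain(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = []; // Reset before awaiting so re-entrant pushes start clean
    await this.flush(batch);
  }
}
```

Separating "collect" from "flush" also makes the final partial batch (the `drain()` after the last row) impossible to forget, which is the classic off-by-one in hand-rolled batching.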
Database Schema
CREATE TYPE import_status AS ENUM ('pending', 'processing', 'completed', 'failed');
CREATE TABLE import_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id),
created_by UUID NOT NULL REFERENCES users(id),
entity_type TEXT NOT NULL, -- 'contacts', 'products', etc.
file_key TEXT NOT NULL, -- S3 key
original_name TEXT NOT NULL,
total_rows INTEGER,
processed_rows INTEGER NOT NULL DEFAULT 0,
valid_rows INTEGER NOT NULL DEFAULT 0,
error_rows INTEGER NOT NULL DEFAULT 0,
status import_status NOT NULL DEFAULT 'pending',
error_file_key TEXT, -- S3 key for downloadable error report
error_message TEXT, -- Fatal error if status = 'failed'
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_import_jobs_workspace ON import_jobs(workspace_id, created_at DESC);
SaaS MVP in 8 Weeks. Seriously.
We have launched 50+ SaaS platforms. Multi-tenant architecture, Stripe billing, auth, role-based access, and cloud deployment, all handled by one senior team.
- Week 1–2: Architecture design + wireframes
- Week 3–6: Core features built + tested
- Week 7–8: Launch-ready on AWS/Vercel with CI/CD
- Post-launch: Maintenance plans from month 3
Upload Route: File to S3
// app/api/import/upload/route.ts
import { NextRequest, NextResponse } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { s3 } from "@/lib/aws/s3";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { db } from "@/lib/db";
import { queue } from "@/lib/queue";
import { randomUUID } from "crypto";
const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB
const ALLOWED_TYPES = ["text/csv", "application/vnd.ms-excel", "text/plain"];
export async function POST(req: NextRequest) {
const ctx = await getWorkspaceContext();
if (!ctx) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
const formData = await req.formData();
const file = formData.get("file") as File | null;
const entityType = formData.get("entityType") as string | null;
if (!file || !entityType) {
return NextResponse.json({ error: "Missing file or entityType" }, { status: 400 });
}
if (file.size > MAX_FILE_SIZE) {
return NextResponse.json({ error: "File exceeds 50MB limit" }, { status: 413 });
}
if (!ALLOWED_TYPES.includes(file.type) && !file.name.endsWith(".csv")) {
return NextResponse.json({ error: "Only CSV files are supported" }, { status: 415 });
}
// Upload raw file to S3
const fileKey = `imports/${ctx.workspaceId}/${randomUUID()}/${file.name}`;
const buffer = Buffer.from(await file.arrayBuffer());
await s3.send(new PutObjectCommand({
Bucket: process.env.S3_BUCKET!,
Key: fileKey,
Body: buffer,
ContentType: "text/csv",
Metadata: {
workspaceId: ctx.workspaceId,
uploadedBy: ctx.userId,
},
}));
// Create job record
const job = await db.importJob.create({
data: {
workspaceId: ctx.workspaceId,
createdBy: ctx.userId,
entityType,
fileKey,
originalName: file.name,
status: "pending",
},
});
// Enqueue for background processing
await queue.add("process-import", { jobId: job.id }, {
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
});
return NextResponse.json({ jobId: job.id });
}
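The `backoff: { type: "exponential", delay: 5000 }` option in the enqueue call is BullMQ-style config: failed jobs retry with a doubling delay, so with `attempts: 3` the two retries land roughly 5s and 10s after each failure. The schedule itself is simple enough to verify in isolation (this helper mirrors the common exponential strategy; the exact rounding in your queue library may differ):

```typescript
// Exponential backoff schedule in the style of BullMQ's built-in strategy:
// the Nth retry waits base * 2^(N-1) milliseconds.
function exponentialBackoff(baseMs: number, attempt: number): number {
  // attempt is 1-based: 1 = first retry, 2 = second retry, ...
  return baseMs * Math.pow(2, attempt - 1);
}

// With delay: 5000 and attempts: 3, the retries wait 5000ms and 10000ms.
const delays = [1, 2].map((a) => exponentialBackoff(5000, a));
```

Because the worker re-throws on fatal errors (see below), the retries only fire for genuinely failed runs; a completed job never re-enters the queue.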
Background Worker: Parse and Import
// workers/import-worker.ts
import Papa from "papaparse";
import { Readable } from "stream";
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { s3 } from "@/lib/aws/s3";
import { db } from "@/lib/db";
import { getImportSchema } from "./import-schemas";
import { BATCH_SIZE, uploadErrorReport } from "./import-helpers";
export async function processImport(jobId: string) {
const job = await db.importJob.findUnique({ where: { id: jobId } });
if (!job) throw new Error(`Import job ${jobId} not found`);
await db.importJob.update({
where: { id: jobId },
data: { status: "processing", startedAt: new Date() },
});
try {
// Download CSV from S3
const { Body } = await s3.send(new GetObjectCommand({
Bucket: process.env.S3_BUCKET!,
Key: job.fileKey,
}));
const schema = getImportSchema(job.entityType);
const errorRows: Array<{ row: number; data: Record<string, string>; errors: string[] }> = [];
const validBatch: Record<string, unknown>[] = [];
let rowIndex = 0;
let validCount = 0;
let processedCount = 0;
// Stream-parse the CSV. In Node, the SDK v3 Body is already a Readable;
// the Readable.from wrapper keeps the types happy across SDK versions.
await new Promise<void>((resolve, reject) => {
Papa.parse(Readable.from(Body as AsyncIterable<Buffer>) as any, {
header: true,
skipEmptyLines: true,
transformHeader: (h) => h.trim().toLowerCase().replace(/\s+/g, "_"),
step: async (results, parser) => {
parser.pause(); // Back-pressure: pause until this row is handled
rowIndex++;
processedCount++;
const raw = results.data as Record<string, string>;
const parsed = schema.safeParse(raw);
if (parsed.success) {
validBatch.push({ ...parsed.data, workspaceId: job.workspaceId });
validCount++;
} else {
errorRows.push({
row: rowIndex,
data: raw,
errors: parsed.error.errors.map((e) => `${e.path.join(".")}: ${e.message}`),
});
}
// Flush valid rows in batches
if (validBatch.length >= BATCH_SIZE) {
await flushBatch(job.entityType, job.workspaceId, [...validBatch]);
validBatch.length = 0;
}
// Update progress every 100 rows
if (processedCount % 100 === 0) {
await db.importJob.update({
where: { id: jobId },
data: { processedRows: processedCount, validRows: validCount, errorRows: errorRows.length },
});
}
parser.resume();
},
complete: resolve,
error: reject,
});
});
// Flush remaining valid rows
if (validBatch.length > 0) {
await flushBatch(job.entityType, job.workspaceId, validBatch);
}
// Upload error report if needed
let errorFileKey: string | undefined;
if (errorRows.length > 0) {
errorFileKey = await uploadErrorReport(job, errorRows);
}
await db.importJob.update({
where: { id: jobId },
data: {
status: "completed",
processedRows: processedCount,
validRows: validCount,
errorRows: errorRows.length,
totalRows: rowIndex,
errorFileKey,
completedAt: new Date(),
},
});
} catch (err) {
await db.importJob.update({
where: { id: jobId },
data: {
status: "failed",
errorMessage: err instanceof Error ? err.message : "Unknown error",
completedAt: new Date(),
},
});
throw err; // Re-throw for queue retry
}
}
async function flushBatch(entityType: string, workspaceId: string, rows: Record<string, unknown>[]) {
if (entityType === "contacts") {
await db.contact.createMany({
data: rows as any,
skipDuplicates: true, // Skip rows with duplicate unique fields
});
}
// Add other entity types here
}
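`skipDuplicates: true` silently drops rows that collide with an existing unique key, but users usually want duplicates *reported*, not swallowed. One option is to track seen keys in memory while streaming and route repeats into the error report. A sketch (the `email` dedupe key and the `makeDuplicateChecker` helper are assumptions, not part of the worker above):

```typescript
// Detects duplicates within the uploaded file itself by tracking seen keys.
// Repeat rows can then join the error report instead of being silently
// skipped by the database insert.
function makeDuplicateChecker(keyField: string) {
  const seen = new Set<string>();
  return (row: Record<string, string>): boolean => {
    const key = (row[keyField] ?? "").trim().toLowerCase();
    if (!key) return false; // A missing key is a validation error, not a dupe
    if (seen.has(key)) return true;
    seen.add(key);
    return false;
  };
}

// Inside the step callback, before pushing into validBatch:
// if (isDuplicate(raw)) {
//   errorRows.push({ row: rowIndex, data: raw, errors: ["Duplicate email in file"] });
// }
```

A `Set` of normalized emails costs a few MB even at 500k rows, so the in-memory approach holds up at the file sizes this pipeline accepts.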
The Difference Between a SaaS Demo and a SaaS Business
Anyone can build a demo. We build SaaS products that handle real load, real users, and real payments, with architecture that does not need to be rewritten at 1,000 users.
- Multi-tenant PostgreSQL with row-level security
- Stripe subscriptions, usage billing, annual plans
- SOC2-ready infrastructure from day one
- We own zero equity; you own everything
Validation Schemas
// workers/import-schemas.ts
import { z } from "zod";
const contactSchema = z.object({
email: z.string().email("Invalid email address"),
first_name: z.string().min(1, "First name is required").max(100),
last_name: z.string().max(100).optional().default(""),
phone: z.string().regex(/^\+?[\d\s\-().]{7,20}$/, "Invalid phone number").optional(),
company: z.string().max(200).optional(),
tags: z.string().optional().transform((v) =>
v ? v.split(",").map((t) => t.trim()).filter(Boolean) : []
),
});
const productSchema = z.object({
name: z.string().min(1).max(255),
sku: z.string().min(1).max(100),
price: z.string()
.transform((v) => parseFloat(v.replace(/[^0-9.]/g, "")))
.pipe(z.number().min(0).max(1_000_000)),
stock: z.string()
.transform((v) => parseInt(v, 10))
.pipe(z.number().int().min(0)),
description: z.string().max(5000).optional().default(""),
});
export function getImportSchema(entityType: string) {
const schemas: Record<string, z.ZodSchema> = {
contacts: contactSchema,
products: productSchema,
};
const schema = schemas[entityType];
if (!schema) throw new Error(`No schema for entity type: ${entityType}`);
return schema;
}
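The `tags` field illustrates the general pattern with CSV: every cell arrives as a string, so schemas transform `"vip, lead"` into `["vip", "lead"]` before insert. The transform itself is plain string surgery, shown here as a standalone function so it can be tested without Zod (it mirrors the logic inside `contactSchema` above):

```typescript
// Splits a comma-separated CSV cell into trimmed, non-empty tags.
// Mirrors the transform inside contactSchema: Zod wraps this same logic.
function parseTags(cell: string | undefined): string[] {
  if (!cell) return [];
  return cell
    .split(",")
    .map((t) => t.trim())
    .filter(Boolean); // Drop empty entries from trailing or doubled commas
}
```

Keeping transforms this small is deliberate: each one handles exactly one messy-input case (whitespace, empty segments), and the schema composes them.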
Progress Streaming via SSE
// app/api/import/[jobId]/progress/route.ts
import { NextRequest } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { db } from "@/lib/db";
export async function GET(
req: NextRequest,
{ params }: { params: Promise<{ jobId: string }> }
) {
const { jobId } = await params;
const ctx = await getWorkspaceContext();
if (!ctx) return new Response("Unauthorized", { status: 401 });
const job = await db.importJob.findFirst({
where: { id: jobId, workspaceId: ctx.workspaceId },
});
if (!job) return new Response("Not found", { status: 404 });
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const send = (data: object) => {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
);
};
// Poll the DB every second until a terminal state or the client disconnects
let done = false;
while (!done && !req.signal.aborted) {
const current = await db.importJob.findUnique({ where: { id: jobId } });
if (!current) break;
send({
status: current.status,
processedRows: current.processedRows,
totalRows: current.totalRows,
validRows: current.validRows,
errorRows: current.errorRows,
pct: current.totalRows
? Math.round((current.processedRows / current.totalRows) * 100)
: null,
});
if (current.status === "completed" || current.status === "failed") {
done = true;
} else {
await new Promise((r) => setTimeout(r, 1000));
}
}
controller.close();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
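The wire format matters here: each SSE message is a `data: <payload>` line followed by a blank line, and the client's `onmessage` fires once per such frame. A tiny encoder/decoder pair makes the framing explicit (illustrative helpers; the route above inlines the same template string):

```typescript
// Encode one SSE frame: "data: <json>\n\n". The blank line terminates the event.
function encodeSseFrame(payload: object): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// Decode what EventSource hands to onmessage (the text after "data: ").
function decodeSseFrame(frame: string): unknown {
  const body = frame.replace(/^data: /, "").trimEnd();
  return JSON.parse(body);
}
```

Forgetting the trailing blank line is the most common SSE bug: the browser buffers the partial event forever and `onmessage` never fires.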
Frontend: Upload + Progress UI
// components/CsvImport/CsvImportDialog.tsx
"use client";
import { useState, useEffect, useRef } from "react";
import { Upload, CheckCircle, XCircle, AlertCircle } from "lucide-react";
export function CsvImportDialog({ entityType, onComplete }: {
entityType: string;
onComplete: () => void;
}) {
const [file, setFile] = useState<File | null>(null);
const [jobId, setJobId] = useState<string | null>(null);
const [progress, setProgress] = useState<{
status: string;
pct: number | null;
processedRows: number;
validRows: number;
errorRows: number;
} | null>(null);
const [uploading, setUploading] = useState(false);
const eventSourceRef = useRef<EventSource | null>(null);
const handleUpload = async () => {
if (!file) return;
setUploading(true);
const formData = new FormData();
formData.append("file", file);
formData.append("entityType", entityType);
const res = await fetch("/api/import/upload", { method: "POST", body: formData });
if (!res.ok) {
setUploading(false);
return; // In a real UI, surface the API error message to the user here
}
const { jobId: id } = await res.json();
setJobId(id);
setUploading(false);
};
useEffect(() => {
if (!jobId) return;
// Connect SSE
const es = new EventSource(`/api/import/${jobId}/progress`);
eventSourceRef.current = es;
es.onmessage = (e) => {
const data = JSON.parse(e.data);
setProgress(data);
if (data.status === "completed" || data.status === "failed") {
es.close();
if (data.status === "completed" && data.errorRows === 0) {
setTimeout(onComplete, 1500); // Auto-close after success
}
}
};
es.onerror = () => es.close();
return () => es.close();
}, [jobId]);
return (
<div className="space-y-4 p-6">
{!jobId && (
<>
<label className="block cursor-pointer">
<div className="border-2 border-dashed border-gray-200 rounded-xl p-8 text-center hover:border-blue-400 transition-colors">
<Upload className="mx-auto h-8 w-8 text-gray-400 mb-2" />
<p className="text-sm text-gray-600">
{file ? file.name : "Click to upload CSV"}
</p>
</div>
<input
type="file"
accept=".csv"
className="sr-only"
onChange={(e) => setFile(e.target.files?.[0] ?? null)}
/>
</label>
<button
onClick={handleUpload}
disabled={!file || uploading}
className="w-full px-4 py-2 bg-blue-600 text-white rounded-lg text-sm font-medium disabled:opacity-50"
>
{uploading ? "Uploading..." : "Start Import"}
</button>
</>
)}
{progress && (
<div className="space-y-4">
{/* Progress bar */}
{progress.pct !== null && (
<div>
<div className="flex justify-between text-xs text-gray-500 mb-1">
<span>Processing rows...</span>
<span>{progress.pct}%</span>
</div>
<div className="h-2 bg-gray-100 rounded-full overflow-hidden">
<div
className="h-full bg-blue-500 transition-all duration-500"
style={{ width: `${progress.pct}%` }}
/>
</div>
</div>
)}
{/* Stats */}
<div className="grid grid-cols-3 gap-3">
<div className="text-center p-3 bg-gray-50 rounded-lg">
<p className="text-lg font-bold text-gray-900">{progress.processedRows}</p>
<p className="text-xs text-gray-500">Processed</p>
</div>
<div className="text-center p-3 bg-green-50 rounded-lg">
<p className="text-lg font-bold text-green-700">{progress.validRows}</p>
<p className="text-xs text-green-600">Imported</p>
</div>
<div className="text-center p-3 bg-red-50 rounded-lg">
<p className="text-lg font-bold text-red-700">{progress.errorRows}</p>
<p className="text-xs text-red-600">Errors</p>
</div>
</div>
{/* Terminal states */}
{progress.status === "completed" && (
<div className="flex items-center gap-2 text-green-700 text-sm font-medium">
<CheckCircle className="h-5 w-5" />
Import complete: {progress.validRows} rows imported
{progress.errorRows > 0 && (
<a href={`/api/import/${jobId}/errors`} download className="underline ml-2">
Download {progress.errorRows} error rows
</a>
)}
</div>
)}
{progress.status === "failed" && (
<div className="flex items-center gap-2 text-red-600 text-sm font-medium">
<XCircle className="h-5 w-5" />
Import failed. Please try again.
</div>
)}
</div>
)}
</div>
);
}
Cost and Timeline
| Component | Timeline | Cost (USD) |
|---|---|---|
| Upload to S3 + job creation | 0.5–1 day | $400–$800 |
| Background worker + streaming parse | 1–2 days | $800–$1,600 |
| Zod validation schema per entity | 0.5 day each | $300–$500 each |
| SSE progress endpoint | 0.5 day | $300–$500 |
| Frontend upload + progress UI | 1 day | $600–$1,000 |
| Error report download | 0.5 day | $300–$500 |
| Full import pipeline (1 entity type, end-to-end) | 1–2 weeks | $6,000–$12,000 |
See Also
- SaaS Activity Feed: showing import events in the activity timeline
- AWS Secrets Manager: managing S3 credentials securely
- PostgreSQL Connection Pooling: bulk inserts and connection limits
- SaaS Webhook System: notifying external systems after import
Working With Viprasol
We build CSV import pipelines for SaaS products, from simple single-entity imports to complex multi-entity pipelines with cross-reference validation and transformation rules. Our team has shipped import systems that handle 500,000+ rows without timeouts.
What we deliver:
- S3-based file upload with size and type validation
- Streaming CSV parse with Papa Parse (no memory limit)
- Per-row Zod validation with detailed error collection
- Background worker with batch insert and duplicate skip
- Real-time SSE progress with downloadable error report
Explore our SaaS development services or contact us to build your import pipeline.
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.
Building a SaaS Product?
We've helped launch 50+ SaaS platforms. Let's build yours, fast.
Free consultation • No commitment • Response within 24 hours
Want to add AI automation to your SaaS product?
Viprasol builds custom AI agent crews that plug into any SaaS workflow: automating repetitive tasks, qualifying leads, and responding across every channel your customers use.