
SaaS CSV Import in 2026: Parsing, Validation, Background Jobs, and Progress Streaming

Build a production SaaS CSV import pipeline: streaming CSV parse, per-row validation, background job processing, real-time progress via SSE, error reporting, and duplicate detection.

Viprasol Tech Team
February 7, 2027
14 min read


CSV import is one of the most underestimated features in SaaS. Every customer eventually needs to bulk-import contacts, products, transactions, or users. Done naively (upload → parse → insert in a single HTTP request), the import times out on large files, loses all progress on the first error, and gives the user no feedback. Done properly, it becomes a competitive advantage.

This post builds a production CSV import pipeline: streaming parse with Papa Parse, per-row Zod validation, job queuing to a background worker, real-time progress via Server-Sent Events, and a detailed error report the user can download.


Architecture Overview

User uploads CSV →
  1. Upload to S3 (not held in memory)
  2. Create ImportJob record (status: pending)
  3. Enqueue background job
  4. Return jobId to client

Client polls progress via SSE →
  Background worker:
    5. Download CSV from S3
    6. Stream-parse rows
    7. Validate each row (Zod)
    8. Batch-insert valid rows
    9. Collect error rows
    10. Update job progress
    11. Mark complete, store error report

Database Schema

CREATE TYPE import_status AS ENUM ('pending', 'processing', 'completed', 'failed');

CREATE TABLE import_jobs (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workspace_id    UUID NOT NULL REFERENCES workspaces(id),
  created_by      UUID NOT NULL REFERENCES users(id),
  entity_type     TEXT NOT NULL,     -- 'contacts', 'products', etc.
  file_key        TEXT NOT NULL,     -- S3 key
  original_name   TEXT NOT NULL,
  total_rows      INTEGER,
  processed_rows  INTEGER NOT NULL DEFAULT 0,
  valid_rows      INTEGER NOT NULL DEFAULT 0,
  error_rows      INTEGER NOT NULL DEFAULT 0,
  status          import_status NOT NULL DEFAULT 'pending',
  error_file_key  TEXT,              -- S3 key for downloadable error report
  error_message   TEXT,              -- Fatal error if status = 'failed'
  started_at      TIMESTAMPTZ,
  completed_at    TIMESTAMPTZ,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_import_jobs_workspace ON import_jobs(workspace_id, created_at DESC);

Upload Route: File to S3

// app/api/import/upload/route.ts
import { NextRequest, NextResponse } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { s3 } from "@/lib/aws/s3";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { db } from "@/lib/db";
import { queue } from "@/lib/queue";
import { randomUUID } from "crypto";

const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB
const ALLOWED_TYPES = ["text/csv", "application/vnd.ms-excel", "text/plain"];

export async function POST(req: NextRequest) {
  const ctx = await getWorkspaceContext();
  if (!ctx) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });

  const formData = await req.formData();
  const file = formData.get("file") as File | null;
  const entityType = formData.get("entityType") as string | null;

  if (!file || !entityType) {
    return NextResponse.json({ error: "Missing file or entityType" }, { status: 400 });
  }
  // Fail fast on unknown entity types instead of letting the worker error later
  if (!["contacts", "products"].includes(entityType)) {
    return NextResponse.json({ error: `Unsupported entityType: ${entityType}` }, { status: 400 });
  }
  if (file.size > MAX_FILE_SIZE) {
    return NextResponse.json({ error: "File exceeds 50MB limit" }, { status: 413 });
  }
  if (!ALLOWED_TYPES.includes(file.type) && !file.name.toLowerCase().endsWith(".csv")) {
    return NextResponse.json({ error: "Only CSV files are supported" }, { status: 415 });
  }

  // Upload raw file to S3
  const fileKey = `imports/${ctx.workspaceId}/${randomUUID()}/${file.name}`;
  const buffer = Buffer.from(await file.arrayBuffer());

  await s3.send(new PutObjectCommand({
    Bucket: process.env.S3_BUCKET!,
    Key: fileKey,
    Body: buffer,
    ContentType: "text/csv",
    Metadata: {
      workspaceId: ctx.workspaceId,
      uploadedBy: ctx.userId,
    },
  }));

  // Create job record
  const job = await db.importJob.create({
    data: {
      workspaceId: ctx.workspaceId,
      createdBy: ctx.userId,
      entityType,
      fileKey,
      originalName: file.name,
      status: "pending",
    },
  });

  // Enqueue for background processing
  await queue.add("process-import", { jobId: job.id }, {
    attempts: 3,
    backoff: { type: "exponential", delay: 5000 },
  });

  return NextResponse.json({ jobId: job.id });
}
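
The `queue` helper is not defined in this post. A minimal sketch, assuming BullMQ backed by Redis (the `attempts` and `backoff` options passed to `queue.add` match BullMQ's API), plus the long-lived process that consumes the jobs:

// lib/queue.ts
// A minimal sketch. BullMQ + Redis is an assumption; any job queue with retries works.
import { Queue } from "bullmq";

export const connection = {
  host: process.env.REDIS_HOST ?? "127.0.0.1",
  port: Number(process.env.REDIS_PORT ?? 6379),
};

export const queue = new Queue("imports", { connection });

// workers/index.ts
// Runs as a separate long-lived process (node dist/workers/index.js, a Docker service, etc.)
import { Worker } from "bullmq";
import { connection } from "@/lib/queue";
import { processImport } from "./import-worker";

new Worker(
  "imports",
  async (job) => {
    if (job.name === "process-import") {
      await processImport(job.data.jobId);
    }
  },
  { connection, concurrency: 2 } // Imports are DB-heavy; keep concurrency low
);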

Background Worker: Parse and Import

// workers/import-worker.ts
import Papa from "papaparse";
import { Readable } from "stream";
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { s3 } from "@/lib/aws/s3";
import { db } from "@/lib/db";
import { getImportSchema } from "./import-schemas";
import { BATCH_SIZE, uploadErrorReport } from "./import-helpers";

export async function processImport(jobId: string) {
  const job = await db.importJob.findUnique({ where: { id: jobId } });
  if (!job) throw new Error(`Import job ${jobId} not found`);

  await db.importJob.update({
    where: { id: jobId },
    data: { status: "processing", startedAt: new Date() },
  });

  try {
    // Download CSV from S3
    const { Body } = await s3.send(new GetObjectCommand({
      Bucket: process.env.S3_BUCKET!,
      Key: job.fileKey,
    }));

    const schema = getImportSchema(job.entityType);
    const errorRows: Array<{ row: number; data: Record<string, string>; errors: string[] }> = [];
    const validBatch: Record<string, unknown>[] = [];
    let rowIndex = 0;
    let validCount = 0;
    let processedCount = 0;

    // Stream-parse the CSV. In AWS SDK v3 the S3 Body is a Node Readable;
    // Readable.from keeps the types happy and lets Papa Parse consume it as a stream.
    await new Promise<void>((resolve, reject) => {
      Papa.parse(Readable.from(Body as AsyncIterable<Buffer>) as any, {
        header: true,
        skipEmptyLines: true,
        transformHeader: (h) => h.trim().toLowerCase().replace(/\s+/g, "_"),
        step: async (results, parser) => {
          parser.pause(); // Back-pressure: pause until this row is handled
          rowIndex++;
          processedCount++;

          const raw = results.data as Record<string, string>;
          const parsed = schema.safeParse(raw);

          if (parsed.success) {
            validBatch.push({ ...parsed.data, workspaceId: job.workspaceId });
            validCount++;
          } else {
            errorRows.push({
              row: rowIndex,
              data: raw,
              errors: parsed.error.errors.map((e) => `${e.path.join(".")}: ${e.message}`),
            });
          }

          // Flush valid rows in batches
          if (validBatch.length >= BATCH_SIZE) {
            await flushBatch(job.entityType, job.workspaceId, [...validBatch]);
            validBatch.length = 0;
          }

          // Update progress every 100 rows
          if (processedCount % 100 === 0) {
            await db.importJob.update({
              where: { id: jobId },
              data: { processedRows: processedCount, validRows: validCount, errorRows: errorRows.length },
            });
          }

          parser.resume();
        },
        complete: resolve,
        error: reject,
      });
    });

    // Flush remaining valid rows
    if (validBatch.length > 0) {
      await flushBatch(job.entityType, job.workspaceId, validBatch);
    }

    // Upload error report if needed
    let errorFileKey: string | undefined;
    if (errorRows.length > 0) {
      errorFileKey = await uploadErrorReport(job, errorRows);
    }

    await db.importJob.update({
      where: { id: jobId },
      data: {
        status: "completed",
        processedRows: processedCount,
        validRows: validCount,
        errorRows: errorRows.length,
        totalRows: rowIndex,
        errorFileKey,
        completedAt: new Date(),
      },
    });
  } catch (err) {
    await db.importJob.update({
      where: { id: jobId },
      data: {
        status: "failed",
        errorMessage: err instanceof Error ? err.message : "Unknown error",
        completedAt: new Date(),
      },
    });
    throw err; // Re-throw for queue retry
  }
}

async function flushBatch(entityType: string, workspaceId: string, rows: Record<string, unknown>[]) {
  if (entityType === "contacts") {
    await db.contact.createMany({
      data: rows as any,
      // skipDuplicates only skips rows that violate a unique constraint,
      // e.g. UNIQUE(workspace_id, email) on contacts; without one, duplicates insert normally
      skipDuplicates: true,
    });
  }
  // Add other entity types here; workspaceId is available for scoped lookups
}
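
The worker imports `BATCH_SIZE` and `uploadErrorReport` from `./import-helpers`, which the post does not show. A plausible sketch, assuming the error report is a CSV with the original columns plus the row number and validation messages:

// workers/import-helpers.ts
// A plausible sketch; the error-report layout is an assumption, not a fixed format.
import Papa from "papaparse";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { s3 } from "@/lib/aws/s3";

export const BATCH_SIZE = 500; // Tune against your database; 500–1,000 is a common start

type ErrorRow = { row: number; data: Record<string, string>; errors: string[] };

export async function uploadErrorReport(
  job: { id: string; workspaceId: string },
  errorRows: ErrorRow[]
): Promise<string> {
  // Re-serialize the failed rows with row number and validation messages appended
  const csv = Papa.unparse(
    errorRows.map((e) => ({
      row_number: e.row,
      ...e.data,
      import_errors: e.errors.join("; "),
    }))
  );

  const key = `imports/${job.workspaceId}/${job.id}/errors.csv`;
  await s3.send(new PutObjectCommand({
    Bucket: process.env.S3_BUCKET!,
    Key: key,
    Body: csv,
    ContentType: "text/csv",
  }));
  return key;
}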

Validation Schemas

// workers/import-schemas.ts
import { z } from "zod";

const contactSchema = z.object({
  email: z.string().email("Invalid email address"),
  first_name: z.string().min(1, "First name is required").max(100),
  last_name: z.string().max(100).optional().default(""),
  phone: z.string().regex(/^\+?[\d\s\-().]{7,20}$/, "Invalid phone number").optional(),
  company: z.string().max(200).optional(),
  tags: z.string().optional().transform((v) =>
    v ? v.split(",").map((t) => t.trim()).filter(Boolean) : []
  ),
});

const productSchema = z.object({
  name: z.string().min(1).max(255),
  sku: z.string().min(1).max(100),
  price: z.string()
    .transform((v) => parseFloat(v.replace(/[^0-9.]/g, "")))
    .pipe(z.number().min(0).max(1_000_000)),
  stock: z.string()
    .transform((v) => parseInt(v, 10))
    .pipe(z.number().int().min(0)),
  description: z.string().max(5000).optional().default(""),
});

export function getImportSchema(entityType: string) {
  const schemas: Record<string, z.ZodSchema> = {
    contacts: contactSchema,
    products: productSchema,
  };
  const schema = schemas[entityType];
  if (!schema) throw new Error(`No schema for entity type: ${entityType}`);
  return schema;
}
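
To make the per-row behavior concrete, here is an illustrative run of the contacts schema against one parsed row (the values are invented for the example):

// Illustrative only: what the worker's schema.safeParse sees for a single CSV row
const raw = {
  email: "jane@acme.test",
  first_name: "Jane",
  last_name: "Doe",
  tags: "vip, beta ,",
};

const result = getImportSchema("contacts").safeParse(raw);

if (result.success) {
  // tags was transformed from "vip, beta ," to ["vip", "beta"]
  console.log(result.data);
} else {
  // Mirrors the error strings the worker collects per row
  console.log(result.error.errors.map((e) => `${e.path.join(".")}: ${e.message}`));
}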

Progress Streaming via SSE

// app/api/import/[jobId]/progress/route.ts
import { NextRequest } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { db } from "@/lib/db";

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ jobId: string }> }
) {
  const { jobId } = await params;
  const ctx = await getWorkspaceContext();
  if (!ctx) return new Response("Unauthorized", { status: 401 });

  const job = await db.importJob.findFirst({
    where: { id: jobId, workspaceId: ctx.workspaceId },
  });
  if (!job) return new Response("Not found", { status: 404 });

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const send = (data: object) => {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
        );
      };

      // Poll DB every 1 second until terminal state or client disconnect
      let done = false;
      while (!done && !req.signal.aborted) {
        const current = await db.importJob.findUnique({ where: { id: jobId } });
        if (!current) break;

        send({
          status: current.status,
          processedRows: current.processedRows,
          totalRows: current.totalRows,
          validRows: current.validRows,
          errorRows: current.errorRows,
          pct: current.totalRows
            ? Math.round((current.processedRows / current.totalRows) * 100)
            : null,
        });

        if (current.status === "completed" || current.status === "failed") {
          done = true;
        } else {
          await new Promise((r) => setTimeout(r, 1000));
        }
      }

      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
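
The frontend in the next section links to `/api/import/[jobId]/errors` for the downloadable report, a route this post does not include. A minimal sketch, reusing the same workspace check and redirecting to a short-lived presigned S3 URL so the file never proxies through the app:

// app/api/import/[jobId]/errors/route.ts
// A minimal sketch; the redirect-to-presigned-URL approach is one option among several.
import { NextRequest, NextResponse } from "next/server";
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { s3 } from "@/lib/aws/s3";
import { db } from "@/lib/db";

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ jobId: string }> }
) {
  const { jobId } = await params;
  const ctx = await getWorkspaceContext();
  if (!ctx) return new Response("Unauthorized", { status: 401 });

  const job = await db.importJob.findFirst({
    where: { id: jobId, workspaceId: ctx.workspaceId },
  });
  if (!job?.errorFileKey) return new Response("Not found", { status: 404 });

  const url = await getSignedUrl(
    s3,
    new GetObjectCommand({ Bucket: process.env.S3_BUCKET!, Key: job.errorFileKey }),
    { expiresIn: 300 } // URL valid for 5 minutes
  );
  return NextResponse.redirect(url);
}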

Frontend: Upload + Progress UI

// components/CsvImport/CsvImportDialog.tsx
"use client";

import { useState, useEffect, useRef } from "react";
import { Upload, CheckCircle, XCircle, AlertCircle } from "lucide-react";

export function CsvImportDialog({ entityType, onComplete }: {
  entityType: string;
  onComplete: () => void;
}) {
  const [file, setFile] = useState<File | null>(null);
  const [jobId, setJobId] = useState<string | null>(null);
  const [progress, setProgress] = useState<{
    status: string;
    pct: number | null;
    processedRows: number;
    validRows: number;
    errorRows: number;
  } | null>(null);
  const [uploading, setUploading] = useState(false);
  const eventSourceRef = useRef<EventSource | null>(null);

  const handleUpload = async () => {
    if (!file) return;
    setUploading(true);

    const formData = new FormData();
    formData.append("file", file);
    formData.append("entityType", entityType);

    const res = await fetch("/api/import/upload", { method: "POST", body: formData });
    if (!res.ok) {
      setUploading(false);
      return; // Surface the API error through your toast system of choice
    }
    const { jobId: id } = await res.json();
    setJobId(id);
    setUploading(false);
  };

  useEffect(() => {
    if (!jobId) return;

    // Connect SSE
    const es = new EventSource(`/api/import/${jobId}/progress`);
    eventSourceRef.current = es;

    es.onmessage = (e) => {
      const data = JSON.parse(e.data);
      setProgress(data);

      if (data.status === "completed" || data.status === "failed") {
        es.close();
        if (data.status === "completed" && data.errorRows === 0) {
          setTimeout(onComplete, 1500); // Auto-close after success
        }
      }
    };

    // Closing on error intentionally disables EventSource's automatic reconnection
    es.onerror = () => es.close();
    return () => es.close();
  }, [jobId]);

  return (
    <div className="space-y-4 p-6">
      {!jobId && (
        <>
          <label className="block cursor-pointer">
            <div className="border-2 border-dashed border-gray-200 rounded-xl p-8 text-center hover:border-blue-400 transition-colors">
              <Upload className="mx-auto h-8 w-8 text-gray-400 mb-2" />
              <p className="text-sm text-gray-600">
                {file ? file.name : "Click to upload CSV"}
              </p>
            </div>
            <input
              type="file"
              accept=".csv"
              className="sr-only"
              onChange={(e) => setFile(e.target.files?.[0] ?? null)}
            />
          </label>

          <button
            onClick={handleUpload}
            disabled={!file || uploading}
            className="w-full px-4 py-2 bg-blue-600 text-white rounded-lg text-sm font-medium disabled:opacity-50"
          >
            {uploading ? "Uploading..." : "Start Import"}
          </button>
        </>
      )}

      {progress && (
        <div className="space-y-4">
          {/* Progress bar */}
          {progress.pct !== null && (
            <div>
              <div className="flex justify-between text-xs text-gray-500 mb-1">
                <span>Processing rows...</span>
                <span>{progress.pct}%</span>
              </div>
              <div className="h-2 bg-gray-100 rounded-full overflow-hidden">
                <div
                  className="h-full bg-blue-500 transition-all duration-500"
                  style={{ width: `${progress.pct}%` }}
                />
              </div>
            </div>
          )}

          {/* Stats */}
          <div className="grid grid-cols-3 gap-3">
            <div className="text-center p-3 bg-gray-50 rounded-lg">
              <p className="text-lg font-bold text-gray-900">{progress.processedRows}</p>
              <p className="text-xs text-gray-500">Processed</p>
            </div>
            <div className="text-center p-3 bg-green-50 rounded-lg">
              <p className="text-lg font-bold text-green-700">{progress.validRows}</p>
              <p className="text-xs text-green-600">Imported</p>
            </div>
            <div className="text-center p-3 bg-red-50 rounded-lg">
              <p className="text-lg font-bold text-red-700">{progress.errorRows}</p>
              <p className="text-xs text-red-600">Errors</p>
            </div>
          </div>

          {/* Terminal states */}
          {progress.status === "completed" && (
            <div className="flex items-center gap-2 text-green-700 text-sm font-medium">
              <CheckCircle className="h-5 w-5" />
              Import complete — {progress.validRows} rows imported
              {progress.errorRows > 0 && (
                <a href={`/api/import/${jobId}/errors`} download className="underline ml-2">
                  Download {progress.errorRows} error rows
                </a>
              )}
            </div>
          )}
          {progress.status === "failed" && (
            <div className="flex items-center gap-2 text-red-600 text-sm font-medium">
              <XCircle className="h-5 w-5" />
              Import failed — please try again
            </div>
          )}
        </div>
      )}
    </div>
  );
}
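
Mounting the dialog is then a one-liner; `router.refresh()` is illustrative, the completion callback is whatever refresh mechanism the surrounding list view uses:

<CsvImportDialog entityType="contacts" onComplete={() => router.refresh()} />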

Cost and Timeline

Component | Timeline | Cost (USD)
Upload to S3 + job creation | 0.5–1 day | $400–$800
Background worker + streaming parse | 1–2 days | $800–$1,600
Zod validation schema per entity | 0.5 day each | $300–$500 each
SSE progress endpoint | 0.5 day | $300–$500
Frontend upload + progress UI | 1 day | $600–$1,000
Error report download | 0.5 day | $300–$500
Full import pipeline (1 entity type) | 1–2 weeks | $6,000–$12,000

Viprasol in Action

We build CSV import pipelines for SaaS products — from simple single-entity imports through complex multi-entity pipelines with cross-reference validation and transformation rules. Our team has shipped import systems that handle 500,000+ rows without timeouts.

What we deliver:

  • S3-based file upload with size and type validation
  • Streaming CSV parse with Papa Parse (no memory limit)
  • Per-row Zod validation with detailed error collection
  • Background worker with batch insert and duplicate skip
  • Real-time SSE progress with downloadable error report

Explore our SaaS development services or contact us to build your import pipeline.
