
SaaS CSV Import in 2026: Parsing, Validation, Background Jobs, and Progress Streaming

Build a production SaaS CSV import pipeline: streaming CSV parse, per-row validation, background job processing, real-time progress via SSE, error reporting, and duplicate detection.

Viprasol Tech Team
February 7, 2027
14 min read


CSV import is one of the most underestimated features in SaaS. Every customer eventually needs to bulk-import contacts, products, transactions, or users. Done naively (upload → parse → insert in one HTTP request), it times out, loses progress on errors, and provides no feedback. Done properly, it becomes a competitive advantage.

This post builds a production CSV import pipeline: streaming parse with Papa Parse, per-row Zod validation, job queuing to a background worker, real-time progress via Server-Sent Events, and a detailed error report the user can download.


Architecture Overview

User uploads CSV →
  1. Upload to S3 (buffered briefly in the route, capped at 50MB)
  2. Create ImportJob record (status: pending)
  3. Enqueue background job
  4. Return jobId to client

Client subscribes to progress via SSE →
  Background worker:
    5. Download CSV from S3
    6. Stream-parse rows
    7. Validate each row (Zod)
    8. Batch-insert valid rows
    9. Collect error rows
    10. Update job progress
    11. Mark complete, store error report
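The flow above implies a small status state machine. A hedged sketch of the allowed transitions (the guard function and its name are illustrative, not part of the article's codebase):

```typescript
type ImportStatus = "pending" | "processing" | "completed" | "failed";

// Transitions implied by the pipeline: completed is terminal; a queue retry
// after failure re-runs the worker, which re-enters "processing".
const allowedTransitions: Record<ImportStatus, ImportStatus[]> = {
  pending: ["processing", "failed"],
  processing: ["completed", "failed"],
  completed: [],
  failed: ["processing"],
};

export function canTransition(from: ImportStatus, to: ImportStatus): boolean {
  return allowedTransitions[from].includes(to);
}
```

Enforcing this in the worker (e.g. refusing to re-process a completed job) is a cheap guard against double-delivered queue messages.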

Database Schema

CREATE TYPE import_status AS ENUM ('pending', 'processing', 'completed', 'failed');

CREATE TABLE import_jobs (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workspace_id    UUID NOT NULL REFERENCES workspaces(id),
  created_by      UUID NOT NULL REFERENCES users(id),
  entity_type     TEXT NOT NULL,     -- 'contacts', 'products', etc.
  file_key        TEXT NOT NULL,     -- S3 key
  original_name   TEXT NOT NULL,
  total_rows      INTEGER,
  processed_rows  INTEGER NOT NULL DEFAULT 0,
  valid_rows      INTEGER NOT NULL DEFAULT 0,
  error_rows      INTEGER NOT NULL DEFAULT 0,
  status          import_status NOT NULL DEFAULT 'pending',
  error_file_key  TEXT,              -- S3 key for downloadable error report
  error_message   TEXT,              -- Fatal error if status = 'failed'
  started_at      TIMESTAMPTZ,
  completed_at    TIMESTAMPTZ,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_import_jobs_workspace ON import_jobs(workspace_id, created_at DESC);

🚀 SaaS MVP in 8 Weeks – Seriously

We have launched 50+ SaaS platforms. Multi-tenant architecture, Stripe billing, auth, role-based access, and cloud deployment – all handled by one senior team.

  • Week 1–2: Architecture design + wireframes
  • Week 3–6: Core features built + tested
  • Week 7–8: Launch-ready on AWS/Vercel with CI/CD
  • Post-launch: Maintenance plans from month 3

Upload Route: File to S3

// app/api/import/upload/route.ts
import { NextRequest, NextResponse } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { s3 } from "@/lib/aws/s3";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { db } from "@/lib/db";
import { queue } from "@/lib/queue";
import { randomUUID } from "crypto";

const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB
const ALLOWED_TYPES = ["text/csv", "application/vnd.ms-excel", "text/plain"];

export async function POST(req: NextRequest) {
  const ctx = await getWorkspaceContext();
  if (!ctx) return NextResponse.json({ error: "Unauthorized" }, { status: 401 });

  const formData = await req.formData();
  const file = formData.get("file") as File | null;
  const entityType = formData.get("entityType") as string | null;

  if (!file || !entityType) {
    return NextResponse.json({ error: "Missing file or entityType" }, { status: 400 });
  }
  if (file.size > MAX_FILE_SIZE) {
    return NextResponse.json({ error: "File exceeds 50MB limit" }, { status: 413 });
  }
  if (!ALLOWED_TYPES.includes(file.type) && !file.name.endsWith(".csv")) {
    return NextResponse.json({ error: "Only CSV files are supported" }, { status: 415 });
  }

  // Upload raw file to S3
  const fileKey = `imports/${ctx.workspaceId}/${randomUUID()}/${file.name}`;
  const buffer = Buffer.from(await file.arrayBuffer());

  await s3.send(new PutObjectCommand({
    Bucket: process.env.S3_BUCKET!,
    Key: fileKey,
    Body: buffer,
    ContentType: "text/csv",
    Metadata: {
      workspaceId: ctx.workspaceId,
      uploadedBy: ctx.userId,
    },
  }));

  // Create job record
  const job = await db.importJob.create({
    data: {
      workspaceId: ctx.workspaceId,
      createdBy: ctx.userId,
      entityType,
      fileKey,
      originalName: file.name,
      status: "pending",
    },
  });

  // Enqueue for background processing
  await queue.add("process-import", { jobId: job.id }, {
    attempts: 3,
    backoff: { type: "exponential", delay: 5000 },
  });

  return NextResponse.json({ jobId: job.id });
}

Background Worker: Parse and Import

// workers/import-worker.ts
import Papa from "papaparse";
import { Readable } from "stream";
import { GetObjectCommand } from "@aws-sdk/client-s3";
import { s3 } from "@/lib/aws/s3";
import { db } from "@/lib/db";
import { getImportSchema } from "./import-schemas";
import { BATCH_SIZE, uploadErrorReport } from "./import-helpers";

export async function processImport(jobId: string) {
  const job = await db.importJob.findUnique({ where: { id: jobId } });
  if (!job) throw new Error(`Import job ${jobId} not found`);

  await db.importJob.update({
    where: { id: jobId },
    data: { status: "processing", startedAt: new Date() },
  });

  try {
    // Download CSV from S3
    const { Body } = await s3.send(new GetObjectCommand({
      Bucket: process.env.S3_BUCKET!,
      Key: job.fileKey,
    }));

    const schema = getImportSchema(job.entityType);
    const errorRows: Array<{ row: number; data: Record<string, string>; errors: string[] }> = [];
    const validBatch: Record<string, unknown>[] = [];
    let rowIndex = 0;
    let validCount = 0;
    let processedCount = 0;

    // Stream parse CSV
    await new Promise<void>((resolve, reject) => {
      Papa.parse(Readable.from(Body as AsyncIterable<Buffer>) as any, {
        header: true,
        skipEmptyLines: true,
        transformHeader: (h) => h.trim().toLowerCase().replace(/\s+/g, "_"),
        step: async (results, parser) => {
          parser.pause(); // Back-pressure: pause until this row is handled
          try {
            rowIndex++;
            processedCount++;

            const raw = results.data as Record<string, string>;
            const parsed = schema.safeParse(raw);

            if (parsed.success) {
              validBatch.push({ ...parsed.data, workspaceId: job.workspaceId });
              validCount++;
            } else {
              errorRows.push({
                row: rowIndex,
                data: raw,
                errors: parsed.error.errors.map((e) => `${e.path.join(".")}: ${e.message}`),
              });
            }

            // Flush valid rows in batches
            if (validBatch.length >= BATCH_SIZE) {
              await flushBatch(job.entityType, job.workspaceId, [...validBatch]);
              validBatch.length = 0;
            }

            // Update progress every 100 rows
            if (processedCount % 100 === 0) {
              await db.importJob.update({
                where: { id: jobId },
                data: { processedRows: processedCount, validRows: validCount, errorRows: errorRows.length },
              });
            }

            parser.resume();
          } catch (err) {
            // Papa never awaits an async step, so an error thrown here would
            // otherwise become an unhandled rejection; abort and surface it.
            parser.abort();
            reject(err);
          }
        },
        complete: () => resolve(), // Papa passes parse results; we only need completion
        error: reject,
      });
    });

    // Flush remaining valid rows
    if (validBatch.length > 0) {
      await flushBatch(job.entityType, job.workspaceId, validBatch);
    }

    // Upload error report if needed
    let errorFileKey: string | undefined;
    if (errorRows.length > 0) {
      errorFileKey = await uploadErrorReport(job, errorRows);
    }

    await db.importJob.update({
      where: { id: jobId },
      data: {
        status: "completed",
        processedRows: processedCount,
        validRows: validCount,
        errorRows: errorRows.length,
        totalRows: rowIndex,
        errorFileKey,
        completedAt: new Date(),
      },
    });
  } catch (err) {
    await db.importJob.update({
      where: { id: jobId },
      data: {
        status: "failed",
        errorMessage: err instanceof Error ? err.message : "Unknown error",
        completedAt: new Date(),
      },
    });
    throw err; // Re-throw for queue retry
  }
}

async function flushBatch(entityType: string, workspaceId: string, rows: Record<string, unknown>[]) {
  if (entityType === "contacts") {
    await db.contact.createMany({
      data: rows as any,
      skipDuplicates: true, // Skip rows with duplicate unique fields
    });
  }
  // Add other entity types here
}
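Prisma's `skipDuplicates` maps to `ON CONFLICT DO NOTHING`, so it only catches rows that collide exactly on the unique index. Casing variants like `Foo@Example.COM` vs `foo@example.com` would slip past it unless emails are normalized first. A hedged sketch (the helper name is mine, not from the codebase above) of normalizing and deduplicating a batch before it reaches flushBatch:

```typescript
// Hypothetical helper: normalize emails so casing/whitespace variants collapse
// to one row before the batch insert sees them. Keeps the first occurrence.
export function dedupeByEmail<T extends { email: string }>(rows: T[]): T[] {
  const seen = new Set<string>();
  const out: T[] = [];
  for (const row of rows) {
    const key = row.email.trim().toLowerCase();
    if (seen.has(key)) continue; // later duplicates are dropped
    seen.add(key);
    out.push({ ...row, email: key }); // store the normalized form
  }
  return out;
}
```

Dropped rows could also be appended to the error report as "duplicate within file" if customers need that visibility.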

💡 The Difference Between a SaaS Demo and a SaaS Business

Anyone can build a demo. We build SaaS products that handle real load, real users, and real payments, with architecture that does not need to be rewritten at 1,000 users.

  • Multi-tenant PostgreSQL with row-level security
  • Stripe subscriptions, usage billing, annual plans
  • SOC2-ready infrastructure from day one
  • We own zero equity – you own everything

Validation Schemas

// workers/import-schemas.ts
import { z } from "zod";

const contactSchema = z.object({
  email: z.string().email("Invalid email address"),
  first_name: z.string().min(1, "First name is required").max(100),
  last_name: z.string().max(100).optional().default(""),
  phone: z.string().regex(/^\+?[\d\s\-().]{7,20}$/, "Invalid phone number").optional(),
  company: z.string().max(200).optional(),
  tags: z.string().optional().transform((v) =>
    v ? v.split(",").map((t) => t.trim()).filter(Boolean) : []
  ),
});

const productSchema = z.object({
  name: z.string().min(1).max(255),
  sku: z.string().min(1).max(100),
  price: z.string()
    .transform((v) => parseFloat(v.replace(/[^0-9.]/g, "")))
    .pipe(z.number().min(0).max(1_000_000)),
  stock: z.string()
    .transform((v) => parseInt(v, 10))
    .pipe(z.number().int().min(0)),
  description: z.string().max(5000).optional().default(""),
});

export function getImportSchema(entityType: string) {
  const schemas: Record<string, z.ZodSchema> = {
    contacts: contactSchema,
    products: productSchema,
  };
  const schema = schemas[entityType];
  if (!schema) throw new Error(`No schema for entity type: ${entityType}`);
  return schema;
}
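The worker flattens each Zod issue into a `path: message` string and records a 1-based row number. That shape is easy to isolate; in this sketch a hand-rolled check stands in for Zod so the example runs without the dependency (the validator and function names are illustrative):

```typescript
type RowError = { row: number; errors: string[] };

// Hand-rolled stand-in for contactSchema.safeParse: checks the two required fields
// and emits errors in the same "path: message" shape the worker produces.
function validateContactRow(raw: Record<string, string>): string[] {
  const errors: string[] = [];
  if (!raw.email || !/^[^@\s]+@[^@\s]+\.[^@\s]+$/.test(raw.email)) {
    errors.push("email: Invalid email address");
  }
  if (!raw.first_name) {
    errors.push("first_name: First name is required");
  }
  return errors;
}

// Collect errors the same way the worker does: 1-based row index plus messages.
export function collectErrors(rows: Record<string, string>[]): RowError[] {
  const out: RowError[] = [];
  rows.forEach((raw, i) => {
    const errors = validateContactRow(raw);
    if (errors.length > 0) out.push({ row: i + 1, errors });
  });
  return out;
}
```

Keeping the error format stable matters: the downloadable report and the UI both key off these strings.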

Progress Streaming via SSE

// app/api/import/[jobId]/progress/route.ts
import { NextRequest } from "next/server";
import { getWorkspaceContext } from "@/lib/auth/workspace-context";
import { db } from "@/lib/db";

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ jobId: string }> }
) {
  const { jobId } = await params;
  const ctx = await getWorkspaceContext();
  if (!ctx) return new Response("Unauthorized", { status: 401 });

  const job = await db.importJob.findFirst({
    where: { id: jobId, workspaceId: ctx.workspaceId },
  });
  if (!job) return new Response("Not found", { status: 404 });

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const send = (data: object) => {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
        );
      };

      // Poll the DB every second until a terminal state or client disconnect
      let done = false;
      while (!done && !req.signal.aborted) {
        const current = await db.importJob.findUnique({ where: { id: jobId } });
        if (!current) break;

        send({
          status: current.status,
          processedRows: current.processedRows,
          totalRows: current.totalRows,
          validRows: current.validRows,
          errorRows: current.errorRows,
          pct: current.totalRows
            ? Math.round((current.processedRows / current.totalRows) * 100)
            : null,
        });

        if (current.status === "completed" || current.status === "failed") {
          done = true;
        } else {
          await new Promise((r) => setTimeout(r, 1000));
        }
      }

      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
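Two details of this route are worth isolating: each SSE event must be a `data:` line terminated by a blank line, and the percentage must guard against a null totalRows (unknown until the parse finishes). A runnable sketch of both (function names are illustrative):

```typescript
// Format one SSE event exactly as the route's send() helper does:
// a "data:" line with JSON payload, terminated by a blank line.
export function sseFrame(data: object): string {
  return `data: ${JSON.stringify(data)}\n\n`;
}

// Percentage with the same null guard: totalRows is unknown (and 0 is
// meaningless as a divisor) until parsing completes.
export function progressPct(processed: number, total: number | null): number | null {
  return total ? Math.round((processed / total) * 100) : null;
}
```

If either the blank-line terminator or the `data: ` prefix is wrong, `EventSource.onmessage` on the client simply never fires, which is a common and silent failure mode.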

Frontend: Upload + Progress UI

// components/CsvImport/CsvImportDialog.tsx
"use client";

import { useState, useEffect, useRef } from "react";
import { Upload, CheckCircle, XCircle, AlertCircle } from "lucide-react";

export function CsvImportDialog({ entityType, onComplete }: {
  entityType: string;
  onComplete: () => void;
}) {
  const [file, setFile] = useState<File | null>(null);
  const [jobId, setJobId] = useState<string | null>(null);
  const [progress, setProgress] = useState<{
    status: string;
    pct: number | null;
    processedRows: number;
    validRows: number;
    errorRows: number;
  } | null>(null);
  const [uploading, setUploading] = useState(false);
  const eventSourceRef = useRef<EventSource | null>(null);

  const handleUpload = async () => {
    if (!file) return;
    setUploading(true);

    const formData = new FormData();
    formData.append("file", file);
    formData.append("entityType", entityType);

    const res = await fetch("/api/import/upload", { method: "POST", body: formData });
    if (!res.ok) {
      setUploading(false);
      return; // Surface a toast/error message in a real app; kept minimal here
    }
    const { jobId: id } = await res.json();
    setJobId(id);
    setUploading(false);
  };

  useEffect(() => {
    if (!jobId) return;

    // Connect SSE
    const es = new EventSource(`/api/import/${jobId}/progress`);
    eventSourceRef.current = es;

    es.onmessage = (e) => {
      const data = JSON.parse(e.data);
      setProgress(data);

      if (data.status === "completed" || data.status === "failed") {
        es.close();
        if (data.status === "completed" && data.errorRows === 0) {
          setTimeout(onComplete, 1500); // Auto-close after success
        }
      }
    };

    es.onerror = () => es.close();
    return () => es.close();
  }, [jobId]);

  return (
    <div className="space-y-4 p-6">
      {!jobId && (
        <>
          <label className="block cursor-pointer">
            <div className="border-2 border-dashed border-gray-200 rounded-xl p-8 text-center hover:border-blue-400 transition-colors">
              <Upload className="mx-auto h-8 w-8 text-gray-400 mb-2" />
              <p className="text-sm text-gray-600">
                {file ? file.name : "Click to upload CSV"}
              </p>
            </div>
            <input
              type="file"
              accept=".csv"
              className="sr-only"
              onChange={(e) => setFile(e.target.files?.[0] ?? null)}
            />
          </label>

          <button
            onClick={handleUpload}
            disabled={!file || uploading}
            className="w-full px-4 py-2 bg-blue-600 text-white rounded-lg text-sm font-medium disabled:opacity-50"
          >
            {uploading ? "Uploading..." : "Start Import"}
          </button>
        </>
      )}

      {progress && (
        <div className="space-y-4">
          {/* Progress bar */}
          {progress.pct !== null && (
            <div>
              <div className="flex justify-between text-xs text-gray-500 mb-1">
                <span>Processing rows...</span>
                <span>{progress.pct}%</span>
              </div>
              <div className="h-2 bg-gray-100 rounded-full overflow-hidden">
                <div
                  className="h-full bg-blue-500 transition-all duration-500"
                  style={{ width: `${progress.pct}%` }}
                />
              </div>
            </div>
          )}

          {/* Stats */}
          <div className="grid grid-cols-3 gap-3">
            <div className="text-center p-3 bg-gray-50 rounded-lg">
              <p className="text-lg font-bold text-gray-900">{progress.processedRows}</p>
              <p className="text-xs text-gray-500">Processed</p>
            </div>
            <div className="text-center p-3 bg-green-50 rounded-lg">
              <p className="text-lg font-bold text-green-700">{progress.validRows}</p>
              <p className="text-xs text-green-600">Imported</p>
            </div>
            <div className="text-center p-3 bg-red-50 rounded-lg">
              <p className="text-lg font-bold text-red-700">{progress.errorRows}</p>
              <p className="text-xs text-red-600">Errors</p>
            </div>
          </div>

          {/* Terminal states */}
          {progress.status === "completed" && (
            <div className="flex items-center gap-2 text-green-700 text-sm font-medium">
              <CheckCircle className="h-5 w-5" />
              Import complete: {progress.validRows} rows imported
              {progress.errorRows > 0 && (
                <a href={`/api/import/${jobId}/errors`} download className="underline ml-2">
                  Download {progress.errorRows} error rows
                </a>
              )}
            </div>
          )}
          {progress.status === "failed" && (
            <div className="flex items-center gap-2 text-red-600 text-sm font-medium">
              <XCircle className="h-5 w-5" />
              Import failed – please try again
            </div>
          )}
        </div>
      )}
    </div>
  );
}

Cost and Timeline

Component                              Timeline        Cost (USD)
Upload to S3 + job creation            0.5–1 day       $400–$800
Background worker + streaming parse    1–2 days        $800–$1,600
Zod validation schema per entity       0.5 day each    $300–$500 each
SSE progress endpoint                  0.5 day         $300–$500
Frontend upload + progress UI          1 day           $600–$1,000
Error report download                  0.5 day         $300–$500
Full import pipeline (1 entity type)   1–2 weeks       $6,000–$12,000

Working With Viprasol

We build CSV import pipelines for SaaS products: from simple single-entity imports through complex multi-entity pipelines with cross-reference validation and transformation rules. Our team has shipped import systems that handle 500,000+ rows without timeouts.

What we deliver:

  • S3-based file upload with size and type validation
  • Streaming CSV parse with Papa Parse (no memory limit)
  • Per-row Zod validation with detailed error collection
  • Background worker with batch insert and duplicate skip
  • Real-time SSE progress with downloadable error report

Explore our SaaS development services or contact us to build your import pipeline.
