Recipes

Selected recipes and patterns for building real-world pipelines using the streaming API.

ETL Pipelines (streaming)

Use streaming for long-running ETL jobs: read in batches, transform, and write without buffering the whole file in memory.

import { ConvertBuddy } from 'convert-buddy-js';

const csv = `user_id,first_name,last_name,email,signup_date,is_active
1,John,Doe,john@example.com,2024-01-15,1
2,Jane,Smith,jane@example.com,2024-02-20,1`;

async function run() {
  const cb = new ConvertBuddy({ outputFormat: 'ndjson' });

  const controller = await cb.stream(csv, {
    recordBatchSize: 100,
    onRecords: async (ctrl, records) => {
      // Transform & load each batch
      await saveBatchToDb(records);
      console.log('Processed batch:', records);
    },
    onDone: (final) => console.log('ETL complete:', final),
    profile: true
  });

  await controller.done;
}

async function saveBatchToDb(records) {
  // simulate async DB write
  await new Promise(r => setTimeout(r, 10));
}

run().catch(console.error);
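The `saveBatchToDb` placeholder above only simulates a write. A sketch of a more production-leaning version, which splits each batch into fixed-size chunks and retries transient failures with exponential backoff; `writeChunk` is a hypothetical stand-in for your real database client call, and the option names are assumptions:

```javascript
// Hypothetical stand-in for a real database client call.
async function writeChunk(chunk) {
  await new Promise((r) => setTimeout(r, 5)); // simulate I/O
  return chunk.length;
}

// Write a batch in bounded chunks, retrying each chunk on failure.
async function saveBatchToDb(records, { chunkSize = 50, retries = 3 } = {}) {
  let written = 0;
  for (let i = 0; i < records.length; i += chunkSize) {
    const chunk = records.slice(i, i + chunkSize);
    for (let attempt = 1; ; attempt++) {
      try {
        written += await writeChunk(chunk);
        break;
      } catch (err) {
        if (attempt >= retries) throw err;
        // Exponential backoff before retrying the same chunk.
        await new Promise((r) => setTimeout(r, 2 ** attempt * 100));
      }
    }
  }
  return written;
}
```

Chunking keeps individual writes small, and retrying per chunk means a transient failure does not force the whole batch to be resent.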

Auto-detect pipelines

Detect the input format at runtime and stream using the inferred configuration.

import { detectFormat, ConvertBuddy } from 'convert-buddy-js';

async function universalConvert(data) {
  const format = await detectFormat(data);
  console.log('Detected format:', format);
  
  const controller = await new ConvertBuddy({ outputFormat: 'json' }).stream(data, { 
    inputFormat: format,
    onRecords: (ctrl, records, stats) => {
      console.log('Batch:', records);
      console.log('Progress:', ctrl.recordCount, 'records');
    },
    onDone: (final) => console.log('Done:', final),
    profile: true
  });
  
  await controller.done;
}

universalConvert('name,age\nAlice,30\nBob,25').catch(console.error);
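For intuition, detection for simple cases can be approximated by sniffing the input's shape. A rough illustrative heuristic, not the library's actual `detectFormat` logic (prefer `detectFormat` for real inputs):

```javascript
// Rough format sniffing — illustrative only, not the library's
// actual detectFormat implementation.
function sniffFormat(text) {
  const t = text.trim();
  const isJson = (s) => { try { JSON.parse(s); return true; } catch { return false; } };
  const lines = t.split('\n').map((l) => l.trim()).filter(Boolean);
  // Multiple lines that each parse as JSON look like NDJSON.
  if (lines.length > 1 && lines.every(isJson)) return 'ndjson';
  if (isJson(t)) return 'json';
  if (t.startsWith('<')) return 'xml';
  // Fall back to CSV, the most common delimited text format.
  return 'csv';
}
```

A real detector also has to handle byte-order marks, alternative delimiters, and ambiguous inputs, which is why delegating to `detectFormat` is preferable.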

Notes

  • Prefer the streaming recipes; the API is consistent for small and large inputs
  • Use detection when the input format is unknown; otherwise pass inputFormat explicitly to skip the detection pass
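The "transform" step in the ETL recipe can be any per-record function applied inside onRecords. A sketch that normalizes records parsed from the sample CSV; the field names come from that example, while the normalized output shape is an assumption:

```javascript
// Per-record transform for the ETL recipe: coerce types and rename fields.
// Input field names match the sample CSV; the output shape is illustrative.
function transformRecord(rec) {
  return {
    id: Number(rec.user_id),
    name: `${rec.first_name} ${rec.last_name}`,
    email: rec.email.toLowerCase(),
    signupDate: rec.signup_date,
    active: rec.is_active === '1',
  };
}
```

Keeping the transform a pure function of one record makes it easy to unit-test independently of the streaming pipeline.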