Ingestion Stages
The TimeTiles data ingestion pipeline consists of seven task handlers, sequenced by Payload Workflows. Each task is responsible for a specific aspect of data transformation and validation. This document provides detailed conceptual explanations of each task.
Stage Overview
| Task | Handler | Purpose | Batch Size |
|---|---|---|---|
| 1. Dataset Detection | dataset-detection | Parse file structure and create ingestion jobs | N/A |
| 2. Analyze Duplicates | analyze-duplicates | Find duplicate rows | 5,000 |
| 3. Detect Schema | schema-detection | Build progressive schema | 10,000 |
| 4. Validate Schema | validate-schema | Compare with existing schema | 10,000 |
| 5. Create Schema Version | create-schema-version | Persist approved schema version | N/A |
| 6. Geocode Batch | geocode-batch | Add location data | Unique locations |
| 7. Create Events | create-events-batch | Generate final records | 1,000 |
Tasks 2-7 run per sheet within the workflow. If validate-schema detects schema drift requiring human review, it returns { needsReview: true } and the sheet is paused. After the user approves, the ingest-process workflow resumes from task 5 (Create Schema Version).
Stage 1: Dataset Detection
Trigger: File uploaded to ingest-files collection
Stage: Pre-processing (before import-job creation)
Job Handler: dataset-detection-job
Purpose
Parse uploaded files to detect datasets and sheets, creating individual ingestion jobs for each detected dataset. This stage handles both single-sheet CSV files and multi-sheet Excel workbooks.
Process Overview
File Analysis:
- Determine file type (CSV, Excel, XLS)
- Enumerate all sheets in Excel files
- Analyze each sheet structure
Dataset Mapping:
- Match sheets to existing datasets by name or configuration
- Create new datasets if no match found
- Respect catalog associations
Job Creation:
- Create one ingestion job per dataset/sheet combination
- Each job processes independently through all subsequent stages
- Jobs link back to parent ingest-file
Multi-Sheet Excel Handling
Excel files with multiple sheets create separate ingestion jobs:
Example: File “company_events.xlsx” with three sheets:
- Sheet “Conferences” → Dataset “conferences” → Ingestion Job #1
- Sheet “Workshops” → Dataset “workshops” → Ingestion Job #2
- Sheet “Webinars” → Dataset “webinars” → Ingestion Job #3
Each ingestion job progresses through all task stages within the parent workflow.
Ingestion File Status
The parent ingest-file status reflects the aggregate state:
- Processing: While ANY ingestion job is incomplete
- Completed: When ALL ingestion jobs succeed
- Failed: If ANY ingestion job fails
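The aggregation rule can be sketched as a small function (the function and status names here are illustrative, not the actual TimeTiles API):

```typescript
type JobStatus = "pending" | "processing" | "completed" | "failed";

// Derive the parent ingest-file status from its child ingestion jobs,
// following the rules above: any failure fails the file, all successes
// complete it, anything else keeps it processing.
function aggregateFileStatus(jobs: JobStatus[]): "processing" | "completed" | "failed" {
  if (jobs.some((s) => s === "failed")) return "failed"; // ANY failure
  if (jobs.length > 0 && jobs.every((s) => s === "completed")) return "completed"; // ALL succeed
  return "processing"; // otherwise still in flight
}
```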
Next Task
The workflow proceeds to the per-sheet pipeline, starting with Analyze Duplicates for each detected sheet.
Stage 2: Analyze Duplicates
Stage: analyze-duplicates
Job Handler: analyze-duplicates-job
Batch Size: 5,000 rows
Purpose
Identify duplicate rows early in the process to reduce processing volume for subsequent stages, avoid unnecessary API calls, and prevent duplicate event creation.
Process Overview
Strategy Selection:
- Use dataset’s configured deduplication strategy
- Different strategies suit different data patterns
- Strategy determines what constitutes a “duplicate”
Internal Duplicate Analysis:
- Scan entire file to find duplicates within the import
- Build hash map of unique identifiers
- Record row numbers of duplicate occurrences
- Preserve first occurrence, mark subsequent ones as duplicates
External Duplicate Check:
- Query existing events in database
- Find rows that match already-imported data
- Record row numbers of external duplicates
Summary Generation:
- Calculate total rows, unique rows, duplicate counts
- Store duplicate row numbers for later stages
- Provide statistics for reporting
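The internal scan can be sketched with the Map-based approach described above; `findInternalDuplicates` and its result shape are hypothetical, but the first-occurrence-wins logic matches the description:

```typescript
type Row = Record<string, unknown>;

// Scan rows once, keeping the first occurrence of each key and recording
// every later occurrence as a duplicate of that first row.
function findInternalDuplicates(
  rows: Row[],
  keyOf: (row: Row) => string,
): { firstSeen: Map<string, number>; duplicates: Array<{ row: number; firstOccurrence: number }> } {
  const firstSeen = new Map<string, number>(); // key -> first row number (O(1) lookups)
  const duplicates: Array<{ row: number; firstOccurrence: number }> = [];
  rows.forEach((row, index) => {
    const key = keyOf(row);
    const first = firstSeen.get(key);
    if (first === undefined) {
      firstSeen.set(key, index); // preserve first occurrence
    } else {
      duplicates.push({ row: index, firstOccurrence: first });
    }
  });
  return { firstSeen, duplicates };
}
```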
Deduplication Strategies
External ID Strategy:
Uses a specific field as the unique identifier (e.g., “event_id”, “uuid”). Best for data with explicit IDs.
Computed Hash Strategy:
Hashes a combination of specific fields (e.g., “name” + “date” + “location”). Best for data without explicit IDs but with identifying field combinations.
Content Hash Strategy:
Hashes the entire row content. Best for ensuring absolutely no duplicate rows regardless of which fields differ.
Hybrid Strategy:
Tries external ID first, falls back to computed hash if external ID is missing. Best for datasets with partial ID coverage.
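The four strategies reduce to different key functions over a row. An illustrative sketch assuming hypothetical field names (`event_id`, `name`, `date`, `location`) and SHA-256 hashing:

```typescript
import { createHash } from "node:crypto";

type Row = Record<string, unknown>;

const sha = (s: string) => createHash("sha256").update(s).digest("hex");

// One key function per deduplication strategy; field names are examples,
// not the actual TimeTiles configuration.
const strategies = {
  externalId: (row: Row) => String(row["event_id"] ?? ""),
  computedHash: (row: Row) => sha(`${row["name"]}|${row["date"]}|${row["location"]}`),
  contentHash: (row: Row) => sha(JSON.stringify(row)),
  hybrid: (row: Row) =>
    row["event_id"] != null
      ? String(row["event_id"]) // prefer the explicit ID when present
      : strategies.computedHash(row), // fall back to the field-combination hash
};
```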
Performance Optimizations
- Processes file in 5,000-row batches to manage memory
- Uses Map data structure for O(1) duplicate lookups
- Chunks external duplicate queries to avoid database limits
- Can be skipped entirely if deduplication is disabled for the dataset
Output
Creates a duplicate analysis result stored in the ingestion job:
- List of internal duplicate row numbers with their first occurrence
- List of external duplicate row numbers with existing event IDs
- Summary statistics (total, unique, internal duplicates, external duplicates)
- Strategy used for analysis
Next Task
The workflow proceeds to Detect Schema.
Stage 3: Detect Schema
Stage: detect-schema
Job Handler: schema-detection-job
Batch Size: 10,000 rows
Purpose
Progressively build a JSON Schema representation of the data by analyzing non-duplicate rows across multiple batches. This creates a complete understanding of the data structure, types, and patterns.
Process Overview
Batch Processing:
- Read file in 10,000-row chunks
- Skip rows identified as duplicates in Stage 2
- Process only unique data for schema detection
Progressive Schema Building:
- Use `ProgressiveSchemaBuilder` to analyze data types
- Each batch refines the schema understanding
- Handle type conflicts intelligently (e.g., “123” vs 123)
- Track field statistics across all batches
Geocoding Detection:
- Identify potential address fields (string patterns)
- Detect latitude/longitude field pairs (numeric ranges)
- Mark candidates for geocoding in Stage 6
State Persistence:
- Save builder state after each batch
- Enable recovery if processing is interrupted
- Maintain continuity across batches
Schema Detection Features
Type Inference:
Automatically detects field types:
- Strings (with max length tracking)
- Numbers (integer vs decimal, min/max values)
- Dates (various formats like ISO 8601, US, European)
- Booleans (true/false, yes/no, 1/0)
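A simplified sketch of this kind of per-value type check (real detection also tracks statistics and mixed-type distributions; this version handles only true/false and yes/no booleans, since bare 1/0 is ambiguous with integers):

```typescript
type Inferred = "integer" | "number" | "boolean" | "date" | "string";

// Infer a primitive type for one raw CSV value, checking the most
// specific patterns first and falling back to string.
function inferType(value: string): Inferred {
  const v = value.trim();
  if (/^(true|false|yes|no)$/i.test(v)) return "boolean";
  if (/^-?\d+$/.test(v)) return "integer";
  if (/^-?\d*\.\d+$/.test(v)) return "number";
  if (/\d{4}-\d{2}-\d{2}/.test(v) && !Number.isNaN(Date.parse(v))) return "date";
  return "string";
}
```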
Required vs Optional:
- Tracks null/missing value frequencies
- Marks fields as required if present in all rows
- Marks fields as optional if missing in some rows
Field Statistics:
- Total values seen
- Null count
- Unique value count (for enum detection)
- Min/max values (for numbers and dates)
- Type distribution (handles mixed types)
Enum Detection:
- Identifies fields with limited unique values
- Configurable threshold (count or percentage-based)
- Useful for categorical fields like status, type, category
Progressive Building
The schema builder refines its understanding with each batch:
- Batch 1: Initial type detection from first 10,000 rows
- Batch 2: Refines types, discovers new optional fields
- Batch 3: Adjusts min/max values, enum candidates
- Batch N: Final schema represents complete data understanding
This progressive approach handles:
- Type conflicts (promotes string if necessary)
- Late-appearing fields (marks as optional)
- Evolving value ranges (expands min/max)
- Large files (processes incrementally)
Geocoding Field Detection
Identifies fields suitable for geocoding:
Address Fields:
- String fields containing address-like patterns
- Common names like “address”, “location”, “venue”
- Content analysis for street, city, state patterns
Coordinate Fields:
- Numeric latitude fields (range -90 to 90)
- Numeric longitude fields (range -180 to 180)
- Common names like “lat”, “latitude”, “lng”, “longitude”
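The coordinate heuristics combine a name pattern with a range check over observed values. An illustrative sketch (the function shape is an assumption, not the actual detector):

```typescript
// Heuristic test for a latitude or longitude field: the column name must
// match a common pattern AND every observed value must fall in range.
function isCoordinateField(
  name: string,
  values: number[],
  kind: "lat" | "lng",
): boolean {
  const namePattern = kind === "lat" ? /^lat(itude)?$/i : /^(lng|lon|long|longitude)$/i;
  const [min, max] = kind === "lat" ? [-90, 90] : [-180, 180];
  return (
    namePattern.test(name) &&
    values.every((v) => Number.isFinite(v) && v >= min && v <= max)
  );
}
```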
Output
Creates a complete JSON Schema stored in the ingestion job:
- Property definitions for all detected fields
- Type information with constraints (min/max, format, enum)
- Required field list
- Geocoding candidates (address and/or coordinate fields)
- Field statistics for each property
- Builder state for potential resumption
Next Task
The workflow proceeds to Validate Schema.
Stage 4: Validate Schema
Stage: validate-schema
Job Handler: validate-schema-job
Batch Size: 10,000 rows (completes schema detection if needed)
Purpose
Compare the detected schema against the dataset’s current schema to identify breaking and non-breaking changes, then determine whether changes can be auto-approved or require manual approval.
Process Overview
Schema Finalization:
- Complete any remaining schema building from Stage 3
- Ensure schema represents all data in the file
Current Schema Retrieval:
- Get active schema version from the `dataset-schemas` collection
- First import has no existing schema (auto-approve path)
Schema Comparison:
- Identify all differences between detected and existing schemas
- Classify each change as breaking or non-breaking
- Document specific changes for approval review
Approval Decision:
- Check dataset configuration for auto-approval settings
- Evaluate change classification against approval rules
- Determine next stage: approval, schema version creation, or direct to geocoding
Change Classification
Breaking Changes (Always Require Approval):
- Type changes: field type modified (string → number, etc.)
- Required field removed: previously required field now missing
- Constraint narrowing: min/max values become more restrictive
- Format changes: date format or other format changes
- Enum restriction: allowed values reduced
Non-Breaking Changes (Can Auto-Approve):
- New optional fields: additional fields that allow null
- Constraint expansion: min/max values become less restrictive
- Enum expansion: additional allowed values
- Type generalization: number → string (preserves all data)
Type Transformations
The system can automatically handle some type mismatches:
Built-in Transformation Strategies:
- Parse: Smart parsing (string “123” → number 123, string “true” → boolean true)
- Cast: Simple type conversion (number 123 → string “123”)
- Reject: Fail validation for type mismatches (strictest option)
Custom Transformations:
Datasets can define custom transformation functions for specific fields, enabling complex conversions like European number formats (“1.234,56” → 1234.56).
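Illustrative sketches of the three built-in strategies plus a custom European-number transform (function names are examples, not the actual API):

```typescript
// Parse: smart parsing of string values into their apparent types.
const parseValue = (v: unknown): unknown => {
  if (typeof v !== "string") return v;
  if (/^-?\d+(\.\d+)?$/.test(v)) return Number(v); // "123" -> 123
  if (v === "true" || v === "false") return v === "true"; // "true" -> true
  return v;
};

// Cast: simple type conversion, always succeeds.
const castToString = (v: unknown): string => String(v); // 123 -> "123"

// Reject: fail validation on any type mismatch (strictest option).
const rejectMismatch = (v: unknown, expected: string): unknown => {
  if (typeof v !== expected) throw new Error(`type mismatch: expected ${expected}`);
  return v;
};

// Custom transform example: European decimal format "1.234,56" -> 1234.56
const parseEuropeanNumber = (v: string): number =>
  Number(v.replace(/\./g, "").replace(",", "."));
```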
Auto-Approval Logic
Changes can be auto-approved when ALL of the following are true:
- Dataset has `autoGrow` enabled
- No breaking changes detected
- All new fields are optional
- Dataset schema is not locked
- `autoApproveNonBreaking` is enabled
Changes require manual approval when ANY of the following are true:
- Breaking changes detected
- Dataset schema is locked
- Manual approval is required by configuration
- Security or compliance policies mandate review
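The two rule sets combine into a single predicate. A sketch using the `autoGrow` and `autoApproveNonBreaking` settings named above, plus hypothetical `schemaLocked` and `requireManualApproval` flags standing in for the lock and manual-approval configuration:

```typescript
interface DatasetConfig {
  autoGrow: boolean;
  autoApproveNonBreaking: boolean;
  schemaLocked: boolean; // hypothetical name for the schema lock
  requireManualApproval: boolean; // hypothetical name for forced review
}

interface Comparison {
  breakingChanges: string[];
  newFieldsAllOptional: boolean;
}

// Auto-approve only when every condition holds; any blocking condition
// forces manual review.
function canAutoApprove(cfg: DatasetConfig, cmp: Comparison): boolean {
  if (cmp.breakingChanges.length > 0) return false;
  if (cfg.schemaLocked || cfg.requireManualApproval) return false;
  if (!cmp.newFieldsAllOptional) return false;
  return cfg.autoGrow && cfg.autoApproveNonBreaking;
}
```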
Decision Tree
Schema Unchanged:
Task returns success. The workflow proceeds directly to Geocode Batch, skipping schema versioning.
Auto-Approved Changes:
Task returns success. The workflow proceeds to Create Schema Version to persist the new schema.
Requires Human Review (Schema Drift):
Task returns { needsReview: true }. The ingestion job is set to NEEDS_REVIEW. The sheet is skipped in the current workflow. After the user reviews and approves, the ingest-process workflow resumes from Create Schema Version.
Output
Creates a validation result stored in the ingestion job:
- Complete list of breaking changes with details
- Complete list of non-breaking changes with details
- Approval decision (auto-approved, needs review, or unchanged)
- Reason for decision
- Changes summary for admin review
Next Task
- Unchanged or auto-approved: Create Schema Version (within same workflow)
- Needs review: Sheet paused; `ingest-process` workflow queued after approval
NEEDS_REVIEW: Human Review
When validate-schema detects schema drift requiring approval, the ingestion job enters the NEEDS_REVIEW state. This replaces the old AWAIT_APPROVAL blocking stage.
Key differences from the old model:
- NEEDS_REVIEW only pauses the affected sheet, not the entire pipeline
- Other sheets in a multi-sheet file continue processing
- Manual uploads never trigger NEEDS_REVIEW (the wizard handles schema configuration upfront)
- Only automated imports (scheduled, scraper) can trigger NEEDS_REVIEW when schema drift is detected
Review flow:
1. `validate-schema` returns `{ needsReview: true }` with schema comparison details
2. Ingestion job set to `NEEDS_REVIEW`; sheet skipped in the workflow's processing loop
3. User reviews in admin interface, sees breaking changes, new fields, and suggested renames
4. User approves; the `ingest-jobs` `afterChange` hook queues the `ingest-process` workflow
5. `ingest-process` runs Create Schema Version, Geocode Batch, and Create Events for that job
Stage 5: Create Schema Version
Stage: create-schema-version
Job Handler: create-schema-version-job
Batch Size: N/A (single operation)
Purpose
Create a new schema version record in the dataset-schemas collection after schema approval. This stage runs in a separate transaction to avoid circular dependencies and deadlocks during the approval process.
Process Overview
Validation Check:
- Verify import-job exists and is in correct stage
- Confirm schema has been approved (either auto or manual)
- Check that schema version doesn’t already exist
Duplicate Prevention:
- Skip processing if schema version already created
- Prevents duplicate versions from concurrent operations
- Idempotent operation for retry safety
Dataset Retrieval:
- Get associated dataset record
- Verify dataset still exists
- Collect configuration for version metadata
Version Creation:
- Use `SchemaVersioningService` for consistent versioning
- Store complete schema with field metadata
- Record approval information (auto or manual)
- Link to import-job that created this version
Job Update:
- Link created schema version to import-job
- Update import-job metadata with version ID
- Prepare for next stage
Why a Separate Stage?
This stage exists separately from validation and approval to:
Avoid Circular Dependencies:
- Schema validation queries existing schemas
- Creating schemas during validation creates circular references
- Separate stage breaks the dependency cycle
Prevent Deadlocks:
- Approval process locks import-job record
- Schema creation locks dataset-schemas
- Sequential stages prevent concurrent lock conflicts
Enable Transaction Safety:
- Approval can fail without corrupting schema versions
- Schema creation can retry without re-running approval
- Clean rollback on any step failure
Schema Version Contents
Each version record contains:
- Complete JSON Schema for the dataset
- Field metadata and statistics from detection
- Approval information (who, when, why)
- Auto-approval flag
- Import source references
- Version number
- Timestamp
Error Handling
- Missing Job: Throws error if import-job not found (should never happen)
- No Approval: Skips processing if schema not approved (safeguard)
- Missing Dataset: Throws error if dataset not found (data integrity issue)
- Creation Failure: Marks import-job as failed, stops processing, preserves state
Next Task
The workflow proceeds to Geocode Batch.
Stage 6: Geocode Batch
Stage: geocode-batch
Job Handler: geocode-batch-job
Batch Processing: Unique locations (not row-based batching)
Purpose
Enrich data with geographic coordinates by geocoding addresses or validating provided coordinates. This stage runs before event creation so location data is available immediately when events are created.
Process Overview
Candidate Evaluation:
- Check if geocoding candidates were detected in Stage 3
- Skip stage entirely if no location fields found
- Determine geocoding mode (address vs coordinates)
Unique Location Extraction:
- Read entire file to extract ALL unique location values
- Geocoding is NOT batched by rows
- Each unique address/coordinate pair processed once
- Results stored in lookup map by row number
Geocoding or Validation:
- Address mode: Send addresses to geocoding API
- Coordinate mode: Validate provided lat/lng pairs
- Hybrid mode: Use coordinates if present, geocode address otherwise
Result Storage:
- Store geocoding results by row number
- Cache results in import-job for later lookup
- Track geocoding progress and success rate
Geocoding Scenarios
Address Geocoding:
When the data contains address fields:
- Extract full address strings from rows
- Send to geocoding API (Nominatim, Google, etc.)
- Receive coordinates with confidence score
- Store formatted address from geocoding service
- Handle partial matches and low-confidence results
Coordinate Validation:
When the data contains latitude/longitude fields:
- Extract numeric coordinate pairs
- Validate ranges (lat: -90 to 90, lng: -180 to 180)
- Verify coordinates are valid numbers
- Mark as high confidence (provided data)
- Store as-is with validation flag
Hybrid Approach:
When both addresses and coordinates are present:
- Prefer provided coordinates if valid
- Fall back to address geocoding if coordinates invalid
- Cross-validate coordinates against geocoded address
- Flag discrepancies for review
Why Not Batched Like Other Stages?
Geocoding processes unique locations instead of rows because:
- Efficiency: Multiple rows often share the same location
- Cost: Geocoding APIs charge per request, not per row
- Performance: Geocoding is slow; minimizing calls is critical
- Cache Friendliness: Results apply to all rows with same location
Example:
- File has 10,000 rows
- Only 500 unique locations
- Geocodes 500 locations (not 10,000)
- Lookup map applies results to all 10,000 rows
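The unique-location approach amounts to a two-level lookup: cache one result per unique address, then fan results back out to every row. A sketch with a hypothetical `geocodeUnique` helper; the geocoding client is passed in as a stand-in for a real API:

```typescript
interface Coords { lat: number; lng: number }

// Geocode each unique address exactly once, then map results back to rows.
async function geocodeUnique(
  rowAddresses: Map<number, string>, // row number -> address string
  geocode: (addr: string) => Promise<Coords>,
): Promise<Map<number, Coords>> {
  const cache = new Map<string, Coords>();
  for (const addr of new Set(rowAddresses.values())) {
    cache.set(addr, await geocode(addr)); // one API call per unique address
  }
  const byRow = new Map<number, Coords>();
  for (const [row, addr] of rowAddresses) byRow.set(row, cache.get(addr)!);
  return byRow;
}
```

With 10,000 rows sharing 500 addresses, `geocode` is called 500 times while the returned map still covers all 10,000 rows.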
Error Handling
Individual Geocoding Failures:
- Failed geocoding attempts are logged
- Processing continues without that location
- Events created without coordinates (downstream validation handles the missing location)
- Errors don’t stop batch processing
Rate Limit Errors:
- Trigger exponential backoff
- Retry failed requests after delay
- Resume from failure point
- Respect API rate limits
Malformed Addresses:
- Skip with warning in logs
- Track skipped addresses for reporting
- Continue processing remaining addresses
Result Storage
Geocoding results stored in import-job:
- Map of row number → geocoding result
- Each result includes coordinates, confidence, formatted address
- Source indication (geocoded vs provided vs failed)
- Progress tracking (current/total unique locations)
Next Task
The workflow proceeds to Create Events.
Stage 7: Create Events
Stage: create-events
Job Handler: create-events-batch-job
Batch Size: 1,000 rows
Purpose
Create the final event records in the database with all processing results applied. This is the culmination of the entire ingestion pipeline, producing the events that users will query and explore.
Process Overview
Batch Reading:
- Read 1,000 rows from file per batch
- Process multiple batches for large files
- Resume from last processed batch on interruption
Duplicate Filtering:
- Skip rows marked as duplicates in Stage 2
- Process only unique rows
- Count duplicates skipped for reporting
ID Generation:
- Generate unique ID using dataset’s configured strategy
- External ID from specified field
- Computed hash from field combination
- Auto-increment for datasets without IDs
- Hybrid approach (external ID with fallback)
Data Enrichment:
- Apply geocoding results from Stage 6 lookup
- Extract timestamp from data or use current time
- Extract location display name if present
- Determine coordinate source (geocoded, provided, none)
Event Creation:
- Create records in the `events` collection
- Link to dataset and import-job
- Reference schema version from Stage 5
- Set validation status (pending initial validation)
Progress Tracking:
- Update progress counters after each batch
- Track successful creations and errors separately
- Provide real-time status for monitoring
Event Data Structure
Each created event contains:
- Original row data (complete, unmodified)
- Unique ID (generated according to strategy)
- Event timestamp (extracted or current)
- Location coordinates (if geocoded/provided)
- Coordinate source information (type, confidence)
- Dataset reference
- Import-job reference
- Schema version number
- Validation status
Timestamp Extraction
The system looks for timestamp fields in priority order:
- “timestamp”
- “date”
- “datetime”
- “created_at”
- “event_date”
- “event_time”
Falls back to current time if no valid timestamp found.
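A sketch of this priority-ordered lookup (the field list comes from above; the function shape and injectable clock are illustrative):

```typescript
// Candidate fields checked in priority order.
const TIMESTAMP_FIELDS = ["timestamp", "date", "datetime", "created_at", "event_date", "event_time"];

// Return the first parseable timestamp found, or fall back to "now".
function extractTimestamp(
  row: Record<string, unknown>,
  now: () => Date = () => new Date(), // injectable for testing
): Date {
  for (const field of TIMESTAMP_FIELDS) {
    const raw = row[field];
    if (typeof raw === "string" || typeof raw === "number") {
      const parsed = new Date(raw);
      if (!Number.isNaN(parsed.getTime())) return parsed;
    }
  }
  return now();
}
```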
Error Handling
Individual Row Failures:
- Failed events logged in errors array
- Successful events update progress counter
- Batch continues processing remaining rows
- Final statistics include error count
Database Transaction Failures:
- Entire batch retries on transaction failure
- Preserves batch atomicity
- Prevents partial batch commits
- Exponential backoff on retries
Constraint Violations:
- Unique constraint violations logged
- Row skipped, processing continues
- Indicates duplicate detection may need tuning
Completion Processing
When all batches complete, the system:
Updates Import Job:
- Sets stage to “completed”
- Records final statistics
- Calculates processing duration
- Marks completion timestamp
Final Statistics:
- Total events created successfully
- Total duplicates skipped (internal + external)
- Total events geocoded (with location data)
- Total errors encountered
- Processing time per stage
Import File Update:
- Checks if ALL ingest-jobs completed
- Updates ingest-file status if all jobs done
- Maintains “processing” if any jobs remain
Notification:
- Alerts administrators of completion
- Provides summary statistics
- Links to created events for review
Completion
This is the final task. The ingestion job transitions to “completed” status. The workflow marks the sheet as done and moves to the next sheet (if any remain).
Workflow Task Sequencing
Task sequencing is defined by the workflow handlers in lib/jobs/workflows/. There is no separate state machine or transition graph — the workflow handler is a linear function that calls tasks in order:
```
dataset-detection → for each sheet:
  analyze-duplicates → detect-schema → validate-schema →
  create-schema-version → geocode-batch → create-events
```
If a task returns `{ needsReview: true }`, the sheet is skipped. If a task throws, a per-sheet try/catch marks the sheet as FAILED and continues to the next sheet.
The ingest-process workflow (for post-review resumption) runs a shorter sequence: create-schema-version → geocode-batch → create-events.
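The loop structure can be sketched as follows; the types and task signature are illustrative, not the actual Payload Workflows API:

```typescript
type TaskResult = { needsReview?: boolean };
type Task = (sheet: string) => Promise<TaskResult>;

// Linear per-sheet pipeline: run tasks in order, short-circuit on
// needsReview, and let a per-sheet try/catch isolate failures so the
// remaining sheets still process.
async function runSheetPipeline(
  sheets: string[],
  tasks: Task[],
  markFailed: (sheet: string, err: unknown) => void,
): Promise<void> {
  for (const sheet of sheets) {
    try {
      for (const task of tasks) {
        const result = await task(sheet);
        if (result.needsReview) break; // pause this sheet, keep others going
      }
    } catch (err) {
      markFailed(sheet, err); // FAILED sheet does not stop the pipeline
    }
  }
}
```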
Summary
These seven task handlers work together to ingest uploaded files into structured, validated, and enriched events:
- Dataset Detection creates the foundation by parsing files and creating jobs
- Analyze Duplicates reduces processing load and prevents duplicate events
- Detect Schema understands the data structure and types
- Validate Schema ensures schema changes are intentional and safe
- Create Schema Version persists approved schema for audit and rollback
- Geocode Batch enriches data with geographic coordinates
- Create Events produces the final, queryable event records
Each task builds on the previous tasks’ outputs, orchestrated by Payload Workflows with built-in retry and caching.