Processing Stages
The TimeTiles data processing pipeline consists of eight sequential stages, each responsible for a specific aspect of data transformation and validation. This document provides detailed conceptual explanations of each stage.
Stage Overview
| Stage | Job Handler | Purpose | Batch Size |
|---|---|---|---|
| 1. Dataset Detection | dataset-detection-job | Parse file structure and create import jobs | N/A |
| 2. Analyze Duplicates | analyze-duplicates-job | Find duplicate rows | 5,000 |
| 3. Detect Schema | schema-detection-job | Build progressive schema | 10,000 |
| 4. Validate Schema | validate-schema-job | Compare with existing schema | 10,000 |
| 5. Await Approval | Manual process | Human review of changes | N/A |
| 6. Create Schema Version | create-schema-version-job | Persist approved schema version | N/A |
| 7. Geocode Batch | geocode-batch-job | Add location data | Unique locations |
| 8. Create Events | create-events-batch-job | Generate final records | 1,000 |
Stage 1: Dataset Detection
Trigger: File uploaded to import-files collection
Stage: Pre-processing (before import-job creation)
Job Handler: dataset-detection-job
Purpose
Parse uploaded files to detect datasets and sheets, creating individual import jobs for each detected dataset. This stage handles both single-sheet CSV files and multi-sheet Excel workbooks.
Process Overview
File Analysis:
- Determine file type (CSV, Excel, XLS)
- Enumerate all sheets in Excel files
- Analyze each sheet structure
Dataset Mapping:
- Match sheets to existing datasets by name or configuration
- Create new datasets if no match found
- Respect catalog associations
Job Creation:
- Create one import-job per dataset/sheet combination
- Each job processes independently through all subsequent stages
- Jobs link back to parent import-file
Multi-Sheet Excel Handling
Excel files with multiple sheets create separate import-jobs:
Example: File “company_events.xlsx” with three sheets:
- Sheet “Conferences” → Dataset “conferences” → Import Job #1
- Sheet “Workshops” → Dataset “workshops” → Import Job #2
- Sheet “Webinars” → Dataset “webinars” → Import Job #3
Each import-job progresses through all eight stages independently.
Import File Status
The parent import-file status reflects the aggregate state:
- Processing: While ANY import-job is incomplete
- Completed: When ALL import-jobs succeed
- Failed: If ANY import-job fails
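As a minimal sketch of this aggregation logic (the type and function names here are hypothetical, not the actual TimeTiles code):

```typescript
type JobStatus = "pending" | "processing" | "completed" | "failed";
type ImportFileStatus = "processing" | "completed" | "failed";

// Derive the parent import-file status from its import-jobs.
function aggregateImportFileStatus(jobStatuses: JobStatus[]): ImportFileStatus {
  if (jobStatuses.some((s) => s === "failed")) return "failed"; // ANY failure fails the file
  if (jobStatuses.every((s) => s === "completed")) return "completed"; // ALL must succeed
  return "processing"; // otherwise still in flight
}
```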
Next Stage
All created import-jobs automatically begin at Stage 2 (Analyze Duplicates).
Stage 2: Analyze Duplicates
Stage: analyze-duplicates
Job Handler: analyze-duplicates-job
Batch Size: 5,000 rows
Purpose
Identify duplicate rows early in the process to reduce processing volume for subsequent stages, avoid unnecessary API calls, and prevent duplicate event creation.
Process Overview
Strategy Selection:
- Use dataset’s configured deduplication strategy
- Different strategies suit different data patterns
- Strategy determines what constitutes a “duplicate”
Internal Duplicate Analysis:
- Scan entire file to find duplicates within the import
- Build hash map of unique identifiers
- Record row numbers of duplicate occurrences
- Preserve first occurrence, mark subsequent ones as duplicates
External Duplicate Check:
- Query existing events in database
- Find rows that match already-imported data
- Record row numbers of external duplicates
Summary Generation:
- Calculate total rows, unique rows, duplicate counts
- Store duplicate row numbers for later stages
- Provide statistics for reporting
Deduplication Strategies
External ID Strategy:
Uses a specific field as the unique identifier (e.g., “event_id”, “uuid”). Best for data with explicit IDs.
Computed Hash Strategy:
Hashes a combination of specific fields (e.g., “name” + “date” + “location”). Best for data without explicit IDs but with identifying field combinations.
Content Hash Strategy:
Hashes the entire row content. Best for ensuring absolutely no duplicate rows regardless of which fields differ.
Hybrid Strategy:
Tries external ID first, falls back to computed hash if external ID is missing. Best for datasets with partial ID coverage.
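The sketch below illustrates how a deduplication key might be derived under each strategy. The types, field names, and sha256 helper are illustrative assumptions, not the project's actual implementation:

```typescript
import { createHash } from "node:crypto";

type Row = Record<string, unknown>;
type Strategy =
  | { kind: "external-id"; field: string }
  | { kind: "computed-hash"; fields: string[] }
  | { kind: "content-hash" }
  | { kind: "hybrid"; idField: string; fallbackFields: string[] };

const sha256 = (value: string) => createHash("sha256").update(value).digest("hex");

// Compute the identifier used to decide whether two rows count as the same event.
function dedupKey(row: Row, strategy: Strategy): string {
  switch (strategy.kind) {
    case "external-id":
      return String(row[strategy.field]);
    case "computed-hash":
      return sha256(strategy.fields.map((f) => String(row[f] ?? "")).join("|"));
    case "content-hash":
      return sha256(JSON.stringify(row));
    case "hybrid": {
      const id = row[strategy.idField];
      return id != null && id !== ""
        ? String(id)
        : sha256(strategy.fallbackFields.map((f) => String(row[f] ?? "")).join("|"));
    }
  }
}
```

With a key function like this, internal duplicates can be found in a single pass by keeping a Map from key to the first row number that produced it, which is the O(1) lookup mentioned under Performance Optimizations below.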
Performance Optimizations
- Processes file in 5,000-row batches to manage memory
- Uses Map data structure for O(1) duplicate lookups
- Chunks external duplicate queries to avoid database limits
- Can be skipped entirely if deduplication is disabled for the dataset
Output
Creates a duplicate analysis result stored in the import-job:
- List of internal duplicate row numbers with their first occurrence
- List of external duplicate row numbers with existing event IDs
- Summary statistics (total, unique, internal duplicates, external duplicates)
- Strategy used for analysis
Next Stage
Automatically transitions to Stage 3 (Detect Schema) when complete.
Stage 3: Detect Schema
Stage: detect-schema
Job Handler: schema-detection-job
Batch Size: 10,000 rows
Purpose
Progressively build a JSON Schema representation of the data by analyzing non-duplicate rows across multiple batches. This creates a complete understanding of the data structure, types, and patterns.
Process Overview
Batch Processing:
- Read file in 10,000-row chunks
- Skip rows identified as duplicates in Stage 2
- Process only unique data for schema detection
Progressive Schema Building:
- Use ProgressiveSchemaBuilder to analyze data types
- Each batch refines the schema understanding
- Handle type conflicts intelligently (e.g., “123” vs 123)
- Track field statistics across all batches
Geocoding Detection:
- Identify potential address fields (string patterns)
- Detect latitude/longitude field pairs (numeric ranges)
- Mark candidates for geocoding in Stage 7
State Persistence:
- Save builder state after each batch
- Enable recovery if processing is interrupted
- Maintain continuity across batches
Schema Detection Features
Type Inference:
Automatically detects field types:
- Strings (with max length tracking)
- Numbers (integer vs decimal, min/max values)
- Dates (various formats like ISO 8601, US, European)
- Booleans (true/false, yes/no, 1/0)
Required vs Optional:
- Tracks null/missing value frequencies
- Marks fields as required if present in all rows
- Marks fields as optional if missing in some rows
Field Statistics:
- Total values seen
- Null count
- Unique value count (for enum detection)
- Min/max values (for numbers and dates)
- Type distribution (handles mixed types)
Enum Detection:
- Identifies fields with limited unique values
- Configurable threshold (count or percentage-based)
- Useful for categorical fields like status, type, category
Progressive Building
The schema builder refines its understanding with each batch:
- Batch 1: Initial type detection from the first 10,000 rows
- Batch 2: Refines types, discovers new optional fields
- Batch 3: Adjusts min/max values, enum candidates
- Batch N: Final schema represents complete data understanding
This progressive approach handles:
- Type conflicts (promotes string if necessary)
- Late-appearing fields (marks as optional)
- Evolving value ranges (expands min/max)
- Large files (processes incrementally)
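A simplified sketch of how per-field statistics could be accumulated batch by batch. This is not the actual ProgressiveSchemaBuilder; the types, the 50-value sample cap, and the date heuristic are assumptions made for illustration:

```typescript
type FieldType = "string" | "number" | "boolean" | "date" | "mixed";

interface FieldStats {
  type: FieldType;
  seen: number;         // total values observed (including nulls)
  nulls: number;        // null / missing occurrences; any nulls imply an optional field
  uniques: Set<string>; // bounded sample of unique values, for enum detection
  min?: number;
  max?: number;
}

function inferType(value: unknown): FieldType {
  if (typeof value === "number") return "number";
  if (typeof value === "boolean") return "boolean";
  if (typeof value === "string" && /^\d{4}-\d{2}-\d{2}/.test(value) && !Number.isNaN(Date.parse(value))) return "date";
  return "string";
}

// Merge one batch of rows into the accumulated per-field statistics.
function mergeBatch(stats: Map<string, FieldStats>, batch: Record<string, unknown>[]): void {
  for (const row of batch) {
    for (const [field, value] of Object.entries(row)) {
      const s = stats.get(field) ?? { type: inferType(value), seen: 0, nulls: 0, uniques: new Set<string>() };
      s.seen++;
      if (value == null || value === "") {
        s.nulls++;
      } else {
        if (inferType(value) !== s.type) s.type = "mixed"; // type conflict, typically promoted to string later
        if (typeof value === "number") {
          s.min = s.min === undefined ? value : Math.min(s.min, value);
          s.max = s.max === undefined ? value : Math.max(s.max, value);
        }
        if (s.uniques.size < 50) s.uniques.add(String(value)); // cap the sample used for enum detection
      }
      stats.set(field, s);
    }
  }
}
```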
Geocoding Field Detection
Identifies fields suitable for geocoding:
Address Fields:
- String fields containing address-like patterns
- Common names like “address”, “location”, “venue”
- Content analysis for street, city, state patterns
Coordinate Fields:
- Numeric latitude fields (range -90 to 90)
- Numeric longitude fields (range -180 to 180)
- Common names like “lat”, “latitude”, “lng”, “longitude”
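A hedged sketch of the kind of heuristic involved, combining field-name patterns with the observed numeric ranges from schema detection. The regular expressions and structures are illustrative assumptions:

```typescript
interface GeocodingCandidates {
  addressFields: string[];
  latitudeField?: string;
  longitudeField?: string;
}

const ADDRESS_NAMES = /address|location|venue|street|city/i;
const LAT_NAMES = /^(lat|latitude)$/i;
const LNG_NAMES = /^(lng|lon|long|longitude)$/i;

// Pick geocoding candidates from field names plus observed numeric ranges.
function detectGeocodingCandidates(
  fields: { name: string; type: "string" | "number"; min?: number; max?: number }[],
): GeocodingCandidates {
  const result: GeocodingCandidates = { addressFields: [] };
  for (const f of fields) {
    if (f.type === "string" && ADDRESS_NAMES.test(f.name)) {
      result.addressFields.push(f.name);
    } else if (f.type === "number" && LAT_NAMES.test(f.name) && (f.min ?? -90) >= -90 && (f.max ?? 90) <= 90) {
      result.latitudeField = f.name;
    } else if (f.type === "number" && LNG_NAMES.test(f.name) && (f.min ?? -180) >= -180 && (f.max ?? 180) <= 180) {
      result.longitudeField = f.name;
    }
  }
  return result;
}
```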
Output
Creates a complete JSON Schema stored in the import-job:
- Property definitions for all detected fields
- Type information with constraints (min/max, format, enum)
- Required field list
- Geocoding candidates (address and/or coordinate fields)
- Field statistics for each property
- Builder state for potential resumption
Next Stage
Automatically transitions to Stage 4 (Validate Schema) when complete.
Stage 4: Validate Schema
Stage: validate-schema
Job Handler: validate-schema-job
Batch Size: 10,000 rows (completes schema detection if needed)
Purpose
Compare the detected schema against the dataset’s current schema to identify breaking and non-breaking changes, then determine whether changes can be auto-approved or require manual approval.
Process Overview
Schema Finalization:
- Complete any remaining schema building from Stage 3
- Ensure schema represents all data in the file
Current Schema Retrieval:
- Get active schema version from the dataset-schemas collection
- First import has no existing schema (auto-approve path)
Schema Comparison:
- Identify all differences between detected and existing schemas
- Classify each change as breaking or non-breaking
- Document specific changes for approval review
Approval Decision:
- Check dataset configuration for auto-approval settings
- Evaluate change classification against approval rules
- Determine next stage: approval, schema version creation, or direct to geocoding
Change Classification
Breaking Changes (Always Require Approval):
- Type changes: field type modified (string → number, etc.)
- Required field removed: previously required field now missing
- Constraint narrowing: min/max values become more restrictive
- Format changes: date format or other format changes
- Enum restriction: allowed values reduced
Non-Breaking Changes (Can Auto-Approve):
- New optional fields: additional fields that allow null
- Constraint expansion: min/max values become less restrictive
- Enum expansion: additional allowed values
- Type generalization: number → string (preserves all data)
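To make the classification concrete, here is a minimal sketch of how an individual change might be labeled breaking or non-breaking. The change union type is an assumption for illustration, not the system's actual data model:

```typescript
type Change =
  | { kind: "type-change"; field: string; from: string; to: string }
  | { kind: "field-removed"; field: string; wasRequired: boolean }
  | { kind: "new-field"; field: string; required: boolean }
  | { kind: "enum-change"; field: string; added: string[]; removed: string[] };

// Classify a single detected change as breaking or non-breaking.
function isBreaking(change: Change): boolean {
  switch (change.kind) {
    case "type-change":
      return change.to !== "string"; // widening to string keeps existing values representable
    case "field-removed":
      return change.wasRequired;     // losing a required field breaks consumers
    case "new-field":
      return change.required;        // new optional fields are safe
    case "enum-change":
      return change.removed.length > 0; // removing allowed values invalidates existing data
  }
}
```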
Type Transformations
The system can automatically handle some type mismatches:
Built-in Transformation Strategies:
- Parse: Smart parsing (string “123” → number 123, string “true” → boolean true)
- Cast: Simple type conversion (number 123 → string “123”)
- Reject: Fail validation for type mismatches (strictest option)
Custom Transformations:
Datasets can define custom transformation functions for specific fields, enabling complex conversions like European number formats (“1.234,56” → 1234.56).
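A sketch of what the built-in strategies might look like, assuming simple string/number/boolean targets (the European number format from the example above would be handled by a custom transformation, not this sketch):

```typescript
type TransformStrategy = "parse" | "cast" | "reject";

// Coerce a raw value to the expected primitive type according to the strategy.
function transform(value: unknown, target: "string" | "number" | "boolean", strategy: TransformStrategy): unknown {
  if (typeof value === target) return value;
  if (strategy === "reject") throw new Error(`Type mismatch: expected ${target}, got ${typeof value}`);
  if (strategy === "cast") {
    if (target === "string") return String(value);
    if (target === "number") return Number(value);
    return Boolean(value);
  }
  // "parse": attempt a smart conversion before giving up
  if (target === "number" && typeof value === "string") {
    const n = Number(value.replace(/,/g, ""));
    if (!Number.isNaN(n)) return n;
  }
  if (target === "boolean" && typeof value === "string") {
    if (/^(true|yes|1)$/i.test(value)) return true;
    if (/^(false|no|0)$/i.test(value)) return false;
  }
  if (target === "string") return String(value);
  throw new Error(`Cannot parse ${JSON.stringify(value)} as ${target}`);
}
```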
Auto-Approval Logic
Changes can be auto-approved when ALL of the following are true:
- Dataset has autoGrow enabled
- No breaking changes detected
- All new fields are optional
- Dataset schema is not locked
- autoApproveNonBreaking is enabled
Changes require manual approval when ANY of the following are true:
- Breaking changes detected
- Dataset schema is locked
- Manual approval is required by configuration
- Security or compliance policies mandate review
Decision Tree
Schema Unchanged:
Proceed directly to Stage 7 (Geocode Batch), skipping approval and schema versioning.
Auto-Approved Changes:
Proceed to Stage 6 (Create Schema Version) to persist the new schema.
Requires Approval:
Proceed to Stage 5 (Await Approval) for human review.
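A minimal sketch of this three-way routing decision, combining the auto-approval conditions above. The flag and function names are illustrative assumptions:

```typescript
type NextStage = "await-approval" | "create-schema-version" | "geocode-batch";

interface ValidationOutcome {
  schemaChanged: boolean;
  hasBreakingChanges: boolean;
  allNewFieldsOptional: boolean;
}

interface DatasetPolicy {
  autoGrow: boolean;
  autoApproveNonBreaking: boolean;
  schemaLocked: boolean;
}

// Decide where the import-job goes after schema validation.
function nextStageAfterValidation(outcome: ValidationOutcome, policy: DatasetPolicy): NextStage {
  if (!outcome.schemaChanged) return "geocode-batch"; // nothing to approve or version
  const autoApprove =
    policy.autoGrow &&
    policy.autoApproveNonBreaking &&
    !policy.schemaLocked &&
    !outcome.hasBreakingChanges &&
    outcome.allNewFieldsOptional;
  return autoApprove ? "create-schema-version" : "await-approval";
}
```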
Output
Creates a validation result stored in the import-job:
- Complete list of breaking changes with details
- Complete list of non-breaking changes with details
- Approval decision (auto-approved, requires approval, or unchanged)
- Reason for decision
- Changes summary for admin review
Next Stage
- Unchanged: Stage 7 (Geocode Batch)
- Auto-approved: Stage 6 (Create Schema Version)
- Requires approval: Stage 5 (Await Approval)
Stage 5: Await Approval
Stage: await-approval
Job Handler: Manual process (no background job)
Purpose
Pause processing for human review of schema changes that require approval due to breaking changes or policy requirements. This stage ensures data governance and prevents unintended schema modifications.
Process Overview
Notification:
- Alert dataset administrators of pending approval
- Provide link to admin interface for review
- Include change summary in notification
Review Interface:
- Present all breaking changes with clear descriptions
- Show all non-breaking changes for context
- Provide side-by-side comparison of old vs new schema
- Display sample data demonstrating changes
Decision Capture:
- Record approval or rejection
- Capture timestamp and approving user
- Store approval notes/reason
- Update import-job with decision
Post-Decision Processing:
- If approved: Transition to Stage 6 (Create Schema Version)
- If rejected: Mark import as failed, preserve data for analysis
Approval Data Structure
The import-job stores approval information:
- Approval status (pending, approved, rejected)
- Approving user ID and timestamp
- Approval reason or notes
- Complete list of changes being approved
- Original detected schema for reference
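A hypothetical TypeScript shape for this approval data, assembled from the fields listed above (field names are assumptions, not the actual collection schema):

```typescript
interface SchemaApproval {
  status: "pending" | "approved" | "rejected";
  approvedBy?: string;      // user ID of the reviewer
  decidedAt?: string;       // ISO timestamp of the decision
  notes?: string;           // free-form approval reason
  changes: {
    breaking: string[];     // human-readable descriptions of breaking changes
    nonBreaking: string[];  // shown for context during review
  };
  detectedSchema: Record<string, unknown>; // original detected schema kept for reference
}
```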
Manual Intervention Options
Administrators can:
- Approve: Accept changes and continue processing
- Reject: Decline changes and stop processing
- Modify configuration: Adjust dataset settings and retry
- Edit transformations: Add type transformations and retry
Approval Workflow
Typical Flow:
- Import reaches await-approval stage
- Admin receives notification
- Admin reviews changes in admin interface
- Admin approves or rejects
- Approval service updates import-job
- Pipeline continues automatically
Timeout Handling:
- No automatic timeout (waits indefinitely for decision)
- Admin can set custom timeout policies
- Timed-out imports can be marked as failed manually
Next Stage
- If approved: Stage 6 (Create Schema Version)
- If rejected: Import marked as failed (terminal state)
Stage 6: Create Schema Version
Stage: create-schema-version
Job Handler: create-schema-version-job
Batch Size: N/A (single operation)
Purpose
Create a new schema version record in the dataset-schemas collection after schema approval. This stage runs in a separate transaction to avoid circular dependencies and deadlocks during the approval process.
Process Overview
Validation Check:
- Verify import-job exists and is in correct stage
- Confirm schema has been approved (either auto or manual)
- Check that schema version doesn’t already exist
Duplicate Prevention:
- Skip processing if schema version already created
- Prevents duplicate versions from concurrent operations
- Idempotent operation for retry safety
Dataset Retrieval:
- Get associated dataset record
- Verify dataset still exists
- Collect configuration for version metadata
Version Creation:
- Use SchemaVersioningService for consistent versioning
- Store complete schema with field metadata
- Record approval information (auto or manual)
- Link to import-job that created this version
Job Update:
- Link created schema version to import-job
- Update import-job metadata with version ID
- Prepare for next stage
Why a Separate Stage?
This stage exists separately from validation and approval to:
Avoid Circular Dependencies:
- Schema validation queries existing schemas
- Creating schemas during validation creates circular references
- Separate stage breaks the dependency cycle
Prevent Deadlocks:
- Approval process locks import-job record
- Schema creation locks dataset-schemas
- Sequential stages prevent concurrent lock conflicts
Enable Transaction Safety:
- Approval can fail without corrupting schema versions
- Schema creation can retry without re-running approval
- Clean rollback on any step failure
Schema Version Contents
Each version record contains:
- Complete JSON Schema for the dataset
- Field metadata and statistics from detection
- Approval information (who, when, why)
- Auto-approval flag
- Import source references
- Version number
- Timestamp
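A hypothetical sketch of such a version record as a TypeScript interface, derived from the list above (names and types are assumptions):

```typescript
interface SchemaVersion {
  dataset: string;                     // dataset ID this version belongs to
  versionNumber: number;               // monotonically increasing per dataset
  schema: Record<string, unknown>;     // complete JSON Schema
  fieldStats: Record<string, unknown>; // detection statistics per field
  approval: {
    auto: boolean;                     // true when auto-approved
    approvedBy?: string;
    notes?: string;
  };
  importJob: string;                   // import-job that produced this version
  createdAt: string;                   // ISO timestamp
}
```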
Error Handling
- Missing Job: Throws error if import-job not found (should never happen)
- No Approval: Skips processing if schema not approved (safeguard)
- Missing Dataset: Throws error if dataset not found (data integrity issue)
- Creation Failure: Marks import-job as failed, stops processing, preserves state
Next Stage
Automatically transitions to Stage 7 (Geocode Batch) when complete.
Stage 7: Geocode Batch
Stage: geocode-batch
Job Handler: geocode-batch-job
Batch Processing: Unique locations (not row-based batching)
Purpose
Enrich data with geographic coordinates by geocoding addresses or validating provided coordinates. This stage runs before event creation so location data is available immediately when events are created.
Process Overview
Candidate Evaluation:
- Check if geocoding candidates were detected in Stage 3
- Skip stage entirely if no location fields found
- Determine geocoding mode (address vs coordinates)
Unique Location Extraction:
- Read entire file to extract ALL unique location values
- Geocoding is NOT batched by rows
- Each unique address/coordinate pair processed once
- Results stored in lookup map by row number
Geocoding or Validation:
- Address mode: Send addresses to geocoding API
- Coordinate mode: Validate provided lat/lng pairs
- Hybrid mode: Use coordinates if present, geocode address otherwise
Result Storage:
- Store geocoding results by row number
- Cache results in import-job for later lookup
- Track geocoding progress and success rate
Geocoding Scenarios
Address Geocoding:
When the data contains address fields:
- Extract full address strings from rows
- Send to geocoding API (Nominatim, Google, etc.)
- Receive coordinates with confidence score
- Store formatted address from geocoding service
- Handle partial matches and low-confidence results
Coordinate Validation:
When the data contains latitude/longitude fields:
- Extract numeric coordinate pairs
- Validate ranges (lat: -90 to 90, lng: -180 to 180)
- Verify coordinates are valid numbers
- Mark as high confidence (provided data)
- Store as-is with validation flag
Hybrid Approach:
When both addresses and coordinates are present:
- Prefer provided coordinates if valid
- Fall back to address geocoding if coordinates invalid
- Cross-validate coordinates against geocoded address
- Flag discrepancies for review
Why Not Batched Like Other Stages?
Geocoding processes unique locations instead of rows because:
- Efficiency: Multiple rows often share the same location
- Cost: Geocoding APIs charge per request, not per row
- Performance: Geocoding is slow; minimizing calls is critical
- Cache Friendliness: Results apply to all rows with same location
Example:
- File has 10,000 rows
- Only 500 unique locations
- Geocodes 500 locations (not 10,000)
- Lookup map applies results to all 10,000 rows
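A sketch of this unique-location fan-out, assuming an injected geocoding client rather than any specific provider API:

```typescript
type GeocodeResult = { lat: number; lng: number; confidence: number; formattedAddress?: string };

// Geocode each unique address once, then fan results back out to every row.
async function geocodeUniqueLocations(
  rows: { rowNumber: number; address: string }[],
  geocode: (address: string) => Promise<GeocodeResult | null>, // injected API client
): Promise<Map<number, GeocodeResult>> {
  // 1. Collapse rows to unique addresses.
  const byAddress = new Map<string, number[]>();
  for (const { rowNumber, address } of rows) {
    const key = address.trim().toLowerCase();
    const list = byAddress.get(key) ?? [];
    list.push(rowNumber);
    byAddress.set(key, list);
  }

  // 2. One API call per unique address, results keyed back to row numbers.
  const byRow = new Map<number, GeocodeResult>();
  for (const [address, rowNumbers] of byAddress) {
    const result = await geocode(address);
    if (!result) continue; // failed lookups are skipped; those events get no coordinates
    for (const rowNumber of rowNumbers) byRow.set(rowNumber, result);
  }
  return byRow;
}
```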
Error Handling
Individual Geocoding Failures:
- Failed geocoding attempts are logged
- Processing continues without that location
- Events are created without coordinates (validation handles the missing location)
- Errors don’t stop batch processing
Rate Limit Errors:
- Trigger exponential backoff
- Retry failed requests after delay
- Resume from failure point
- Respect API rate limits
Malformed Addresses:
- Skip with warning in logs
- Track skipped addresses for reporting
- Continue processing remaining addresses
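For the rate-limit case, a minimal exponential backoff wrapper could look like the sketch below. A real implementation would also inspect the error type (or a Retry-After header) before retrying; this sketch retries on any failure:

```typescript
// Retry a geocoding call with exponential backoff when the provider rate-limits us.
async function withBackoff<T>(fn: () => Promise<T>, maxRetries = 5, baseDelayMs = 500): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= maxRetries) throw error;
      const delay = baseDelayMs * 2 ** attempt; // 500ms, 1s, 2s, 4s, ...
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```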
Result Storage
Geocoding results stored in import-job:
- Map of row number → geocoding result
- Each result includes coordinates, confidence, formatted address
- Source indication (geocoded vs provided vs failed)
- Progress tracking (current/total unique locations)
Next Stage
Automatically transitions to Stage 8 (Create Events) when complete.
Stage 8: Create Events
Stage: create-events
Job Handler: create-events-batch-job
Batch Size: 1,000 rows
Purpose
Create the final event records in the database with all processing results applied. This is the culmination of the entire pipeline, producing the events that users will query and explore.
Process Overview
Batch Reading:
- Read 1,000 rows from file per batch
- Process multiple batches for large files
- Resume from last processed batch on interruption
Duplicate Filtering:
- Skip rows marked as duplicates in Stage 2
- Process only unique rows
- Count duplicates skipped for reporting
ID Generation:
- Generate unique ID using dataset’s configured strategy
- External ID from specified field
- Computed hash from field combination
- Auto-increment for datasets without IDs
- Hybrid approach (external ID with fallback)
Data Enrichment:
- Apply geocoding results from Stage 7 lookup
- Extract timestamp from data or use current time
- Extract location display name if present
- Determine coordinate source (geocoded, provided, none)
Event Creation:
- Create records in the events collection
- Link to dataset and import-job
- Reference schema version from Stage 6
- Set validation status (pending initial validation)
Progress Tracking:
- Update progress counters after each batch
- Track successful creations and errors separately
- Provide real-time status for monitoring
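A sketch of one batch pass through these steps. The context object, its members, and the returned counters are illustrative assumptions, not the actual job handler:

```typescript
interface CreateEventsContext {
  duplicateRows: Set<number>;                               // from Stage 2
  geocodeByRow: Map<number, { lat: number; lng: number }>;  // from Stage 7
  generateId: (row: Record<string, unknown>) => string;     // dataset's configured ID strategy
  insertEvent: (event: Record<string, unknown>) => Promise<void>;
}

// Process one 1,000-row batch: skip duplicates, enrich, and persist events.
async function createEventsBatch(
  batch: { rowNumber: number; data: Record<string, unknown> }[],
  ctx: CreateEventsContext,
): Promise<{ created: number; skipped: number; errors: number }> {
  let created = 0, skipped = 0, errors = 0;
  for (const { rowNumber, data } of batch) {
    if (ctx.duplicateRows.has(rowNumber)) { skipped++; continue; }
    try {
      const coordinates = ctx.geocodeByRow.get(rowNumber) ?? null;
      await ctx.insertEvent({
        uniqueId: ctx.generateId(data),
        data,                                        // original row, unmodified
        location: coordinates,
        coordinateSource: coordinates ? "geocoded" : "none",
      });
      created++;
    } catch {
      errors++; // failed rows are logged; the batch keeps going
    }
  }
  return { created, skipped, errors };
}
```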
Event Data Structure
Each created event contains:
- Original row data (complete, unmodified)
- Unique ID (generated according to strategy)
- Event timestamp (extracted or current)
- Location coordinates (if geocoded/provided)
- Coordinate source information (type, confidence)
- Dataset reference
- Import-job reference
- Schema version number
- Validation status
Timestamp Extraction
The system looks for timestamp fields in priority order:
- “timestamp”
- “date”
- “datetime”
- “created_at”
- “event_date”
- “event_time”
Falls back to current time if no valid timestamp found.
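A minimal sketch of that priority-ordered lookup, using the field names listed above (the parsing details are an assumption):

```typescript
const TIMESTAMP_FIELDS = ["timestamp", "date", "datetime", "created_at", "event_date", "event_time"];

// Pick the event timestamp from the first recognizable field, else fall back to "now".
function extractTimestamp(row: Record<string, unknown>): Date {
  for (const field of TIMESTAMP_FIELDS) {
    const value = row[field];
    if (value == null) continue;
    const parsed = new Date(String(value));
    if (!Number.isNaN(parsed.getTime())) return parsed;
  }
  return new Date(); // no valid timestamp found
}
```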
Error Handling
Individual Row Failures:
- Failed events logged in errors array
- Successful events update progress counter
- Batch continues processing remaining rows
- Final statistics include error count
Database Transaction Failures:
- Entire batch retries on transaction failure
- Preserves batch atomicity
- Prevents partial batch commits
- Exponential backoff on retries
Constraint Violations:
- Unique constraint violations logged
- Row skipped, processing continues
- Indicates duplicate detection may need tuning
Completion Processing
When all batches complete, the system:
Updates Import Job:
- Sets stage to “completed”
- Records final statistics
- Calculates processing duration
- Marks completion timestamp
Final Statistics:
- Total events created successfully
- Total duplicates skipped (internal + external)
- Total events geocoded (with location data)
- Total errors encountered
- Processing time per stage
Import File Update:
- Checks if ALL import-jobs completed
- Updates import-file status if all jobs done
- Maintains “processing” if any jobs remain
Notification:
- Alerts administrators of completion
- Provides summary statistics
- Links to created events for review
Next Stage
None - this is the final stage. Import transitions to “completed” status.
Stage Transition State Machine
The pipeline enforces strict stage transitions:
Valid Transitions:
- analyze-duplicates → detect-schema
- detect-schema → validate-schema
- validate-schema → await-approval (requires approval)
- validate-schema → create-schema-version (auto-approved)
- validate-schema → geocode-batch (schema unchanged)
- await-approval → create-schema-version (approved)
- await-approval → failed (rejected)
- create-schema-version → geocode-batch
- geocode-batch → create-events
- create-events → completed
Error Transitions:
- ANY stage → failed (error handling)
Invalid Transitions:
All other transitions are rejected by the StageTransitionService to maintain state machine integrity.
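A sketch of the kind of transition table such a guard might enforce; this is an illustration of the state machine above, not the StageTransitionService itself:

```typescript
type Stage =
  | "analyze-duplicates" | "detect-schema" | "validate-schema" | "await-approval"
  | "create-schema-version" | "geocode-batch" | "create-events" | "completed" | "failed";

// Allowed forward transitions; "failed" is reachable from any stage.
const VALID_TRANSITIONS: Record<Stage, Stage[]> = {
  "analyze-duplicates": ["detect-schema"],
  "detect-schema": ["validate-schema"],
  "validate-schema": ["await-approval", "create-schema-version", "geocode-batch"],
  "await-approval": ["create-schema-version", "failed"],
  "create-schema-version": ["geocode-batch"],
  "geocode-batch": ["create-events"],
  "create-events": ["completed"],
  completed: [],
  failed: [],
};

function canTransition(from: Stage, to: Stage): boolean {
  return to === "failed" || VALID_TRANSITIONS[from].includes(to);
}
```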
Summary
These eight stages work together to transform uploaded files into structured, validated, and enriched events:
- Dataset Detection creates the foundation by parsing files and creating jobs
- Analyze Duplicates reduces processing load and prevents duplicate events
- Detect Schema understands the data structure and types
- Validate Schema ensures schema changes are intentional and safe
- Await Approval provides governance for breaking changes
- Create Schema Version persists approved schema for audit and rollback
- Geocode Batch enriches data with geographic coordinates
- Create Events produces the final, queryable event records
Each stage builds on the previous stages’ outputs, creating a robust and maintainable pipeline for data processing.