Queue Processing
Tapix processes imports through a three-stage queue pipeline. Each stage runs as an independent job, allowing imports to be processed in the background without blocking the user interface.
Import Pipeline
The import pipeline consists of three sequential jobs:
1. ValidateColumnJob
Validates each column's values against the rules defined in your importer. For each row:
- Runs validation rules on the mapped column values.
- Stores validation errors on the `ImportRow` record.
- Writes relationship matches for any entity link (relationship) fields.
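To make the outcome of this stage concrete, here is an illustrative shape of an `ImportRow` after validation. The field names (`data`, `errors`) are assumptions for illustration, not the exact column names Tapix uses:

```php
// Hypothetical ImportRow state after ValidateColumnJob has run.
// Field names are illustrative only.
[
    'data'   => ['email' => 'not-an-email', 'name' => 'Ada'],
    'errors' => ['email' => ['The email field must be a valid email address.']],
]
```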
2. ResolveMatchesJob
Determines whether each row should create a new record or update an existing one:
- Compares row data against existing database records using the importer's match columns.
- Sets the `matched_id` on each `ImportRow` that corresponds to an existing record.
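Conceptually, the match resolution works like the following simplified sketch. This is not the actual Tapix implementation; the model, the `email` match column, and the `$row` variable are assumptions for illustration:

```php
// Simplified sketch of match resolution -- not the actual Tapix code.
// Look up an existing record using the importer's match column(s).
$existing = User::query()
    ->where('email', $row->data['email'])
    ->first();

// A non-null matched_id means the row will update the existing record;
// null means ExecuteImportJob will create a new one.
$row->update(['matched_id' => $existing?->getKey()]);
```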
3. ExecuteImportJob
Processes all rows and persists the data to your database. This is the final and most substantial stage of the pipeline.
Key characteristics:
- Unique execution -- Implements `ShouldBeUnique` to prevent the same import from being processed concurrently.
- Chunk-based processing -- Rows are processed in configurable chunks (default: 500) to manage memory usage.
- Transaction safety -- Each chunk of row saves is wrapped in a database transaction.
- Progress tracking -- Dispatches an
ImportRowProcessedevent after each chunk, enabling real-time progress updates. - Failed row logging -- Rows that fail during processing are logged to the
tapix_failed_import_rowstable with error messages. - Relationship deduplication -- Deduplicates relationship records within the same import to avoid creating duplicate related models.
- Custom field batching -- Custom field data is collected per chunk and flushed via the configured adapter.
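The characteristics above can be sketched as a chunked, transactional loop. This is a simplified illustration of the pattern, not the actual `ExecuteImportJob` source; the `import_id` column and event payload are assumptions:

```php
// Simplified sketch of ExecuteImportJob's chunk loop -- illustrative only.
ImportRow::where('import_id', $import->id)
    ->chunkById(config('tapix.chunk_size', 500), function ($rows) use ($import) {
        // Each chunk's saves are wrapped in a single transaction.
        DB::transaction(function () use ($rows) {
            foreach ($rows as $row) {
                // Create or update based on matched_id; failed rows are
                // logged to tapix_failed_import_rows instead of aborting.
            }
        });

        // Fired after each chunk for real-time progress updates.
        event(new ImportRowProcessed(/* progress payload */));
    });
```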
Configuration
All queue-related settings are available in the Tapix config file:
```php
'queue' => env('TAPIX_QUEUE', 'default'),
'job_timeout' => 300,
'job_tries' => 3,
'job_backoff' => [10, 30],
'chunk_size' => 500,
```
| Option | Default | Description |
|---|---|---|
| `queue` | `default` | The queue name used for import jobs. |
| `job_timeout` | `300` | Maximum execution time in seconds for each job. |
| `job_tries` | `3` | Number of retry attempts before a job is marked as failed. |
| `job_backoff` | `[10, 30]` | Delay in seconds between retry attempts (progressive backoff). |
| `chunk_size` | `500` | Number of rows processed per chunk in `ExecuteImportJob`. |
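To route import jobs to a dedicated queue, you can override the default via the environment and run a worker for that queue. The queue name `imports` below is just an example; the `queue:work` options mirror the config values above:

```shell
# .env
TAPIX_QUEUE=imports

# Run a worker for that queue; --timeout should match job_timeout.
php artisan queue:work --queue=imports --timeout=300 --tries=3
```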
Queue Timeout Configuration
The `retry_after` value in your queue connection configuration must exceed the `job_timeout` value. If `retry_after` is too short, the queue worker may re-dispatch a job that is still running, leading to duplicate row processing. Recommended setting: `retry_after` >= `job_timeout` + 30 seconds.
For example, with the default `job_timeout` of 300 seconds, set `retry_after` to at least 330 in your `config/queue.php`:
```php
'connections' => [
    'redis' => [
        'driver' => 'redis',
        'retry_after' => 330,
        // ...
    ],
],
```
Failed Rows
Rows that fail during `ExecuteImportJob` are stored in the `tapix_failed_import_rows` table. Each failed row record includes the original row data and the error message. Failed row reports can be downloaded for review.
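If you need to inspect failures programmatically rather than downloading a report, you can query the table directly. The `import_id`, `data`, and `error` column names below are assumptions for illustration; check your migration for the exact schema:

```php
// Inspect failed rows for a given import -- column names are assumed.
$failed = DB::table('tapix_failed_import_rows')
    ->where('import_id', $importId)
    ->get(['data', 'error']);
```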
Tenant Awareness
When multi-tenancy is enabled, import jobs use the `TenantAware` trait to preserve tenant context across queue dispatches. This ensures that all database operations within the job are scoped to the correct tenant, even when processed by a background worker.