Backend — jobs and processes
Long-running work (AI generation, CSV import, approved-batch apply) runs through Laravel’s database-backed queue system. Every queued unit of work creates a Process record that the frontend can poll or stream for live progress updates.
Queue driver and configuration
The default connection is database, controlled by QUEUE_CONNECTION in the environment (config/queue.php:16). All job rows land in the jobs table (DB_QUEUE_TABLE; default jobs). The named queue for AI work is aigen — jobs are dispatched with .onQueue($newProcess->queue) where $newProcess->queue comes from the originating Process record’s queue column.
Key database-connection settings (config/queue.php:38-45):
| Setting | Default | Env override |
|---|---|---|
retry_after | 90 s | DB_QUEUE_RETRY_AFTER |
after_commit | false | — |
| Failed jobs table | failed_jobs | — |
| Driver | database-uuids | QUEUE_FAILED_DRIVER |
⚠️ Warning:
retry_after(90 s) is shorter than each job’s$timeout(180 s). This means the queue worker could re-claim a job before the first attempt times out. All five jobs set$tries = 2, so a second attempt is permitted; however, in practice the 180 s timeout triggers the Laravel worker’sfailed()hook rather than a silent reattempt. Keep this asymmetry in mind if you tune either value.
Job roster
All five job classes live in app/Jobs/ and implement ShouldQueue. Every class declares $timeout = 180 and $tries = 2 (GenerateAiLocationImportJob.php:20-21; same pattern in all five).
| Class | Process type string | Dispatched by |
|---|---|---|
GenerateAiLocationImportJob | ai_location_import_generate | AiLocationImportController::store |
ApplyAiLocationImportJob | ai_location_import_apply | AiLocationImportController::import |
GenerateAiLocationImprovementJob | ai_location_improvement_generate | AiLocationImprovementController::store |
ApplyAiLocationImprovementJob | ai_location_improvement_apply | AiLocationImprovementController::apply |
ImportCsvLocationsJob | csv_location_import | LocationController::import |
Each job constructor accepts a single int $processId and resolves the Process model at the start of handle(). This keeps the serialized job payload small and avoids stale Eloquent model data.
Shared execution pattern
Every job follows the same five-step skeleton:
- Load
$process = Process::query()->findOrFail($this->processId). - Early-exit via
$processService->shouldStop($process)— if the process was cancelled before the worker picked it up, the job exits silently without marking it failed. - Call
$processService->start(...)— advances status torunning, recordsstarted_at, incrementsattempts. - Invoke the delegated controller method with a progress callback. The callback calls
$processService->progress(...)at each meaningful step. - Terminal: call
$processService->waitingReview(...)(AI generate jobs) or$processService->complete(...)(apply + CSV jobs).
The failed(\Throwable $exception) hook on all five jobs calls $processService->fail(...), which records the exception message in error_message and stamps finished_at (GenerateAiLocationImportJob.php:57-71).
Job-specific terminal states
GenerateAiLocationImportJobandGenerateAiLocationImprovementJob— both end inwaiting_review(progress clamped to ≥ 90 %). The draft batch is created and the user must approve or reject items before an apply job is dispatched.ApplyAiLocationImportJob,ApplyAiLocationImprovementJob, andImportCsvLocationsJob— all end incompletedwithprogress_percent = 100. The summary patch includes counts forimported/applied,failed,skipped,tags_created, and optionallyremainingandcategories_created.
The Process model
app/Models/Process.php is a plain Eloquent model — no custom scopes or boot logic. Key columns:
| Column | Type | Purpose |
|---|---|---|
workspace_id | int | Scopes the process to a workspace |
user_id | int | Owner — used for access control in ProcessController |
type | string | Process type string (see job roster table above) |
status | string | State machine value — see below |
progress_percent | int | 0–100 |
current_step | string | Human-readable step identifier |
summary | JSON array | Accumulated key/value pairs patched at each progress call |
error_message | string | Populated only on failed |
queue | string | Named queue the job runs on (stored for retry dispatch) |
attempts | int | Incremented by ProcessService::start on each attempt |
related_type / related_id | string / int | Optional polymorphic link to a batch or other resource |
input_payload | JSON array | Serialised input — used by ProcessController::retry to re-dispatch |
started_at / finished_at | datetime | Set by ProcessService |
summary and input_payload are cast to array (Process.php:27-32). summary is a shallow key/value bag that accumulates across progress() calls via ProcessService::mergeSummary (null values are stripped on each merge).
Process has one relationship: events() → hasMany(ProcessEvent::class)->orderBy('id') (Process.php:44-46).
The ProcessEvent model
app/Models/ProcessEvent.php records every discrete state transition as an append-only event log. Each call to any ProcessService mutation method appends one ProcessEvent row.
| Column | Type | Purpose |
|---|---|---|
process_id | int | Parent process |
type | string | queued, message, failed, cancelled, or custom |
status | string | Snapshot of Process.status at the time of the event |
progress_percent | int | Snapshot of progress at event time |
step | string | Step identifier |
message | string | Human-readable description |
payload | JSON array | Snapshot of Process.summary at event time |
payload is cast to array (ProcessEvent.php:20). Events are immutable once created.
ProcessService state machine
app/Services/ProcessService.php owns all state transitions via six public methods. The six valid statuses are defined as class constants (ProcessService.php:11-16):
queued → running → completed
→ waiting_review → [manual user action] → [new process]
→ failed
queued → cancelled
running → cancelled
waiting_review → cancelled| Method | Target status | Sets finished_at? |
|---|---|---|
create() | queued | No |
start() | running | No — sets started_at |
progress() | running | No |
waitingReview() | waiting_review | No |
complete() | completed | Yes |
fail() | failed | Yes |
cancel() | cancelled | Yes |
shouldStop() re-fetches the process from the database ($process->fresh()) and returns true if status === 'cancelled'. Jobs call this at the top of handle() to detect mid-dispatch cancellation.
Every state-changing method calls the private update() helper, which saves the model and appends one ProcessEvent (ProcessService.php:126-141). Progress values are clamped to 0–100. summary is always the merged result of existing summary plus the $summaryPatch argument; null values are stripped.
Frontend integration
Polling
GET /api/processes/{id} — returns the current Process as a ProcessResource JSON object. Clients may poll this endpoint at any interval.
GET /api/processes/{id}/events?after_id={n} — returns all ProcessEvent rows with id > after_id, ordered ascending. Use after_id = 0 (or omit) for the full event log (ProcessController.php:51-61).
Server-sent events (SSE)
GET /api/processes/{id}/stream — returns a StreamedResponse with Content-Type: text/event-stream. The stream implementation (ProcessController.php:63-104):
- Polls new
ProcessEventrows in awhileloop, sleeping 1 000 ms between iterations. - Each event is emitted as
event: process-event\ndata: {JSON}\n\n. - The loop terminates after 25 seconds or when the process enters any terminal/hold state (
completed,failed,cancelled,waiting_review). - Response headers disable all buffering:
Cache-Control: no-cache, no-transform,X-Accel-Buffering: no.
💡 Tip: The 25-second SSE window is shorter than the 180-second job timeout. For long jobs the frontend must reconnect (using the last received
after_idto avoid replaying events). TheuseProcessMonitorhook in the Next.js codebase handles reconnection automatically.
Retry and cancel
POST /api/processes/{id}/retry — only allowed when status === 'failed'. Creates a new Process record (copying workspace_id, type, queue, related_type, related_id, input_payload, and summary), then dispatches the correct job class based on $process->type via a match expression (ProcessController.php:126-133). Returns the new ProcessResource.
POST /api/processes/{id}/cancel — allowed when status is queued, running, or waiting_review. Calls ProcessService::cancel() directly and returns the updated ProcessResource. Running jobs detect cancellation via shouldStop() on their next iteration.
Route listing (for reference)
Defined in routes/api.php:86-91, all under the auth:api + auth_session + audit middleware group:
GET /api/processes
GET /api/processes/{id}
GET /api/processes/{id}/events
GET /api/processes/{id}/stream
POST /api/processes/{id}/retry
POST /api/processes/{id}/cancelProcessController::index returns the 30 most-recent processes for the authenticated user, sorted so active statuses (queued, running, waiting_review) float to the top (ProcessController.php:34-43).
All queries scope to user_id = $request->user()->id and optionally filter by workspace_id if the workspace_id query parameter is present.