Queues & Jobs

Background job processing with BullMQ.

Nuda Kit uses BullMQ via the @rlanz/bull-queue package for background job processing. Jobs are processed asynchronously using Redis as the message broker.

Configuration

Queue configuration is in config/queue.ts:

import env from '#start/env'
import { defineConfig } from '@rlanz/bull-queue'

export default defineConfig({
  defaultConnection: {
    host: env.get('QUEUE_REDIS_HOST'),
    port: env.get('QUEUE_REDIS_PORT'),
    password: env.get('QUEUE_REDIS_PASSWORD'),
  },

  jobs: {
    attempts: 3,           // Up to 3 attempts in total (first run plus 2 retries)
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 100,     // Keep last 100 failed jobs
  },
})

Environment variables:

QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_PASSWORD=
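
Environment variables always arrive as strings, so the port has to become a number and an empty `QUEUE_REDIS_PASSWORD=` line is best read as "no password". The following is a minimal sketch of that normalization. `readQueueRedisEnv` and `QueueRedisConnection` are hypothetical names, not part of Nuda Kit or `@rlanz/bull-queue`; an AdonisJS project would typically validate these values in its own env setup instead.

```typescript
// Hypothetical helper (not part of Nuda Kit): normalizes the three
// QUEUE_REDIS_* variables into a typed connection object.
interface QueueRedisConnection {
  host: string
  port: number
  password: string | undefined
}

function readQueueRedisEnv(env: Record<string, string | undefined>): QueueRedisConnection {
  const host = env['QUEUE_REDIS_HOST'] ?? 'localhost'
  const port = Number(env['QUEUE_REDIS_PORT'] ?? '6379')

  // Reject values such as "abc", "0", or "70000" early instead of at connect time.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid QUEUE_REDIS_PORT: ${env['QUEUE_REDIS_PORT']}`)
  }

  // An empty string means that no password is configured.
  const password = env['QUEUE_REDIS_PASSWORD'] || undefined

  return { host, port, password }
}

const connection = readQueueRedisEnv({
  QUEUE_REDIS_HOST: 'localhost',
  QUEUE_REDIS_PORT: '6379',
  QUEUE_REDIS_PASSWORD: '',
})
console.log(connection)
```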

Running Queue Workers

To process jobs, you need to start queue workers. Each queue requires its own worker process.

Start Individual Workers

# Start emails queue worker
node ace queue:listen --queue=emails

# Start user deletions queue worker
node ace queue:listen --queue=user_deletions

Run All Queues (Development)

For development, run multiple terminals or use a process manager:

# Terminal 1: Email jobs
node ace queue:listen --queue=emails

# Terminal 2: User deletion jobs
node ace queue:listen --queue=user_deletions

Note: Queue workers must be running for jobs to be processed. In production, use a process manager like PM2 or supervisor to keep workers alive.

Existing Queues

Nuda Kit comes with two pre-configured queues:

| Queue | Purpose | Jobs |
| --- | --- | --- |
| emails | Email delivery | Verification, password reset, magic links, invitations |
| user_deletions | Account cleanup | Delete user data, teams, and files |

Existing Jobs

Jobs are located in app/jobs/:

| Job | Queue | Description |
| --- | --- | --- |
| SendVerificationEmailJob | emails | Sends email verification link |
| SendResetPasswordEmailJob | emails | Sends password reset email |
| SendMagicLinkEmailJob | emails | Sends magic link for passwordless login |
| SendInvitationEmailJob | emails | Sends team invitation email |
| DeleteUserJob | user_deletions | Deletes user, owned teams, and uploaded files |

Job Structure

Jobs extend the Job class and implement a handle method:

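import { Job } from '@rlanz/bull-queue'
import mail from '@adonisjs/mail/services/main'
import User from '#models/user'
// Assumed path, following the '#mails/...' alias used elsewhere on this page
import VerifyEmailNotification from '#mails/verify_email_notification'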

export default class SendVerificationEmailJob extends Job {
  // Required: Path to the job file
  static get $$filepath() {
    return import.meta.url
  }

  // Main job logic
  async handle(payload: { userId: number }) {
    const user = await User.findOrFail(payload.userId)
    await mail.send(new VerifyEmailNotification(user))
  }

  // Called when all retries are exhausted
  async rescue(payload: { userId: number }, error: Error) {
    // Log failure, notify admin, etc.
  }
}
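
The job above receives a `userId` and reloads the user inside `handle` instead of receiving the model itself. One reason for this pattern is that BullMQ stores job data as JSON, so anything that does not survive serialization is lost before the worker sees it. The sketch below approximates that with a plain JSON round trip; `roundTrip` and `UserLike` are hypothetical stand-ins, not Nuda Kit code.

```typescript
// Approximates what a worker receives after a payload has been serialized
// for storage and parsed again.
function roundTrip<T>(payload: T): unknown {
  return JSON.parse(JSON.stringify(payload))
}

// Hypothetical stand-in for a model instance.
class UserLike {
  id: number
  createdAt: Date

  constructor(id: number, createdAt: Date) {
    this.id = id
    this.createdAt = createdAt
  }
}

const user = new UserLike(42, new Date('2024-01-01T00:00:00Z'))

// Passing the whole object: the class identity and the Date type are lost.
const received = roundTrip({ user }) as { user: { id: number; createdAt: unknown } }
console.log(received.user instanceof UserLike) // false
console.log(typeof received.user.createdAt) // "string"

// Passing only the id: nothing is lost, and the job loads fresh data.
console.log(roundTrip({ userId: user.id })) // { userId: 42 }
```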

Dispatching Jobs

Use Queue.dispatch() to add jobs to a queue:

import Queue from '@rlanz/bull-queue/services/main'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'

// Dispatch to a specific queue
await Queue.dispatch(
  SendVerificationEmailJob,
  { userId: user.id },
  { queueName: 'emails' }
)

Dispatch Options

await Queue.dispatch(
  MyJob,
  payload,
  {
    queueName: 'my_queue',  // Queue name
    delay: 5000,            // Delay in ms before processing
    attempts: 5,            // Override default retry attempts
  }
)
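
Since `delay` is a duration in milliseconds rather than a timestamp, "run this at a given time" has to be converted when the job is dispatched. A minimal sketch, assuming a hypothetical `delayUntil` helper that is not provided by the queue package:

```typescript
// Hypothetical helper: converts a target time into a `delay` value in ms.
// A time in the past yields 0, meaning "process as soon as possible".
function delayUntil(runAt: Date, now: Date = new Date()): number {
  return Math.max(0, runAt.getTime() - now.getTime())
}

const now = new Date('2024-01-01T12:00:00Z')
const runAt = new Date('2024-01-01T12:00:05Z')

// Options object in the shape shown above; the queue name is illustrative.
const options = {
  queueName: 'emails',
  delay: delayUntil(runAt, now),
}
console.log(options.delay) // 5000
```

Passing `now` explicitly keeps the helper deterministic in tests; in application code the default argument is usually enough.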

Creating New Jobs

1. Create the Job File

Create a new file in app/jobs/:

// app/jobs/send_welcome_email_job.ts

import { Job } from '@rlanz/bull-queue'
import mail from '@adonisjs/mail/services/main'
import User from '#models/user'
import WelcomeEmail from '#mails/welcome_email'

interface SendWelcomeEmailPayload {
  userId: number
}

export default class SendWelcomeEmailJob extends Job {
  static get $$filepath() {
    return import.meta.url
  }

  async handle(payload: SendWelcomeEmailPayload) {
    const user = await User.findOrFail(payload.userId)
    await mail.send(new WelcomeEmail(user))
  }

  async rescue(payload: SendWelcomeEmailPayload, error: Error) {
    console.error(`Failed to send welcome email to user ${payload.userId}:`, error)
  }
}

2. Dispatch the Job

import Queue from '@rlanz/bull-queue/services/main'
import SendWelcomeEmailJob from '#jobs/send_welcome_email_job'

await Queue.dispatch(
  SendWelcomeEmailJob,
  { userId: user.id },
  { queueName: 'emails' }
)

Creating New Queues

You can create as many queues as needed. Simply use a new queue name when dispatching:

// Dispatch to a new "notifications" queue
await Queue.dispatch(
  SendPushNotificationJob,
  { userId: user.id, message: 'Hello!' },
  { queueName: 'notifications' }
)

Then start a worker for the new queue:

node ace queue:listen --queue=notifications

Queue Organization Tips

| Queue | Use Case |
| --- | --- |
| emails | Email delivery (SMTP can be slow) |
| notifications | Push notifications, SMS |
| exports | Large file exports, reports |
| imports | Data imports, CSV processing |
| cleanup | Scheduled maintenance tasks |
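
Because a queue comes into existence simply by being named, a misspelled name would most likely send jobs to a queue that no worker listens on. One way to guard against that is to keep the names in a single typed list. The sketch below is an assumption about how this could be organized; `QUEUES`, `QueueName`, `listenCommand`, and `isQueueName` are hypothetical and not part of Nuda Kit.

```typescript
// Hypothetical registry: one place that lists every queue name, so dispatch
// calls and worker commands cannot drift apart.
const QUEUES = ['emails', 'user_deletions', 'notifications'] as const

type QueueName = (typeof QUEUES)[number]

// Builds the worker command used on this page for a given queue.
function listenCommand(queue: QueueName): string {
  return `node ace queue:listen --queue=${queue}`
}

// Narrows an arbitrary string (for example a CLI argument) to a known queue.
function isQueueName(value: string): value is QueueName {
  return (QUEUES as readonly string[]).includes(value)
}

for (const queue of QUEUES) {
  console.log(listenCommand(queue))
}
```

With `QueueName` as the type of `queueName`, a typo such as `'email'` fails at compile time instead of silently creating a new queue.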

Event-Driven Jobs

Jobs are often dispatched from event listeners. This keeps controllers clean:

// app/listeners/send_verification_email.ts

import { inject } from '@adonisjs/core'
import UserCreated from '#events/user_created'
import Queue from '@rlanz/bull-queue/services/main'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'

@inject()
export default class SendVerificationEmail {
  async handle(event: UserCreated) {
    await Queue.dispatch(
      SendVerificationEmailJob,
      { userId: event.user.id },
      { queueName: 'emails' }
    )
  }
}

Job Lifecycle

┌─────────────────────────────────────────────────────────┐
│                     Job Dispatched                       │
└─────────────────────────┬───────────────────────────────┘
                          ▼
┌─────────────────────────────────────────────────────────┐
│                    Added to Queue                        │
│                 (stored in Redis)                        │
└─────────────────────────┬───────────────────────────────┘
                          ▼
┌─────────────────────────────────────────────────────────┐
│                  Worker Picks Up Job                     │
│            (queue:listen must be running)                │
└─────────────────────────┬───────────────────────────────┘
                          ▼
                    ┌─────────────┐
                    │   handle()  │
                    │   called    │
                    └──────┬──────┘
                           │
              ┌────────────┴────────────┐
              ▼                         ▼
        ┌──────────┐              ┌──────────┐
        │ Success  │              │  Failed  │
        └────┬─────┘              └────┬─────┘
             │                         │
             ▼                    ┌────┴────┐
        ┌─────────┐               ▼         ▼
        │Completed│          ┌────────┐ ┌────────┐
        └─────────┘          │ Retry  │ │rescue()│
                             │(≤3x)   │ │called  │
                             └────────┘ └────────┘
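
The flow in the diagram can be sketched as a small loop. This is a simplified model under stated assumptions, not how BullMQ or `@rlanz/bull-queue` is implemented: `JobLike` and `runJob` are hypothetical names, there is no Redis and no backoff between tries, and `attempts` is treated as the total number of tries (the first run included), which is how BullMQ documents the option.

```typescript
// Simplified model of the job lifecycle: control flow only.
interface JobLike<P> {
  handle(payload: P): Promise<void>
  rescue?(payload: P, error: Error): Promise<void>
}

interface Outcome {
  status: 'completed' | 'failed'
  attemptsMade: number
}

async function runJob<P>(job: JobLike<P>, payload: P, attempts = 3): Promise<Outcome> {
  let lastError = new Error('job never ran')

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      await job.handle(payload)
      return { status: 'completed', attemptsMade: attempt }
    } catch (error) {
      // Remember the failure and fall through to the next attempt.
      lastError = error instanceof Error ? error : new Error(String(error))
    }
  }

  // All attempts are used up: give the job a chance to log or alert.
  await job.rescue?.(payload, lastError)
  return { status: 'failed', attemptsMade: attempts }
}

// A job that fails twice and succeeds on the third attempt.
let calls = 0
const flakyJob: JobLike<{ userId: number }> = {
  async handle() {
    calls++
    if (calls < 3) throw new Error('SMTP timeout')
  },
}

runJob(flakyJob, { userId: 1 }).then((outcome) => console.log(outcome))
```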

Testing Jobs

Use the queue fake helper to assert that jobs were dispatched without actually processing them:

import { test } from '@japa/runner'
import { QueueFake } from '#tests/helpers/queue_fake'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'

test.group('User Registration', (group) => {
  let queueFake: QueueFake

  group.each.setup(() => {
    queueFake = new QueueFake()
    queueFake.setup()
  })

  group.each.teardown(() => {
    queueFake.restore()
  })

  test('dispatches verification email job', async ({ assert }) => {
    // ... create user ...

    const jobs = queueFake.getDispatchedJobs()
    
    assert.lengthOf(jobs, 1)
    assert.equal(jobs[0].jobClass, SendVerificationEmailJob)
    assert.equal(jobs[0].options?.queueName, 'emails')
  })
})
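
The assertions above only need a record of what was dispatched. The following is a minimal sketch of how a recording fake of this kind could work. It illustrates the general technique and is not Nuda Kit's actual `QueueFake`: `RecordingQueueFake` and `QueueLike` are hypothetical, and unlike the helper above this version takes the object to patch as a constructor argument. The `jobClass`, `payload`, and `options` field names mirror the test.

```typescript
interface DispatchOptions {
  queueName?: string
  delay?: number
  attempts?: number
}

// Hypothetical stand-in for the queue service: anything with a dispatch method.
interface QueueLike {
  dispatch(jobClass: unknown, payload: unknown, options?: DispatchOptions): Promise<unknown>
}

interface DispatchedJob {
  jobClass: unknown
  payload: unknown
  options: DispatchOptions | undefined
}

// Swaps `dispatch` for a function that only records its arguments and puts
// the original back on restore().
class RecordingQueueFake {
  private target: QueueLike
  private original: QueueLike['dispatch'] | undefined
  private jobs: DispatchedJob[] = []

  constructor(target: QueueLike) {
    this.target = target
  }

  setup(): void {
    this.original = this.target.dispatch
    this.target.dispatch = async (jobClass, payload, options) => {
      this.jobs.push({ jobClass, payload, options })
      return undefined
    }
  }

  restore(): void {
    if (this.original) this.target.dispatch = this.original
    this.original = undefined
    this.jobs = []
  }

  getDispatchedJobs(): readonly DispatchedJob[] {
    return this.jobs
  }
}

// Usage with placeholder objects instead of the real queue and job class.
class PlaceholderJob {}

const queue: QueueLike = {
  async dispatch() {
    throw new Error('the real queue would talk to Redis here')
  },
}

async function main(): Promise<void> {
  const fake = new RecordingQueueFake(queue)
  fake.setup()
  await queue.dispatch(PlaceholderJob, { userId: 1 }, { queueName: 'emails' })
  console.log(fake.getDispatchedJobs().length) // 1
  fake.restore()
}

main()
```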

Production Setup

PM2 Configuration

// ecosystem.config.js
module.exports = {
  apps: [
    {
      name: 'api',
      script: 'build/bin/server.js',
    },
    {
      name: 'queue-emails',
      script: 'node',
      args: 'ace queue:listen --queue=emails',
      cwd: './build',
    },
    {
      name: 'queue-deletions',
      script: 'node',
      args: 'ace queue:listen --queue=user_deletions',
      cwd: './build',
    },
  ],
}
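
As the number of queues grows, the worker entries become repetitive. They could be generated from a list of queue names, as in the sketch below. `workerApps` and the `Pm2App` type are hypothetical and only cover the fields used in the configuration above. Note that the generated names follow the pattern `queue-<queue name>`, so the second worker would be called `queue-user_deletions` rather than `queue-deletions`, and that the helper would need to be compiled or ported to plain JavaScript before use in `ecosystem.config.js`.

```typescript
// Only the PM2 app fields that appear in the configuration above.
interface Pm2App {
  name: string
  script: string
  args?: string
  cwd?: string
}

// Hypothetical helper: builds one worker entry per queue name.
function workerApps(queues: string[], cwd = './build'): Pm2App[] {
  return queues.map((queue) => ({
    name: `queue-${queue}`,
    script: 'node',
    args: `ace queue:listen --queue=${queue}`,
    cwd,
  }))
}

const apps: Pm2App[] = [
  { name: 'api', script: 'build/bin/server.js' },
  ...workerApps(['emails', 'user_deletions']),
]

console.log(apps.map((app) => app.name))
```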

Scaling Workers

You can run multiple workers per queue for higher throughput. Workers on the same queue compete for jobs, so each job is still picked up by only one of them:

# Run 3 email workers
pm2 start queue-emails -i 3
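
Why extra workers raise throughput without duplicating work can be pictured with a toy model: several workers pull from one shared list, and each job is taken by exactly one of them. This is only an in-memory illustration under that assumption; real workers coordinate through Redis, and `drain` is a hypothetical name.

```typescript
interface HandledJob {
  job: string
  worker: number
}

// Toy model of competing consumers on a single queue.
async function drain(
  jobs: string[],
  workerCount: number,
  work: (job: string) => Promise<void>
): Promise<HandledJob[]> {
  const pending = [...jobs]
  const handled: HandledJob[] = []

  async function worker(index: number): Promise<void> {
    // shift() is synchronous, so two workers can never take the same job.
    for (let job = pending.shift(); job !== undefined; job = pending.shift()) {
      await work(job)
      handled.push({ job, worker: index })
    }
  }

  // Start all workers at once and wait until the list is empty.
  await Promise.all(Array.from({ length: workerCount }, (_, index) => worker(index)))
  return handled
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

drain(['a', 'b', 'c', 'd', 'e', 'f'], 3, () => sleep(10)).then((handled) => {
  console.log(handled.length) // 6
})
```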

Resources