Nuda Kit uses BullMQ via the @rlanz/bull-queue package for background job processing. Jobs are processed asynchronously using Redis as the message broker.
Queue configuration is in config/queue.ts:
import env from '#start/env'
import { defineConfig } from '@rlanz/bull-queue'

export default defineConfig({
  defaultConnection: {
    host: env.get('QUEUE_REDIS_HOST'),
    port: env.get('QUEUE_REDIS_PORT'),
    password: env.get('QUEUE_REDIS_PASSWORD'),
  },
  jobs: {
    attempts: 3, // Try each job up to 3 times in total (first run included)
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 100, // Keep last 100 failed jobs
  },
})
Environment variables:
QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_PASSWORD=
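Environment variables always arrive as strings, so the port has to become a number and an empty `QUEUE_REDIS_PASSWORD=` line is best treated as "no password". The standalone sketch below illustrates that mapping. `readQueueRedisEnv` and `QueueRedisConnection` are hypothetical names used only for illustration; in an AdonisJS app this validation would normally sit with the project's other environment rules rather than in a helper like this.

```typescript
interface QueueRedisConnection {
  host: string
  port: number
  password?: string
}

// Hypothetical helper: turns raw environment strings into a typed connection object.
function readQueueRedisEnv(env: Record<string, string | undefined>): QueueRedisConnection {
  const host = env['QUEUE_REDIS_HOST']
  if (!host) {
    throw new Error('QUEUE_REDIS_HOST is required')
  }

  // Number('') is 0 and Number(undefined) is NaN, so both are rejected below.
  const rawPort = env['QUEUE_REDIS_PORT']
  const port = Number(rawPort)
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`QUEUE_REDIS_PORT must be a valid port, got "${rawPort}"`)
  }

  // An empty password value means the Redis server needs no password.
  const password = env['QUEUE_REDIS_PASSWORD']
  return password ? { host, port, password } : { host, port }
}

console.log(
  readQueueRedisEnv({
    QUEUE_REDIS_HOST: 'localhost',
    QUEUE_REDIS_PORT: '6379',
    QUEUE_REDIS_PASSWORD: '',
  })
)
```

Failing fast on a malformed port at boot is usually easier to debug than a worker that silently cannot reach Redis.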
To process jobs, you need to start queue workers. Each queue requires its own worker process.
# Start emails queue worker
node ace queue:listen --queue=emails
# Start user deletions queue worker
node ace queue:listen --queue=user_deletions
For development, run multiple terminals or use a process manager:
# Terminal 1: Email jobs
node ace queue:listen --queue=emails
# Terminal 2: User deletion jobs
node ace queue:listen --queue=user_deletions
Nuda Kit comes with two pre-configured queues:
| Queue | Purpose | Jobs |
|---|---|---|
| emails | Email delivery | Verification, password reset, magic links, invitations |
| user_deletions | Account cleanup | Delete user data, teams, and files |
Jobs are located in app/jobs/:
| Job | Queue | Description |
|---|---|---|
| SendVerificationEmailJob | emails | Sends email verification link |
| SendResetPasswordEmailJob | emails | Sends password reset email |
| SendMagicLinkEmailJob | emails | Sends magic link for passwordless login |
| SendInvitationEmailJob | emails | Sends team invitation email |
| DeleteUserJob | user_deletions | Deletes user, owned teams, and uploaded files |
Jobs extend the Job class and implement a handle method:
import { Job } from '@rlanz/bull-queue'
import mail from '@adonisjs/mail/services/main'
import User from '#models/user'
// VerifyEmailNotification is the project's mail class; import it from wherever it is defined.

export default class SendVerificationEmailJob extends Job {
  // Required: Path to the job file
  static get $$filepath() {
    return import.meta.url
  }

  // Main job logic
  async handle(payload: { userId: number }) {
    const user = await User.findOrFail(payload.userId)
    await mail.send(new VerifyEmailNotification(user))
  }

  // Called when all retries are exhausted
  async rescue(payload: { userId: number }, error: Error) {
    // Log failure, notify admin, etc.
  }
}
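Job payloads travel through Redis, and BullMQ stores job data as JSON. That is likely one reason the example passes a `userId` and reloads the model inside `handle()` instead of passing the `User` instance itself. The standalone sketch below simulates the round trip with `JSON.stringify`; `UserRecord` and `roundTrip` are hypothetical stand-ins, not Nuda Kit code.

```typescript
// Hypothetical stand-in for a model instance with a Date field and a method.
class UserRecord {
  id: number
  email: string
  createdAt: Date

  constructor(id: number, email: string, createdAt: Date) {
    this.id = id
    this.email = email
    this.createdAt = createdAt
  }

  displayName(): string {
    return this.email.split('@')[0] ?? this.email
  }
}

// Simulates what happens to a payload when it is stored as JSON and read back.
function roundTrip(payload: unknown): unknown {
  return JSON.parse(JSON.stringify(payload))
}

const record = new UserRecord(42, 'ada@example.com', new Date('2024-01-01T00:00:00.000Z'))

const asInstance = roundTrip({ user: record }) as { user: Record<string, unknown> }
console.log(typeof asInstance.user['createdAt']) // "string": the Date became an ISO string
console.log('displayName' in asInstance.user) // false: methods are not serialized

// A plain identifier survives unchanged, so the worker can reload fresh data.
console.log(roundTrip({ userId: record.id })) // { userId: 42 }
```

Passing only an identifier also means the worker reads the current database state, which matters when a job runs after a delay or a retry.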
Use Queue.dispatch() to add jobs to a queue:
import Queue from '@rlanz/bull-queue/services/main'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'
// Dispatch to a specific queue
await Queue.dispatch(
  SendVerificationEmailJob,
  { userId: user.id },
  { queueName: 'emails' }
)

// Dispatch with additional options (MyJob and payload are placeholders)
await Queue.dispatch(
  MyJob,
  payload,
  {
    queueName: 'my_queue', // Queue name
    delay: 5000, // Delay in ms before processing
    attempts: 5, // Override the default number of attempts
  }
)
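Options passed at dispatch time take precedence over the `jobs` defaults in config/queue.ts. The sketch below models that precedence with a plain object spread. It illustrates the behaviour described above and is not the package's source; `JobOptions`, `defaultJobOptions` and `resolveJobOptions` are hypothetical names.

```typescript
interface JobOptions {
  queueName?: string
  delay?: number
  attempts?: number
  removeOnComplete?: number
  removeOnFail?: number
}

// Defaults mirroring the `jobs` block in config/queue.ts.
const defaultJobOptions: JobOptions = {
  attempts: 3,
  removeOnComplete: 100,
  removeOnFail: 100,
}

// Later keys win in an object spread, so dispatch-time options override the defaults.
function resolveJobOptions(overrides: JobOptions = {}): JobOptions {
  return { ...defaultJobOptions, ...overrides }
}

const resolved = resolveJobOptions({ queueName: 'my_queue', delay: 5000, attempts: 5 })
console.log(resolved.attempts) // 5: overridden for this dispatch
console.log(resolved.removeOnComplete) // 100: inherited from the defaults
```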
Create a new file in app/jobs/:
// app/jobs/send_welcome_email_job.ts
import { Job } from '@rlanz/bull-queue'
import mail from '@adonisjs/mail/services/main'
import User from '#models/user'
import WelcomeEmail from '#mails/welcome_email'

interface SendWelcomeEmailPayload {
  userId: number
}

export default class SendWelcomeEmailJob extends Job {
  static get $$filepath() {
    return import.meta.url
  }

  async handle(payload: SendWelcomeEmailPayload) {
    const user = await User.findOrFail(payload.userId)
    await mail.send(new WelcomeEmail(user))
  }

  async rescue(payload: SendWelcomeEmailPayload, error: Error) {
    console.error(`Failed to send welcome email to user ${payload.userId}:`, error)
  }
}

// Then dispatch the job from wherever it is needed, e.g. a controller or listener
import Queue from '@rlanz/bull-queue/services/main'
import SendWelcomeEmailJob from '#jobs/send_welcome_email_job'

await Queue.dispatch(
  SendWelcomeEmailJob,
  { userId: user.id },
  { queueName: 'emails' }
)
You can create as many queues as needed. Simply use a new queue name when dispatching:
// Dispatch to a new "notifications" queue
await Queue.dispatch(
  SendPushNotificationJob,
  { userId: user.id, message: 'Hello!' },
  { queueName: 'notifications' }
)
Then start a worker for the new queue:
node ace queue:listen --queue=notifications
| Queue | Use Case |
|---|---|
| emails | Email delivery (SMTP can be slow) |
| notifications | Push notifications, SMS |
| exports | Large file exports, reports |
| imports | Data imports, CSV processing |
| cleanup | Scheduled maintenance tasks |
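Because a queue comes into existence simply by being named, a typo in `queueName` would send jobs to a queue that no worker is listening on. One way to guard against that is to keep the names in a single typed list. The sketch below uses queue names from this page; `QUEUE_NAMES`, `QueueName` and `onQueue` are hypothetical helpers, not part of Nuda Kit.

```typescript
// Single source of truth for the queue names used in the application.
const QUEUE_NAMES = ['emails', 'user_deletions', 'notifications'] as const

type QueueName = (typeof QUEUE_NAMES)[number]

// Builds the options object passed as the third argument to Queue.dispatch().
function onQueue(queueName: QueueName, extra: { delay?: number; attempts?: number } = {}) {
  return { queueName, ...extra }
}

// The same list can drive tooling, e.g. printing one worker command per queue.
const workerCommands = QUEUE_NAMES.map((name) => `node ace queue:listen --queue=${name}`)

console.log(onQueue('emails', { delay: 1000 }))
console.log(workerCommands.join('\n'))
```

A dispatch would then read `Queue.dispatch(SendWelcomeEmailJob, { userId: user.id }, onQueue('emails'))`, and a misspelled name such as `onQueue('emials')` fails at compile time instead of at runtime.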
Jobs are often dispatched from event listeners. This keeps controllers clean:
// app/listeners/send_verification_email.ts
import { inject } from '@adonisjs/core'
import UserCreated from '#events/user_created'
import Queue from '@rlanz/bull-queue/services/main'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'

@inject()
export default class SendVerificationEmail {
  async handle(event: UserCreated) {
    await Queue.dispatch(
      SendVerificationEmailJob,
      { userId: event.user.id }, // Must match the payload shape expected by the job's handle()
      { queueName: 'emails' }
    )
  }
}
┌─────────────────────────────────────────────────────────┐
│                     Job Dispatched                      │
└─────────────────────────┬───────────────────────────────┘
                          ▼
┌─────────────────────────────────────────────────────────┐
│                     Added to Queue                      │
│                    (stored in Redis)                    │
└─────────────────────────┬───────────────────────────────┘
                          ▼
┌─────────────────────────────────────────────────────────┐
│                   Worker Picks Up Job                   │
│             (queue:listen must be running)              │
└─────────────────────────┬───────────────────────────────┘
                          ▼
                   ┌─────────────┐
                   │  handle()   │
                   │   called    │
                   └──────┬──────┘
                          │
             ┌────────────┴────────────┐
             ▼                         ▼
        ┌──────────┐              ┌──────────┐
        │ Success  │              │  Failed  │
        └────┬─────┘              └────┬─────┘
             │                         │
             ▼                    ┌────┴────┐
        ┌─────────┐               ▼         ▼
        │Completed│          ┌────────┐ ┌────────┐
        └─────────┘          │ Retry  │ │rescue()│
                             │(≤3x)   │ │called  │
                             └────────┘ └────────┘
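The flow in the diagram can be modelled in a few lines. This is a simplified in-memory sketch, not how BullMQ is implemented: it assumes `attempts` counts the total number of tries (first run included), ignores delays and Redis entirely, and uses hypothetical names (`SimpleJob`, `runWithRetries`).

```typescript
interface SimpleJob<P> {
  handle(payload: P): Promise<void>
  rescue(payload: P, error: Error): Promise<void>
}

type Outcome = 'completed' | 'failed'

// Runs handle() up to `attempts` times; calls rescue() once if every try fails.
async function runWithRetries<P>(job: SimpleJob<P>, payload: P, attempts: number): Promise<Outcome> {
  let lastError = new Error('job was never attempted')

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      await job.handle(payload)
      return 'completed'
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error))
    }
  }

  await job.rescue(payload, lastError)
  return 'failed'
}

// Example: a job that fails twice, then succeeds on the third try.
let calls = 0
const flakyJob: SimpleJob<{ userId: number }> = {
  async handle() {
    calls++
    if (calls < 3) throw new Error(`temporary failure #${calls}`)
  },
  async rescue(payload, error) {
    console.error(`giving up on user ${payload.userId}:`, error.message)
  },
}

runWithRetries(flakyJob, { userId: 1 }, 3).then((outcome) => {
  console.log(outcome, 'after', calls, 'calls') // prints "completed after 3 calls"
})
```

With `attempts` set to 2 instead of 3, the same job would exhaust its tries and `rescue()` would run once with the last error.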
Use the queue fake helper to assert that jobs were dispatched without actually processing them:
import { test } from '@japa/runner'
import { QueueFake } from '#tests/helpers/queue_fake'
import SendVerificationEmailJob from '#jobs/send_verification_email_job'

test.group('User Registration', (group) => {
  let queueFake: QueueFake

  group.each.setup(() => {
    queueFake = new QueueFake()
    queueFake.setup()
  })

  group.each.teardown(() => {
    queueFake.restore()
  })

  test('dispatches verification email job', async ({ assert }) => {
    // ... create user ...
    const jobs = queueFake.getDispatchedJobs()
    assert.lengthOf(jobs, 1)
    assert.equal(jobs[0].jobClass, SendVerificationEmailJob)
    assert.equal(jobs[0].options?.queueName, 'emails')
  })
})
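For readers curious what such a fake does, here is a minimal standalone sketch of the idea: record each dispatch in memory instead of sending it to Redis. It is not Nuda Kit's `QueueFake` implementation; `RecordingQueue` and its shape are assumptions that mirror only the `jobClass` and `options` fields used in the assertions above.

```typescript
interface DispatchOptions {
  queueName?: string
  delay?: number
  attempts?: number
}

interface DispatchedJob {
  jobClass: unknown
  payload: unknown
  options: DispatchOptions | undefined
}

// Hypothetical stand-in for the real queue service: stores dispatches, runs nothing.
class RecordingQueue {
  private jobs: DispatchedJob[] = []

  async dispatch(jobClass: unknown, payload: unknown, options?: DispatchOptions): Promise<void> {
    this.jobs.push({ jobClass, payload, options })
  }

  getDispatchedJobs(): readonly DispatchedJob[] {
    return this.jobs
  }

  // Convenience filter for assertions scoped to one queue.
  dispatchedTo(queueName: string): DispatchedJob[] {
    return this.jobs.filter((job) => job.options?.queueName === queueName)
  }

  // Forget everything recorded so far, e.g. between tests.
  clear(): void {
    this.jobs = []
  }
}
```

Because nothing is executed, tests stay fast and need no Redis instance; the trade-off is that the job's own `handle()` logic has to be covered by separate tests.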
In production, a process manager such as PM2 can keep the API server and the queue workers running:
// ecosystem.config.js
module.exports = {
  apps: [
    {
      name: 'api',
      script: 'build/bin/server.js',
    },
    {
      name: 'queue-emails',
      script: 'node',
      args: 'ace queue:listen --queue=emails',
      cwd: './build',
    },
    {
      name: 'queue-deletions',
      script: 'node',
      args: 'ace queue:listen --queue=user_deletions',
      cwd: './build',
    },
  ],
}
You can run multiple workers per queue for higher throughput:
# Run 3 email workers
pm2 start queue-emails -i 3