Now.js Framework Documentation

QueueManager

EN 15 Dec 2025 06:42

Overview

QueueManager is the queue management system for asynchronous tasks in the Now.js Framework. It supports task prioritization, automatic retries, and rate limiting.

When to use:

  • Tasks must run in a defined order
  • The number of tasks running at once must be capped
  • Failed tasks should be retried automatically
  • Work needs to be processed in batches

Why use it:

  • ✅ Multiple queue types (FIFO, priority, delayed)
  • ✅ Task prioritization
  • ✅ Concurrency control
  • ✅ Retry with exponential backoff
  • ✅ Rate limiting
  • ✅ Persistence option
  • ✅ Statistics tracking

Basic Usage

Initialization

await QueueManager.init({
  enabled: true,
  defaultConcurrency: 5,
  defaultMaxRetries: 3,
  defaultRetryDelay: 1000
});

Create Queue

// Create queue
const imageQueue = QueueManager.createQueue('images', {
  concurrency: 3,
  maxRetries: 2,
  retryDelay: 2000
});

// Or get the interface of an existing queue
const queue = QueueManager.getQueue('images');
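
To make the concurrency option concrete, the following is a minimal sketch of a concurrency-limited FIFO runner. It is not how Now.js implements its queues; runWithConcurrency is a hypothetical helper written only to illustrate what a setting such as concurrency: 3 means: at most three tasks are in flight at once, and the next task starts as soon as a slot frees up.

```javascript
// Hypothetical helper, not part of Now.js: runs task functions in FIFO order
// with at most `concurrency` tasks in flight at any time.
async function runWithConcurrency(tasks, concurrency) {
  const results = new Array(tasks.length);
  let next = 0;   // index of the next task to start
  let active = 0; // tasks currently running
  let peak = 0;   // highest number of simultaneous tasks observed

  // Each worker repeatedly takes the next unstarted task until none remain.
  async function worker() {
    while (next < tasks.length) {
      const index = next++;
      active++;
      peak = Math.max(peak, active);
      try {
        results[index] = await tasks[index]();
      } finally {
        active--;
      }
    }
  }

  const workerCount = Math.min(concurrency, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, peak };
}
```

In this sketch a task that throws makes the whole run reject; a real queue would instead record the failure and apply its retry settings.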

Add Tasks

// Add task as function
QueueManager.addTask('images', async () => {
  await processImage(imageData);
});

// Add task with options
QueueManager.addTask('uploads', uploadFile, {
  priority: 'high',
  maxRetries: 5,
  timeout: 60000
});

// Add bulk tasks
QueueManager.addBulkTasks('images', [
  () => processImage(image1),
  () => processImage(image2),
  () => processImage(image3)
]);

Queue Types

FIFO Queue (default)

// First In, First Out
QueueManager.createQueue('default', {
  type: 'fifo'
});

Priority Queue

// Higher priority tasks processed first
QueueManager.createQueue('jobs', {
  type: 'priority'
});

// Add with priority
QueueManager.addTask('jobs', criticalTask, { priority: 10 });
QueueManager.addTask('jobs', normalTask, { priority: 5 });
QueueManager.addTask('jobs', lowTask, { priority: 1 });
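
The ordering described above can be sketched in a few lines. This is an illustration of the semantics only, not Now.js internals: orderByPriority is a hypothetical function, and breaking ties by insertion order is an assumption, since the page does not say how tasks with equal priority are ordered.

```javascript
// Illustrative only: orders tasks so that a higher priority number comes
// first. Ties are broken by insertion order (an assumption).
function orderByPriority(tasks) {
  return tasks
    .map((task, seq) => ({ task, seq }))
    .sort((a, b) => (b.task.priority - a.task.priority) || (a.seq - b.seq))
    .map(({ task }) => task);
}

const ordered = orderByPriority([
  { name: 'normalTask', priority: 5 },
  { name: 'lowTask', priority: 1 },
  { name: 'criticalTask', priority: 10 },
  { name: 'anotherNormalTask', priority: 5 }
]);

console.log(ordered.map((t) => t.name).join(', '));
// criticalTask, normalTask, anotherNormalTask, lowTask
```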

Delayed Queue

// Tasks with delay
QueueManager.addTask('emails', sendEmail, {
  delay: 5000 // Wait 5 seconds before processing
});

Task Options

QueueManager.addTask('queue', taskFunction, {
  // Priority (higher = processed first)
  priority: 5,

  // Retry settings
  maxRetries: 3,
  retryDelay: 1000,
  backoffStrategy: 'exponential', // 'fixed', 'linear', 'exponential'

  // Timeout
  timeout: 30000, // 30 seconds

  // Delay before processing
  delay: 0,

  // Metadata
  id: 'custom-id',
  name: 'Upload Image',
  data: { imageId: 123 }
});
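
The three backoffStrategy values are not defined further on this page. The sketch below shows one common interpretation so the difference is visible in numbers. The formulas are assumptions, computeRetryDelay is a hypothetical function, and Now.js may compute its delays differently (for example with a cap or jitter).

```javascript
// Assumed formulas for illustration; `attempt` is 1 for the first retry.
function computeRetryDelay(strategy, retryDelay, attempt) {
  switch (strategy) {
    case 'fixed':
      return retryDelay;                      // same delay every time
    case 'linear':
      return retryDelay * attempt;            // grows by retryDelay per attempt
    case 'exponential':
      return retryDelay * 2 ** (attempt - 1); // doubles on every attempt
    default:
      throw new Error(`Unknown backoff strategy: ${strategy}`);
  }
}

for (const strategy of ['fixed', 'linear', 'exponential']) {
  const delays = [1, 2, 3].map((n) => computeRetryDelay(strategy, 1000, n));
  console.log(strategy, delays.join(', '));
}
// fixed 1000, 1000, 1000
// linear 1000, 2000, 3000
// exponential 1000, 2000, 4000
```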

Queue Control

Pause/Resume

// Pause queue
QueueManager.pauseQueue('images');

// Resume queue
QueueManager.resumeQueue('images');

// Check if paused (via the queue interface)
const isPaused = QueueManager.getQueue('images').isPaused();

Clear Queue

// Clear all pending tasks
QueueManager.clearQueue('images');

Queue Statistics

const stats = QueueManager.getQueueStats('images');
// {
//   pending: 10,
//   processing: 3,
//   completed: 50,
//   failed: 2,
//   avgProcessingTime: 1500
// }
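
As a usage sketch, the statistics object can be reduced to a few derived figures for a dashboard or health check. summarizeStats is a hypothetical helper, not a Now.js API; it relies only on the fields shown above.

```javascript
// Hypothetical helper: derives totals and a failure rate from a stats object
// shaped like the one returned by QueueManager.getQueueStats().
function summarizeStats(stats) {
  const finished = stats.completed + stats.failed;
  return {
    finished,
    remaining: stats.pending + stats.processing,
    // Fraction of finished tasks that failed; 0 when nothing has finished yet
    failureRate: finished === 0 ? 0 : stats.failed / finished
  };
}

const summary = summarizeStats({
  pending: 10,
  processing: 3,
  completed: 50,
  failed: 2,
  avgProcessingTime: 1500
});
console.log(summary.finished, summary.remaining); // 52 13
```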

Queue Interface

const queue = QueueManager.getQueue('images');

// Add tasks
queue.add(task, options);
queue.addBulk(tasks, options);

// Control
queue.pause();
queue.resume();
queue.clear();

// Info
queue.getLength();
queue.getStats();
queue.isPaused();
queue.isEnabled();

// Settings
queue.setConcurrency(5);
queue.setPriority(10);

Configuration

await QueueManager.init({
  // Enable/disable
  enabled: true,
  debug: false,

  // Concurrency
  defaultConcurrency: 5,

  // Retry settings
  defaultMaxRetries: 3,
  defaultRetryDelay: 1000,
  defaultBackoffStrategy: 'exponential',

  // Timeout
  defaultTimeout: 30000,

  // Rate limiting
  rateLimit: {
    enabled: true,
    maxRequests: 100,
    windowMs: 60000
  },

  // Persistence
  persistence: false,
  persistenceInterval: 5000
});
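
The rateLimit settings read as "at most maxRequests tasks per windowMs milliseconds". The page does not say whether Now.js counts in fixed or sliding windows, so the sketch below assumes the simpler fixed-window scheme. createRateLimiter is a hypothetical function for illustration only.

```javascript
// Illustrative fixed-window limiter (an assumption about the semantics).
// Returns a function that reports whether one more request may start now.
function createRateLimiter({ maxRequests, windowMs }) {
  let windowStart = null; // timestamp at which the current window opened
  let count = 0;          // requests admitted in the current window

  return function tryAcquire(now = Date.now()) {
    // Open a new window on first use or once the current one has elapsed
    if (windowStart === null || now - windowStart >= windowMs) {
      windowStart = now;
      count = 0;
    }
    if (count >= maxRequests) {
      return false; // limit reached, caller should wait
    }
    count++;
    return true;
  };
}

const tryAcquire = createRateLimiter({ maxRequests: 2, windowMs: 1000 });
console.log(tryAcquire(0), tryAcquire(10), tryAcquire(20), tryAcquire(1000));
// true true false true
```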

API Reference

QueueManager.createQueue(name, options)

Create a new queue

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Queue name |
| options.concurrency | number | Number of concurrent tasks |
| options.maxRetries | number | Maximum retries |
| options.retryDelay | number | Delay before retry (ms) |
| options.backoffStrategy | string | 'fixed', 'linear', 'exponential' |

Returns: Object - Queue interface

QueueManager.getQueue(name)

Get queue interface

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Queue name |

Returns: Object - Queue interface

QueueManager.addTask(queueName, task, options)

Add a task

| Parameter | Type | Description |
| --- | --- | --- |
| queueName | string | Queue name |
| task | function/object | Task function or object |
| options | object | Task options |

Returns: string - Task ID

QueueManager.addBulkTasks(queueName, tasks, options)

Add multiple tasks

| Parameter | Type | Description |
| --- | --- | --- |
| queueName | string | Queue name |
| tasks | array | Array of tasks |
| options | object | Options for all tasks |

Returns: Array<string> - Task IDs

QueueManager.pauseQueue(name)

Pause a queue

QueueManager.resumeQueue(name)

Resume a queue

QueueManager.clearQueue(name)

Clear a queue

QueueManager.getQueueStats(name)

Get statistics

Returns: Object - Queue statistics, in the following shape:

{
  pending: number,
  processing: number,
  completed: number,
  failed: number,
  avgProcessingTime: number
}

Events

| Event | When Triggered | Detail |
| --- | --- | --- |
| queue:task:added | Task added to queue | {queue, taskId} |
| queue:task:started | Task started | {queue, taskId} |
| queue:task:completed | Task completed | {queue, taskId, result} |
| queue:task:failed | Task failed | {queue, taskId, error} |
| queue:task:retry | Task retrying | {queue, taskId, attempt} |

EventManager.on('queue:task:completed', (data) => {
  console.log(`Task ${data.taskId} completed`);
});
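
Building on the listener above, the event payloads can be tallied per queue. The sketch below is a hypothetical helper, not part of Now.js. It assumes only the event names and the queue field listed in the table. Whether queue:task:failed fires on every failed attempt or only after the last retry is not stated on this page, so treat the failed count accordingly.

```javascript
// Hypothetical helper: counts task events per queue.
function createTaskTracker() {
  const counts = {};

  // Returns the counter object for a queue, creating it on first use
  function bucket(queue) {
    if (!counts[queue]) {
      counts[queue] = { added: 0, completed: 0, failed: 0, retries: 0 };
    }
    return counts[queue];
  }

  return {
    // Keyed by event name so the handlers can be registered in a loop
    handlers: {
      'queue:task:added': ({ queue }) => { bucket(queue).added++; },
      'queue:task:completed': ({ queue }) => { bucket(queue).completed++; },
      'queue:task:failed': ({ queue }) => { bucket(queue).failed++; },
      'queue:task:retry': ({ queue }) => { bucket(queue).retries++; }
    },
    // Returns a copy so callers cannot mutate the internal counters
    snapshot: (queue) => ({ ...bucket(queue) })
  };
}

// Wiring sketch, assuming EventManager.on(eventName, handler) as used above:
// const tracker = createTaskTracker();
// for (const [name, handler] of Object.entries(tracker.handlers)) {
//   EventManager.on(name, handler);
// }
```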

Real-World Examples

Image Processing Queue

// Initialize
await QueueManager.init({ enabled: true });

// Create queue for image processing
QueueManager.createQueue('images', {
  concurrency: 3,
  maxRetries: 2
});

// Add image processing tasks
async function processImages(files) {
  for (const file of files) {
    QueueManager.addTask('images', async () => {
      // Upload
      const url = await uploadImage(file);

      // Generate thumbnails
      await generateThumbnail(url, 'small');
      await generateThumbnail(url, 'medium');

      return url;
    }, {
      priority: file.priority,
      name: file.name,
      timeout: 120000 // 2 minutes
    });
  }
}

// Listen for completion
EventManager.on('queue:task:completed', (data) => {
  if (data.queue === 'images') {
    updateProgress();
  }
});

Email Queue with Rate Limiting

QueueManager.createQueue('emails', {
  concurrency: 2, // 2 emails at a time
  maxRetries: 3,
  retryDelay: 5000
});

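async function sendBulkEmails(users, template) {
  for (const [index, user] of users.entries()) {
    QueueManager.addTask('emails', async () => {
      await EmailService.send({
        to: user.email,
        template,
        data: { name: user.name }
      });
    }, {
      // Stagger the emails 1 second apart. The delay counts from when a task
      // is added, so a fixed 1000 ms would release every email at once.
      delay: index * 1000
    });
  }
}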

// Start sending
await sendBulkEmails(users, 'welcome');

API Request Queue

QueueManager.createQueue('api', {
  concurrency: 5,
  maxRetries: 3,
  backoffStrategy: 'exponential'
});

// Queue multiple API requests
const taskIds = QueueManager.addBulkTasks('api',
  items.map(item => async () => {
    return await ApiService.post('/api/process', item);
  }),
  { priority: 'medium' }
);

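// Wait for all to complete
// Note: waitForTask is an application-defined helper that is not shown on this
// page, e.g. one built on the queue:task:completed and queue:task:failed events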
const results = await Promise.all(
  taskIds.map(id => waitForTask(id))
);
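
Since waitForTask is not defined on this page, here is one way such a helper might be built from the documented events. This is a sketch under assumptions, not a Now.js API: createTaskWaiter is a hypothetical name, and `events` stands for any object with an on(eventName, handler) method, as EventManager is used elsewhere on this page. It also assumes queue:task:failed is emitted only once a task has finally failed. If it fires before each retry, the promise would reject too early.

```javascript
// Hypothetical helper: returns a waitForTask(taskId) function whose promise
// settles when the matching completed/failed event arrives.
function createTaskWaiter(events) {
  const settled = new Map(); // taskId -> outcome that arrived before anyone waited
  const waiting = new Map(); // taskId -> { resolve, reject } of a pending waiter

  function settle(taskId, ok, value) {
    const waiter = waiting.get(taskId);
    if (waiter) {
      waiting.delete(taskId);
      if (ok) waiter.resolve(value); else waiter.reject(value);
    } else {
      // Remember the outcome so a later waitForTask() call does not hang
      settled.set(taskId, { ok, value });
    }
  }

  events.on('queue:task:completed', (d) => settle(d.taskId, true, d.result));
  events.on('queue:task:failed', (d) => settle(d.taskId, false, d.error));

  return function waitForTask(taskId) {
    const done = settled.get(taskId);
    if (done) {
      settled.delete(taskId);
      return done.ok ? Promise.resolve(done.value) : Promise.reject(done.value);
    }
    return new Promise((resolve, reject) => {
      waiting.set(taskId, { resolve, reject });
    });
  };
}

// Usage sketch: create the waiter before adding tasks so no event is missed.
// const waitForTask = createTaskWaiter(EventManager);
```

Outcomes for tasks that nobody waits on stay in the `settled` map, so a long-running application would want to prune it.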

Common Pitfalls

⚠️ 1. Enable Queue Before Use

// ❌ QueueManager not initialized, so the queue is disabled
QueueManager.addTask('queue', task);

// ✅ Enable first
await QueueManager.init({ enabled: true });
QueueManager.addTask('queue', task);

⚠️ 2. Handle Task Errors

QueueManager.addTask('queue', async () => {
  try {
    await riskyOperation();
  } catch (error) {
    // Log error, queue will retry automatically
    console.error('Task failed:', error);
    throw error; // Re-throw for retry
  }
});

⚠️ 3. Set Appropriate Timeout

// ❌ No timeout - may hang
QueueManager.addTask('queue', longRunningTask);

// ✅ Set appropriate timeout
QueueManager.addTask('queue', longRunningTask, {
  timeout: 120000 // 2 minutes
});
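
To illustrate what a per-task timeout guards against, the sketch below wraps a task so that it rejects if it has not settled within a given time. withTimeout is a hypothetical helper, not how Now.js enforces its timeout option. Note that the underlying work is not cancelled; the wrapper only stops waiting for it.

```javascript
// Hypothetical helper: returns a wrapped task that rejects after `ms`
// milliseconds if the original task has not settled by then.
function withTimeout(task, ms) {
  return async (...args) => {
    let timer;
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(
        () => reject(new Error(`Task timed out after ${ms} ms`)),
        ms
      );
    });
    try {
      // Whichever settles first decides the outcome
      return await Promise.race([task(...args), timeout]);
    } finally {
      clearTimeout(timer); // avoid a stray timer once the race is decided
    }
  };
}

// Usage sketch: const guardedTask = withTimeout(longRunningTask, 120000);
```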