Now.js Framework Documentation

Now.js Framework Documentation

QueueManager

TH 15 Dec 2025 08:52

QueueManager

ภาพรวม

QueueManager คือระบบจัดการ queue สำหรับ tasks แบบ asynchronous ใน Now.js Framework รองรับ priority, retry และ rate limiting

ใช้เมื่อ:

  • ต้องการจัดลำดับ tasks
  • ต้องการ concurrency control
  • ต้องการ retry mechanism
  • ต้องการ batch processing

ทำไมต้องใช้:

  • ✅ Multiple queue types (FIFO, priority, delayed)
  • ✅ Task prioritization
  • ✅ Concurrency control
  • ✅ Retry with exponential backoff
  • ✅ Rate limiting
  • ✅ Persistence option
  • ✅ Statistics tracking

การใช้งานพื้นฐาน

Initialization

await QueueManager.init({
  enabled: true,
  defaultConcurrency: 5,
  defaultMaxRetries: 3,
  defaultRetryDelay: 1000
});

Create Queue

// สร้าง queue
const imageQueue = QueueManager.createQueue('images', {
  concurrency: 3,
  maxRetries: 2,
  retryDelay: 2000
});

// หรือรับ queue interface
const queue = QueueManager.getQueue('images');

Add Tasks

// Add task as function
QueueManager.addTask('images', async () => {
  await processImage(imageData);
});

// Add task with options
QueueManager.addTask('uploads', uploadFile, {
  priority: 'high',
  maxRetries: 5,
  timeout: 60000
});

// Add bulk tasks
QueueManager.addBulkTasks('images', [
  () => processImage(image1),
  () => processImage(image2),
  () => processImage(image3)
]);

Queue Types

FIFO Queue (default)

// First In, First Out
QueueManager.createQueue('default', {
  type: 'fifo'
});

Priority Queue

// Higher priority tasks processed first
QueueManager.createQueue('jobs', {
  type: 'priority'
});

// Add with priority
QueueManager.addTask('jobs', criticalTask, { priority: 10 });
QueueManager.addTask('jobs', normalTask, { priority: 5 });
QueueManager.addTask('jobs', lowTask, { priority: 1 });

Delayed Queue

// Tasks with delay
QueueManager.addTask('emails', sendEmail, {
  delay: 5000 // Wait 5 seconds before processing
});

Task Options

QueueManager.addTask('queue', taskFunction, {
  // Priority (higher = processed first)
  priority: 5,

  // Retry settings
  maxRetries: 3,
  retryDelay: 1000,
  backoffStrategy: 'exponential', // 'fixed', 'linear', 'exponential'

  // Timeout
  timeout: 30000, // 30 seconds

  // Delay before processing
  delay: 0,

  // Metadata
  id: 'custom-id',
  name: 'Upload Image',
  data: { imageId: 123 }
});

Queue Control

Pause/Resume

// Pause queue
QueueManager.pauseQueue('images');

// Resume queue
QueueManager.resumeQueue('images');

// Check if paused
const isPaused = queue.isPaused();

Clear Queue

// Clear all pending tasks
QueueManager.clearQueue('images');

Queue Statistics

const stats = QueueManager.getQueueStats('images');
// {
//   pending: 10,
//   processing: 3,
//   completed: 50,
//   failed: 2,
//   avgProcessingTime: 1500
// }

Queue Interface

const queue = QueueManager.getQueue('images');

// Add tasks
queue.add(task, options);
queue.addBulk(tasks, options);

// Control
queue.pause();
queue.resume();
queue.clear();

// Info
queue.getLength();
queue.getStats();
queue.isPaused();
queue.isEnabled();

// Settings
queue.setConcurrency(5);
queue.setPriority(10);

การตั้งค่า

await QueueManager.init({
  // Enable/disable
  enabled: true,
  debug: false,

  // Concurrency
  defaultConcurrency: 5,

  // Retry settings
  defaultMaxRetries: 3,
  defaultRetryDelay: 1000,
  defaultBackoffStrategy: 'exponential',

  // Timeout
  defaultTimeout: 30000,

  // Rate limiting
  rateLimit: {
    enabled: true,
    maxRequests: 100,
    windowMs: 60000
  },

  // Persistence
  persistence: false,
  persistenceInterval: 5000
});

API อ้างอิง

QueueManager.createQueue(name, options)

สร้าง queue ใหม่

Parameter Type Description
name string ชื่อ queue
options.concurrency number จำนวน concurrent tasks
options.maxRetries number Retry สูงสุด
options.retryDelay number Delay ก่อน retry (ms)
options.backoffStrategy string 'fixed', 'linear', 'exponential'

Returns: Object - Queue interface

QueueManager.getQueue(name)

รับ queue interface

Parameter Type Description
name string ชื่อ queue

Returns: Object - Queue interface

QueueManager.addTask(queueName, task, options)

เพิ่ม task

Parameter Type Description
queueName string ชื่อ queue
task function/object Task function หรือ object
options object Task options

Returns: string - Task ID

QueueManager.addBulkTasks(queueName, tasks, options)

เพิ่มหลาย tasks

Parameter Type Description
queueName string ชื่อ queue
tasks array Array of tasks
options object Options สำหรับทุก tasks

Returns: Array<string> - Task IDs

QueueManager.pauseQueue(name)

Pause queue

QueueManager.resumeQueue(name)

Resume queue

QueueManager.clearQueue(name)

Clear queue

QueueManager.getQueueStats(name)

Get statistics

Returns: Object:

{
  pending: number,
  processing: number,
  completed: number,
  failed: number,
  avgProcessingTime: number
}

เหตุการณ์

Event เมื่อเกิด Detail
queue:task:added Task เพิ่มใน queue {queue, taskId}
queue:task:started Task เริ่มทำงาน {queue, taskId}
queue:task:completed Task เสร็จ {queue, taskId, result}
queue:task:failed Task fail {queue, taskId, error}
queue:task:retry Task retry {queue, taskId, attempt}
EventManager.on('queue:task:completed', (data) => {
  console.log(`Task ${data.taskId} completed`);
});

ตัวอย่างการใช้งานจริง

Image Processing Queue

// Initialize
await QueueManager.init({ enabled: true });

// Create queue for image processing
QueueManager.createQueue('images', {
  concurrency: 3,
  maxRetries: 2
});

// Add image processing tasks
async function processImages(files) {
  for (const file of files) {
    QueueManager.addTask('images', async () => {
      // Upload
      const url = await uploadImage(file);

      // Generate thumbnails
      await generateThumbnail(url, 'small');
      await generateThumbnail(url, 'medium');

      return url;
    }, {
      priority: file.priority,
      name: file.name,
      timeout: 120000 // 2 minutes
    });
  }
}

// Listen for completion
EventManager.on('queue:task:completed', (data) => {
  if (data.queue === 'images') {
    updateProgress();
  }
});

Email Queue with Rate Limiting

QueueManager.createQueue('emails', {
  concurrency: 2, // 2 emails at a time
  maxRetries: 3,
  retryDelay: 5000
});

async function sendBulkEmails(users, template) {
  for (const user of users) {
    QueueManager.addTask('emails', async () => {
      await EmailService.send({
        to: user.email,
        template,
        data: { name: user.name }
      });
    }, {
      delay: 1000 // 1 second between emails
    });
  }
}

// Start sending
await sendBulkEmails(users, 'welcome');

API Request Queue

QueueManager.createQueue('api', {
  concurrency: 5,
  maxRetries: 3,
  backoffStrategy: 'exponential'
});

// Queue multiple API requests
const taskIds = QueueManager.addBulkTasks('api',
  items.map(item => async () => {
    return await ApiService.post('/api/process', item);
  }),
  { priority: 'medium' }
);

// Wait for all to complete
const results = await Promise.all(
  taskIds.map(id => waitForTask(id))
);

ข้อควรระวัง

⚠️ 1. Enable Queue ก่อนใช้

// ❌ Queue disabled
QueueManager.addTask('queue', task);

// ✅ ต้อง enable ก่อน
await QueueManager.init({ enabled: true });
QueueManager.addTask('queue', task);

⚠️ 2. Handle Task Errors

QueueManager.addTask('queue', async () => {
  try {
    await riskyOperation();
  } catch (error) {
    // Log error, queue will retry automatically
    console.error('Task failed:', error);
    throw error; // Re-throw for retry
  }
});

⚠️ 3. Set Appropriate Timeout

// ❌ ไม่มี timeout - อาจ hang
QueueManager.addTask('queue', longRunningTask);

// ✅ ตั้ง timeout ที่เหมาะสม
QueueManager.addTask('queue', longRunningTask, {
  timeout: 120000 // 2 minutes
});

เอกสารที่เกี่ยวข้อง