Now.js Framework Documentation
QueueManager
QueueManager
Overview
QueueManager is the queue management system for asynchronous tasks in Now.js Framework. It supports priority, retry, and rate limiting.
When to use:
- Need to order tasks
- Need concurrency control
- Need retry mechanism
- Need batch processing
Why use it:
- ✅ Multiple queue types (FIFO, priority, delayed)
- ✅ Task prioritization
- ✅ Concurrency control
- ✅ Retry with exponential backoff
- ✅ Rate limiting
- ✅ Persistence option
- ✅ Statistics tracking
Basic Usage
Initialization
await QueueManager.init({
enabled: true,
defaultConcurrency: 5,
defaultMaxRetries: 3,
defaultRetryDelay: 1000
});Create Queue
// Create queue
const imageQueue = QueueManager.createQueue('images', {
concurrency: 3,
maxRetries: 2,
retryDelay: 2000
});
// Or get queue interface
const queue = QueueManager.getQueue('images');Add Tasks
// Add task as function
QueueManager.addTask('images', async () => {
await processImage(imageData);
});
// Add task with options
QueueManager.addTask('uploads', uploadFile, {
priority: 'high',
maxRetries: 5,
timeout: 60000
});
// Add bulk tasks
QueueManager.addBulkTasks('images', [
() => processImage(image1),
() => processImage(image2),
() => processImage(image3)
]);Queue Types
FIFO Queue (default)
// First In, First Out
QueueManager.createQueue('default', {
type: 'fifo'
});Priority Queue
// Higher priority tasks processed first
QueueManager.createQueue('jobs', {
type: 'priority'
});
// Add with priority
QueueManager.addTask('jobs', criticalTask, { priority: 10 });
QueueManager.addTask('jobs', normalTask, { priority: 5 });
QueueManager.addTask('jobs', lowTask, { priority: 1 });Delayed Queue
// Tasks with delay
QueueManager.addTask('emails', sendEmail, {
delay: 5000 // Wait 5 seconds before processing
});Task Options
QueueManager.addTask('queue', taskFunction, {
// Priority (higher = processed first)
priority: 5,
// Retry settings
maxRetries: 3,
retryDelay: 1000,
backoffStrategy: 'exponential', // 'fixed', 'linear', 'exponential'
// Timeout
timeout: 30000, // 30 seconds
// Delay before processing
delay: 0,
// Metadata
id: 'custom-id',
name: 'Upload Image',
data: { imageId: 123 }
});Queue Control
Pause/Resume
// Pause queue
QueueManager.pauseQueue('images');
// Resume queue
QueueManager.resumeQueue('images');
// Check if paused
const isPaused = queue.isPaused();Clear Queue
// Clear all pending tasks
QueueManager.clearQueue('images');Queue Statistics
const stats = QueueManager.getQueueStats('images');
// {
// pending: 10,
// processing: 3,
// completed: 50,
// failed: 2,
// avgProcessingTime: 1500
// }Queue Interface
const queue = QueueManager.getQueue('images');
// Add tasks
queue.add(task, options);
queue.addBulk(tasks, options);
// Control
queue.pause();
queue.resume();
queue.clear();
// Info
queue.getLength();
queue.getStats();
queue.isPaused();
queue.isEnabled();
// Settings
queue.setConcurrency(5);
queue.setPriority(10);Configuration
await QueueManager.init({
// Enable/disable
enabled: true,
debug: false,
// Concurrency
defaultConcurrency: 5,
// Retry settings
defaultMaxRetries: 3,
defaultRetryDelay: 1000,
defaultBackoffStrategy: 'exponential',
// Timeout
defaultTimeout: 30000,
// Rate limiting
rateLimit: {
enabled: true,
maxRequests: 100,
windowMs: 60000
},
// Persistence
persistence: false,
persistenceInterval: 5000
});API Reference
QueueManager.createQueue(name, options)
Create a new queue
| Parameter | Type | Description |
|---|---|---|
name |
string | Queue name |
options.concurrency |
number | Number of concurrent tasks |
options.maxRetries |
number | Maximum retries |
options.retryDelay |
number | Delay before retry (ms) |
options.backoffStrategy |
string | 'fixed', 'linear', 'exponential' |
Returns: Object - Queue interface
QueueManager.getQueue(name)
Get queue interface
| Parameter | Type | Description |
|---|---|---|
name |
string | Queue name |
Returns: Object - Queue interface
QueueManager.addTask(queueName, task, options)
Add a task
| Parameter | Type | Description |
|---|---|---|
queueName |
string | Queue name |
task |
function/object | Task function or object |
options |
object | Task options |
Returns: string - Task ID
QueueManager.addBulkTasks(queueName, tasks, options)
Add multiple tasks
| Parameter | Type | Description |
|---|---|---|
queueName |
string | Queue name |
tasks |
array | Array of tasks |
options |
object | Options for all tasks |
Returns: Array<string> - Task IDs
QueueManager.pauseQueue(name)
Pause a queue
QueueManager.resumeQueue(name)
Resume a queue
QueueManager.clearQueue(name)
Clear a queue
QueueManager.getQueueStats(name)
Get statistics
Returns: Object:
{
pending: number,
processing: number,
completed: number,
failed: number,
avgProcessingTime: number
}Events
| Event | When Triggered | Detail |
|---|---|---|
queue:task:added |
Task added to queue | {queue, taskId} |
queue:task:started |
Task started | {queue, taskId} |
queue:task:completed |
Task completed | {queue, taskId, result} |
queue:task:failed |
Task failed | {queue, taskId, error} |
queue:task:retry |
Task retrying | {queue, taskId, attempt} |
EventManager.on('queue:task:completed', (data) => {
console.log(`Task ${data.taskId} completed`);
});Real-World Examples
Image Processing Queue
// Initialize
await QueueManager.init({ enabled: true });
// Create queue for image processing
QueueManager.createQueue('images', {
concurrency: 3,
maxRetries: 2
});
// Add image processing tasks
async function processImages(files) {
for (const file of files) {
QueueManager.addTask('images', async () => {
// Upload
const url = await uploadImage(file);
// Generate thumbnails
await generateThumbnail(url, 'small');
await generateThumbnail(url, 'medium');
return url;
}, {
priority: file.priority,
name: file.name,
timeout: 120000 // 2 minutes
});
}
}
// Listen for completion
EventManager.on('queue:task:completed', (data) => {
if (data.queue === 'images') {
updateProgress();
}
});Email Queue with Rate Limiting
QueueManager.createQueue('emails', {
concurrency: 2, // 2 emails at a time
maxRetries: 3,
retryDelay: 5000
});
async function sendBulkEmails(users, template) {
for (const user of users) {
QueueManager.addTask('emails', async () => {
await EmailService.send({
to: user.email,
template,
data: { name: user.name }
});
}, {
delay: 1000 // 1 second between emails
});
}
}
// Start sending
await sendBulkEmails(users, 'welcome');API Request Queue
QueueManager.createQueue('api', {
concurrency: 5,
maxRetries: 3,
backoffStrategy: 'exponential'
});
// Queue multiple API requests
const taskIds = QueueManager.addBulkTasks('api',
items.map(item => async () => {
return await ApiService.post('/api/process', item);
}),
{ priority: 'medium' }
);
// Wait for all to complete
const results = await Promise.all(
taskIds.map(id => waitForTask(id))
);Common Pitfalls
⚠️ 1. Enable Queue Before Use
// ❌ Queue disabled
QueueManager.addTask('queue', task);
// ✅ Enable first
await QueueManager.init({ enabled: true });
QueueManager.addTask('queue', task);⚠️ 2. Handle Task Errors
QueueManager.addTask('queue', async () => {
try {
await riskyOperation();
} catch (error) {
// Log error, queue will retry automatically
console.error('Task failed:', error);
throw error; // Re-throw for retry
}
});⚠️ 3. Set Appropriate Timeout
// ❌ No timeout - may hang
QueueManager.addTask('queue', longRunningTask);
// ✅ Set appropriate timeout
QueueManager.addTask('queue', longRunningTask, {
timeout: 120000 // 2 minutes
});Related Documentation
- SyncManager - Offline synchronization
- ApiService - API requests
- EventManager - Events