message-queues
PassAsync communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.
(0)
0stars
0downloads
0views
Install Skill
Skills are third-party code from public GitHub repositories. SkillHub scans for known malicious patterns but cannot guarantee safety. Review the source code before installing.
Install globally (user-level):
npx skillhub install Albmartinez13/ai-design-components/message-queuesInstall in current project:
npx skillhub install Albmartinez13/ai-design-components/message-queues --projectSuggested path: ~/.claude/skills/message-queues/
SKILL.md Content
---
name: message-queues
description: Async communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.
---
# Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
## When to Use This Skill
Use message queues when:
- **Long-running operations** block HTTP requests (report generation, video processing)
- **Service decoupling** required (microservices, event-driven architecture)
- **Guaranteed delivery** needed (payment processing, order fulfillment)
- **Event streaming** for analytics (log aggregation, metrics pipelines)
- **Workflow orchestration** for complex processes (multi-step sagas, human-in-the-loop)
- **Background job processing** (email sending, image resizing)
## Broker Selection Decision Tree
Choose message broker based on primary need:
### Event Streaming / Log Aggregation
**→ Apache Kafka**
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
### Simple Background Jobs
**→ Task Queues**
- **Python** → Celery + Redis
- **TypeScript** → BullMQ + Redis
- **Go** → Asynq + Redis
- Use: Email sending, report generation, webhooks
### Complex Workflows / Sagas
**→ Temporal**
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
### Request-Reply / RPC Patterns
**→ NATS**
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
### Complex Message Routing
**→ RabbitMQ**
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
### Already Using Redis
**→ Redis Streams**
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
## Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|--------|-----------|---------------|----------|
| **Kafka** | 500K-1M msg/s | 10-50ms | Event streaming |
| **NATS JetStream** | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| **RabbitMQ** | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| **Redis Streams** | 100K+ msg/s | Sub-ms | Simple queues, caching |
## Quick Start Examples
### Kafka Producer/Consumer (Python)
See `examples/kafka-python/` for working code.
```python
from confluent_kafka import Producer, Consumer
# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()
# Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg is not None:
process_order(msg.value())
```
### Celery Background Jobs (Python)
See `examples/celery-image-processing/` for full implementation.
```python
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
try:
result = expensive_image_processing(image_url)
return result
except RecoverableError as e:
raise self.retry(exc=e, countdown=60)
```
### BullMQ Job Processing (TypeScript)
See `examples/bullmq-webhook-processor/` for full implementation.
```typescript
import { Queue, Worker } from 'bullmq'
const queue = new Queue('webhooks', {
connection: { host: 'localhost', port: 6379 }
})
// Enqueue job
await queue.add('send-webhook', {
url: 'https://example.com/webhook',
payload: { event: 'order.created' }
})
// Process jobs
const worker = new Worker('webhooks', async job => {
await fetch(job.data.url, {
method: 'POST',
body: JSON.stringify(job.data.payload)
})
}, { connection: { host: 'localhost', port: 6379 } })
```
### Temporal Workflow Orchestration
See `examples/temporal-order-saga/` for saga pattern implementation.
```python
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# Step 1: Reserve inventory
inventory_id = await workflow.execute_activity(
reserve_inventory,
order_id,
start_to_close_timeout=timedelta(seconds=10),
)
# Step 2: Charge payment
payment_id = await workflow.execute_activity(
charge_payment,
order_id,
start_to_close_timeout=timedelta(seconds=30),
)
return f"Order {order_id} completed"
```
## Core Patterns
### Event Naming Convention
Use: `Domain.Entity.Action.Version`
Examples:
- `order.created.v1`
- `user.profile.updated.v2`
- `payment.failed.v1`
### Event Schema Structure
```json
{
"event_type": "order.created.v2",
"event_id": "uuid-here",
"timestamp": "2025-12-02T10:00:00Z",
"version": "2.0",
"data": {
"order_id": "ord_123",
"customer_id": "cus_456"
},
"metadata": {
"producer": "order-service",
"trace_id": "abc123",
"correlation_id": "xyz789"
}
}
```
### Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:
```python
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
try:
result = perform_processing(order_id)
return result
except UnrecoverableError as e:
send_to_dlq(order_id, str(e))
raise Reject(e, requeue=False)
```
### Idempotency for Exactly-Once Processing
```python
@app.post("/process")
async def process_payment(
payment_data: dict,
idempotency_key: str = Header(None)
):
# Check if already processed
cached_result = redis_client.get(f"idempotency:{idempotency_key}")
if cached_result:
return {"status": "already_processed"}
result = process_payment_logic(payment_data)
redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
return {"status": "processed", "result": result}
```
## Frontend Integration
### Job Status Updates via SSE
```python
# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
async def event_generator():
while True:
task = celery_app.AsyncResult(task_id)
if task.state == 'PROGRESS':
yield {"event": "progress", "data": task.info.get('progress', 0)}
elif task.state == 'SUCCESS':
yield {"event": "complete", "data": task.result}
break
await asyncio.sleep(0.5)
return EventSourceResponse(event_generator())
```
### React Component
```typescript
export function JobStatus({ jobId }: { jobId: string }) {
const [progress, setProgress] = useState(0)
useEffect(() => {
const eventSource = new EventSource(`/api/status/${jobId}`)
eventSource.addEventListener('progress', (e) => {
setProgress(JSON.parse(e.data))
})
eventSource.addEventListener('complete', (e) => {
toast({ title: 'Job complete', description: JSON.parse(e.data) })
eventSource.close()
})
return () => eventSource.close()
}, [jobId])
return <ProgressBar value={progress} />
}
```
## Detailed Guides
For comprehensive documentation, see reference files:
### Broker-Specific Guides
- **Kafka**: See `references/kafka.md` for partitioning, consumer groups, exactly-once semantics
- **RabbitMQ**: See `references/rabbitmq.md` for exchanges, bindings, routing patterns
- **NATS**: See `references/nats.md` for JetStream, request-reply patterns
- **Redis Streams**: See `references/redis-streams.md` for consumer groups, acknowledgments
### Task Queue Guides
- **Celery**: See `references/celery.md` for periodic tasks, canvas (workflows), monitoring
- **BullMQ**: See `references/bullmq.md` for job prioritization, flows, Bull Board monitoring
- **Temporal**: See `references/temporal-workflows.md` for saga patterns, signals, queries
### Pattern Guides
- **Event Patterns**: See `references/event-patterns.md` for event sourcing, CQRS, outbox pattern
## Common Anti-Patterns to Avoid
### 1. Synchronous API for Long Operations
```python
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
report = expensive_computation(user_id) # 5 minutes!
return report
# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
task = generate_report_task.delay(user_id)
return {"task_id": task.id}
```
### 2. Non-Idempotent Consumers
```python
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
send_email_service(email) # Sends twice if retried!
# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
if redis.exists(f"sent:{idempotency_key}"):
return "already_sent"
send_email_service(email)
redis.setex(f"sent:{idempotency_key}", 86400, "1")
```
### 3. Ignoring Dead Letter Queues
```python
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
process(data) # If all retries fail, data disappears
# ✅ GOOD: DLQ for manual inspection
@app.task(max_retries=3)
def risky_task(data):
try:
process(data)
except Exception as e:
if self.request.retries >= 3:
send_to_dlq(data, str(e))
raise
```
### 4. Using Kafka for Request-Reply
```python
# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
kafka_producer.send("user_requests", {"user_id": user_id})
# How to correlate response? Kafka is asynchronous!
# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
```
## Library Recommendations
### Context7 Research
**Confluent Kafka (Python)**
- Context7 ID: `/confluentinc/confluent-kafka-python`
- Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client
**Temporal**
- Context7 ID: `/websites/temporal_io`
- Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution
### Installation
**Python:**
```bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
```
**TypeScript/Node.js:**
```bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
```
**Rust:**
```bash
cargo add rdkafka lapin async-nats redis
```
**Go:**
```bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
```
## Utilities
Use scripts for setup automation:
- **Kafka setup**: Run `python scripts/kafka_producer_consumer.py` for test utilities
- **Schema validation**: Run `python scripts/validate_message_schema.py` to validate event schemas
## Related Skills
- **api-patterns**: API design for async job submission
- **realtime-sync**: WebSocket/SSE for job status updates
- **feedback**: Toast notifications for job completion
- **databases-***: Persistent storage for event logs
- **observability**: Tracing and metrics for queue operations