Event Processors
Overview
FundlyHub uses an event-driven architecture with specialized processors running in the backend infrastructure. Each processor subscribes to specific event channels via Redis Streams or the Events Database and performs asynchronous tasks.
Processor Types
1. Write Processors
Handle eventual consistency writes that don't need to happen in the main transaction.
- SubscriptionWriteProcessor: Updates subscription tables when users follow/unfollow entities.
- ActivityFeedProcessor: Generates timeline activities (e.g., "John followed Charity X") for social feeds.
2. Projection Processors
Update CQRS read models in the Analytics Database.
- CampaignStatsProcessor: Aggregates donation totals, counts, and averages.
- SearchIndexProcessor: Updates Elasticsearch/Postgres search vectors for full-text search.
3. Notification Processors
Handle external communications (Email, SMS, Push).
- EmailNotificationProcessor: Sends transactional emails (receipts, welcome emails).
- PushNotificationProcessor: Sends mobile push notifications.
Event Processing Flow
- Action: User performs an action (e.g., POST /campaigns).
- Publication: The API service commits the transaction and triggers an event publication to the Events DB.
- Streaming: The event is streamed to Redis/Queue for consumers.
- Consumption: Worker services (Processors) pick up the event.
- Execution: Processors execute their logic (e.g., update stats, send email).
- Completion: Processor acknowledges the message.
Idempotency & Reliability
Processors are designed to be idempotent and fault-tolerant.
Key Mechanisms:
- Idempotency Keys: Processors track processed Event IDs to prevent duplicate execution.
- Dead Letter Queues (DLQ): Events that fail processing after N retries are moved to a DLQ for manual inspection.
- Circuit Breakers: External service calls (e.g., Email API) are wrapped in circuit breakers to fail fast during outages.
Example Processor Code
Node.js processor implementation:
typescript
import { Processor } from './base-processor';
export class CampaignStatsProcessor extends Processor {
eventType = EventType.DonationCompleted;
async handle(event) {
const { campaignId, amount } = event.payload;
// Atomic update of stats projection
await db.query(`
UPDATE campaign_stats_projection
SET
total_raised = total_raised + $1,
donation_count = donation_count + 1,
last_donation_at = NOW()
WHERE campaign_id = $2
`, [amount, campaignId]);
console.log(`Updated stats for campaign ${campaignId}`);
}
}