Event Processors

Overview

FundlyHub uses an event-driven architecture with specialized processors running in the backend infrastructure. Each processor subscribes to specific event channels via Redis Streams or the Events Database and performs asynchronous tasks.

Processor Types

1. Write Processors

Handle eventually consistent writes that do not need to happen in the main transaction.

  • SubscriptionWriteProcessor: Updates subscription tables when users follow/unfollow entities.
  • ActivityFeedProcessor: Generates timeline activities (e.g., "John followed Charity X") for social feeds.

2. Projection Processors

Update CQRS read models in the Analytics Database.

  • CampaignStatsProcessor: Aggregates donation totals, counts, and averages.
  • SearchIndexProcessor: Updates Elasticsearch/Postgres search vectors for full-text search.
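As a rough illustration of the kind of aggregation CampaignStatsProcessor performs, the sketch below folds donation amounts into an in-memory read model. The `CampaignStats` shape and the `applyDonation` and `averageDonation` functions are hypothetical names chosen for this example, not FundlyHub's actual schema, and a `Map` stands in for the Analytics Database.

```typescript
// Hypothetical read-model shape; the real projection schema may differ.
interface CampaignStats {
  totalRaised: number;   // sum of donation amounts, e.g. in cents
  donationCount: number; // number of completed donations
}

// Fold one donation into a campaign's stats and return a new object.
function applyDonation(stats: CampaignStats | undefined, amount: number): CampaignStats {
  const current = stats ?? { totalRaised: 0, donationCount: 0 };
  return {
    totalRaised: current.totalRaised + amount,
    donationCount: current.donationCount + 1,
  };
}

// The average is derived on read, so it cannot drift from total and count.
function averageDonation(stats: CampaignStats): number {
  return stats.donationCount === 0 ? 0 : stats.totalRaised / stats.donationCount;
}

// In-memory stand-in for the projection table, keyed by campaign ID.
const projection = new Map<string, CampaignStats>();
const donations: Array<[string, number]> = [["c1", 1000], ["c1", 3000], ["c2", 500]];
donations.forEach(([campaignId, amount]) => {
  projection.set(campaignId, applyDonation(projection.get(campaignId), amount));
});
```

Deriving the average from the stored total and count, rather than storing it, is one common design choice for projections: only two counters need to be kept consistent.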

3. Notification Processors

Handle external communications (Email, SMS, Push).

  • EmailNotificationProcessor: Sends transactional emails (receipts, welcome emails).
  • PushNotificationProcessor: Sends mobile push notifications.

Event Processing Flow

  • Action: User performs an action (e.g., POST /campaigns).
  • Publication: After committing the transaction, the API service publishes an event to the Events DB.
  • Streaming: The event is streamed to Redis/Queue for consumers.
  • Consumption: Worker services (Processors) pick up the event.
  • Execution: Processors execute their logic (e.g., update stats, send email).
  • Completion: Processor acknowledges the message.
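The steps above can be sketched with an in-memory stand-in. Everything here is an assumption made for illustration: `InMemoryEventBus`, `DomainEvent`, and the method names are hypothetical, and plain arrays replace the Events DB and the Redis-backed stream.

```typescript
// All names are hypothetical; arrays stand in for the Events DB and the stream.
interface DomainEvent {
  id: string;
  type: string;
  payload: Record<string, unknown>;
}

type Handler = (event: DomainEvent) => void;

class InMemoryEventBus {
  readonly eventsDb: DomainEvent[] = [];     // stand-in for the Events DB
  readonly acknowledged: string[] = [];      // IDs of events that were acked
  private pending: DomainEvent[] = [];       // stand-in for the stream/queue
  private handlers = new Map<string, Handler[]>();

  // A processor subscribes to one event type.
  subscribe(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Publication and streaming: record the event, then enqueue it for consumers.
  publish(event: DomainEvent): void {
    this.eventsDb.push(event);
    this.pending.push(event);
  }

  // Consumption, execution, completion: run the handlers, then acknowledge.
  drain(): void {
    while (this.pending.length > 0) {
      const event = this.pending.shift()!;
      for (const handler of this.handlers.get(event.type) ?? []) {
        handler(event);
      }
      // Reached only if every handler returned without throwing.
      this.acknowledged.push(event.id);
    }
  }
}

const bus = new InMemoryEventBus();
const seen: string[] = [];
bus.subscribe("CampaignCreated", (e) => {
  seen.push(`indexed ${String(e.payload["campaignId"])}`);
});
bus.publish({ id: "evt-1", type: "CampaignCreated", payload: { campaignId: "c1" } });
bus.drain();
```

In this sketch a throwing handler simply propagates the error and the event is never acknowledged. A real stream consumer would typically see the unacknowledged message redelivered.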

Idempotency & Reliability

Processors are designed to be idempotent and fault-tolerant.

Key Mechanisms:

  • Idempotency Keys: Processors record the IDs of events they have already processed and skip any event whose ID is already recorded, which prevents duplicate execution.
  • Dead Letter Queues (DLQ): Events that fail processing after N retries are moved to a DLQ for manual inspection.
  • Circuit Breakers: External service calls (e.g., Email API) are wrapped in circuit breakers to fail fast during outages.
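To make the circuit breaker idea concrete, here is a minimal sketch. The `CircuitBreaker` class, its thresholds, and the injectable `now` clock are hypothetical and not taken from FundlyHub's codebase. A production system would more likely use an established library than hand-roll this.

```typescript
// Hypothetical minimal circuit breaker; names and thresholds are illustrative.
type BreakerState = "closed" | "open" | "half-open";

class CircuitOpenError extends Error {
  constructor() {
    super("circuit is open; call rejected without contacting the service");
    this.name = "CircuitOpenError";
  }
}

class CircuitBreaker {
  state: BreakerState = "closed";
  private failures = 0;   // consecutive failures since the last success
  private openedAt = 0;   // timestamp (ms) at which the circuit last opened

  constructor(
    private readonly failureThreshold: number,
    private readonly resetAfterMs: number,
    private readonly now: () => number = Date.now,
  ) {}

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === "open") {
      if (this.now() - this.openedAt < this.resetAfterMs) {
        throw new CircuitOpenError(); // fail fast during the cool-down
      }
      this.state = "half-open"; // cool-down elapsed: allow one trial call
    }
    const isTrialCall = this.state === "half-open";
    try {
      const result = await fn();
      this.failures = 0;
      this.state = "closed";
      return result;
    } catch (err) {
      this.failures += 1;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (isTrialCall || this.failures >= this.failureThreshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}
```

A processor could then wrap each outbound call, for example `await breaker.call(() => sendEmail(message))`, where `sendEmail` is a placeholder for whatever client the Email API provides.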

Example Processor Code

Node.js processor implementation:

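```typescript
import { Processor } from './base-processor';
// Assumed module paths: the snippet uses EventType and db, so they must be
// imported from wherever the project defines them.
import { EventType } from './event-types';
import { db } from './db';

// Assumed payload shape for DonationCompleted events.
interface DonationCompletedEvent {
  payload: { campaignId: string; amount: number };
}

export class CampaignStatsProcessor extends Processor {
  eventType = EventType.DonationCompleted;

  async handle(event: DonationCompletedEvent): Promise<void> {
    const { campaignId, amount } = event.payload;

    // Atomic update of the stats projection: the increments run in a single
    // statement, so concurrent donations cannot overwrite each other.
    // Note: this updates zero rows if the campaign has no projection row yet.
    await db.query(`
      UPDATE campaign_stats_projection
      SET
        total_raised = total_raised + $1,
        donation_count = donation_count + 1,
        last_donation_at = NOW()
      WHERE campaign_id = $2
    `, [amount, campaignId]);

    console.log(`Updated stats for campaign ${campaignId}`);
  }
}
```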
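Because the handler above increments counters, processing the same event twice would double-count a donation. The sketch below shows one way a handler could be guarded with processed-ID tracking, bounded retries, and a dead letter queue. `processReliably`, `processedIds`, `deadLetters`, and the default of three retries are hypothetical, and both collections live in memory only for the sake of a self-contained example.

```typescript
// Hypothetical in-memory stand-ins; a real system would persist both collections.
interface StoredEvent {
  id: string;
  payload: unknown;
}

const processedIds = new Set<string>();  // idempotency record of handled event IDs
const deadLetters: StoredEvent[] = [];   // stand-in for the DLQ

async function processReliably(
  event: StoredEvent,
  handle: (event: StoredEvent) => Promise<void>,
  maxRetries = 3,
): Promise<"processed" | "duplicate" | "dead-lettered"> {
  if (processedIds.has(event.id)) {
    return "duplicate"; // already handled: skip so side effects are not repeated
  }
  // One initial attempt plus up to maxRetries retries.
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      await handle(event);
      processedIds.add(event.id); // record only after the handler succeeded
      return "processed";
    } catch (err) {
      // Swallow and retry; a real worker would log the error and back off here.
    }
  }
  deadLetters.push(event); // retries exhausted: park for manual inspection
  return "dead-lettered";
}
```

The check-then-record step here is not safe across multiple worker processes. With a relational store, recording the event ID in the same transaction as the projection update (for instance behind a unique constraint) is one common way to close that gap.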