Skip to content

AmirJlr/RealTimeRecSys

Repository files navigation

Real-Time Recommender System

DeepWiki

Languages: English | فارسی

A complete end-to-end recommendation engine providing both real-time rankings and historical analytical insights.

🚀 Tech Stack

  • Streaming: Spark Structured Streaming 4.0, Kafka (Redpanda)
  • Storage: PostgreSQL (Hot Data), ClickHouse (OLAP/Analytics), Elasticsearch (Observability)
  • Orchestration: Airflow 3.0
  • Serving: FastAPI
  • Infrastructure: Docker & Docker Compose

🏗️ Project Structure

├── airflow/            # DAGs for aggregations, maintenance, and monitoring
├── api/                # FastAPI application (Real-time & Analytics routes)
├── docker/             # Dockerfiles and specialized compose files
├── kafka/              # Producer scripts and Kafka Connect configurations
├── postgres/           # Database initialization scripts
├── spark/              # Spark Streaming application and notebooks
├── config.py           # Centralized configuration management
└── .env                # Environment variables (Credentials & Hosts)

📚 Documentation

🔄 Workflow

Pipeline (English)

[User Events] → [Redpanda]
      │
      ├─► [Spark Streaming] ──► [PostgreSQL]  ───┐
      │     (Real-time Ranks)                    │
      │                                          │
      ├─► [Kafka Connect]   ──► [ClickHouse]  ───├────► [FastAPI] ──► [End User]
      │     (Raw Event Log)      (Analytics)     │
      │                                          │
      └─► [Kafka Connect] ──► [Elasticsearch] ───┘
            (Observability)      (Dashboard)

⚙️ Key Features

  • Dual-Path Serving: FastAPI serves real-time recommendations from PostgreSQL and batch analytics (top items/categories) from ClickHouse.
  • Automated Operations: Airflow manages data lifecycle, from daily aggregations to system health monitoring.

🛠️ Quick Start

  1. Configure Environment Create a .env file in the root directory based on project credentials.

  2. Start Infrastructure To ensure all environment variables are correctly loaded from the root .env file, use the --env-file flag:

    From Project Root:

    docker compose --env-file .env -f docker/compose-clickhouse.yml -f docker/compose-elastic.yml -f docker/compose-redpanda.yml -f docker/compose-spark.yml -f docker/compose-airflow.yml up -d

    From docker/ folder:

    cd docker
    docker compose --env-file ../.env -f compose-redpanda.yml -f compose-spark.yml -f compose-airflow.yml -f compose-clickhouse.yml -f compose-elastic.yml up -d
  3. Access Services

  4. Configure Airflow Connections To enable maintenance DAGs like recommender_maintenance.py, you must create a Postgres connection in the Airflow UI (Admin -> Connections):

    • Connection Id: postgres_rec
    • Connection Type: Postgres
    • Host: postgres (from POSTGRES_HOST)
    • Database: recommendation_db (from POSTGRES_DB)
    • Login: (from POSTGRES_USER)
    • Password: (from POSTGRES_PASSWORD)
    • Port: 5432 (from POSTGRES_PORT)

📊 API Endpoints

  • GET /recommendations: Real-time personalized top items (Postgres).
  • GET /analytics/top-items: Historical top performers (ClickHouse).
  • GET /analytics/top-categories: Popular categories over time (ClickHouse).

فارسی

یک موتور پیشنهاددهی یکپارچه که هم رتبه‌بندی‌های بلادرنگ و هم بینش‌های تحلیلی را ارائه می‌دهد.

🚀 پشته فناوری

  • جریان (Streaming): Spark Structured Streaming 4.0، Kafka (Redpanda)
  • ذخیره‌سازی: PostgreSQL (داده‌های داغ/Hot)، ClickHouse (OLAP/Analytics)، Elasticsearch (رصدپذیری/Observability)
  • ارکستریشن: Airflow 3.0
  • سروینگ: FastAPI
  • زیرساخت: Docker و Docker Compose

🏗️ ساختار پروژه

├── airflow/            # DAGها برای تجمیع‌ها، نگهداری و مانیتورینگ
├── api/                # برنامه FastAPI (مسیرهای بلادرنگ و تحلیلی)
├── docker/             # Dockerfileها و فایل‌های compose اختصاصی
├── kafka/              # اسکریپت‌های تولیدکننده و تنظیمات Kafka Connect
├── postgres/           # اسکریپت‌های مقداردهی اولیه دیتابیس
├── spark/              # برنامه Spark Streaming و نوت‌بوک‌ها
├── config.py           # مدیریت متمرکز تنظیمات
└── .env                # متغیرهای محیطی (اعتبارنامه‌ها و میزبان‌ها)

📚 مستندات

🔄 گردش‌کار

Pipeline (فارسی)

[رویدادهای کاربر] → [Redpanda]
      │
      ├─► [Spark Streaming] ──► [PostgreSQL]  ───┐
      │     (رتبه‌بندی بلادرنگ)                  │
      │                                          │
      ├─► [Kafka Connect]   ──► [ClickHouse]  ───├────► [FastAPI] ──► [کاربر نهایی]
      │     (لاگ خام رویدادها)   (تحلیل‌ها)      │
      │                                          │
      └─► [Kafka Connect] ──► [Elasticsearch] ───┘
            (رصدپذیری)          (داشبورد)

⚙️ ویژگی‌های کلیدی

  • سروینگ دو-مسیره: FastAPI پیشنهادهای بلادرنگ را از PostgreSQL و تحلیل‌های دسته‌ای (آیتم‌ها/دسته‌بندی‌های برتر) را از ClickHouse ارائه می‌دهد.
  • عملیات خودکار: Airflow چرخه عمر داده را از تجمیع‌های روزانه تا پایش سلامت سیستم مدیریت می‌کند.

🛠️ شروع سریع

  1. پیکربندی محیط یک فایل .env در ریشه پروژه بر اساس اعتبارنامه‌های پروژه ایجاد کنید.

  2. راه‌اندازی زیرساخت برای اینکه همه متغیرهای محیطی از فایل .env ریشه به‌درستی بارگذاری شوند، از پرچم --env-file استفاده کنید:

    از ریشه پروژه:

    docker compose --env-file .env -f docker/compose-clickhouse.yml -f docker/compose-elastic.yml -f docker/compose-redpanda.yml -f docker/compose-spark.yml -f docker/compose-airflow.yml up -d

    از پوشه docker/:

    cd docker
    docker compose --env-file ../.env -f compose-redpanda.yml -f compose-spark.yml -f compose-airflow.yml -f compose-clickhouse.yml -f compose-elastic.yml up -d
  3. دسترسی به سرویس‌ها

  4. پیکربندی اتصال‌های Airflow برای فعال‌سازی DAGهای نگهداری مثل recommender_maintenance.py، باید یک اتصال Postgres در رابط کاربری Airflow (Admin -> Connections) بسازید:

    • Connection Id: postgres_rec
    • Connection Type: Postgres
    • Host: postgres (از POSTGRES_HOST)
    • Database: recommendation_db (از POSTGRES_DB)
    • Login: (از POSTGRES_USER)
    • Password: (از POSTGRES_PASSWORD)
    • Port: 5432 (از POSTGRES_PORT)

📊 مسیرهای API

  • GET /recommendations: پیشنهادهای شخصی‌سازی‌شده بلادرنگ (Postgres).
  • GET /analytics/top-items: آیتم‌های برتر تاریخی (ClickHouse).
  • GET /analytics/top-categories: دسته‌بندی‌های محبوب در طول زمان (ClickHouse).

Releases

No releases published

Packages

 
 
 

Contributors