ETL/ELT пайплайны
Batch vs streaming, Kafka basics, Change Data Capture, ELT-подход с dbt.
ETL/ELT — как данные попадают из продакшна в DWH
Загрузка интерактивного виджета...
🗺️ Где это в общей картине
ETL — Extract, Transform, Load. Одним предложением: забрать данные из источников, привести в порядок и положить в хранилище. ELT — то же самое, но сначала грузим сырые данные в DWH, а потом трансформируем уже внутри — ClickHouse или BigQuery считают быстрее, чем отдельный сервер. ELT + dbt — стандарт в 2024-2025.
Batch vs Streaming — когда что
Batch — данные обрабатываются порциями (раз в час, раз в день). Streaming — данные обрабатываются по мере поступления, в реальном времени.
- Batch — раз в час/день. Spark, dbt, Airflow. Простой, надёжный, закрывает 90% задач: отчёты, витрины, фичи
- Streaming — в реальном времени. Kafka, Flink, Spark Streaming. Для fraud detection, рекомендаций, real-time алертов
- Micro-batch — каждые 1-5 минут. Spark Structured Streaming. Компромисс: почти real-time, но проще streaming
Apache Kafka и CDC — real-time доставка данных
Kafka — распределённый лог событий. Одна система записывает события в «топик» (producer), другая — читает (consumer). Данные хранятся на диске — можно перечитать заново (replay). Ключевые концепции: Topic (именованный поток, разбит на партиции), Consumer Group (каждая партиция читается одним из группы), Offset (позиция чтения). CDC (Change Data Capture) через Debezium читает WAL базы данных и отправляет каждое изменение в Kafka за секунды — вместо полной выгрузки раз в день.
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer — отправляет события в топик
producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode())
producer.send('user-events', {'user_id': 'u-123', 'event': 'purchase'})
# Consumer — читает и обрабатывает
consumer = KafkaConsumer('user-events', group_id='feature-calc',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode()))
for msg in consumer:
update_user_features(msg.value['user_id'], msg.value)💡 Зачем CDC для ML
dbt — SQL-трансформации как код
dbt (data build tool) — стандарт для ELT-трансформаций. Ты пишешь SQL-модели (обычные SELECT), а dbt превращает их в таблицы/views в DWH. Плюс тесты данных (unique, not_null, relationships), автодокументация и DAG зависимостей — всё версионируется в Git. Основные команды: dbt run (выполнить модели), dbt test (проверить данные), dbt docs generate (документация + граф).
-- models/marts/mart_user_features.sql
-- Витрина фичей для ML — обычный SELECT, dbt превращает его в таблицу
{{ config(materialized='table', tags=['ml-features']) }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
)
SELECT
user_id,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
CURRENT_DATE - MAX(created_at)::date AS days_since_last_order
FROM orders
GROUP BY user_idТесты на данные описываются в YAML рядом с моделью: unique и not_null для ключей, accepted_range для числовых полей, relationships для ссылочной целостности. dbt test запускается после dbt run — если тест упал, пайплайн останавливается и битые данные не попадут в витрину.
💡 Как это в реальной работе
🎯 На собесе