ML Infrastructure
~35 мин

Оркестрация

Airflow, Dagster, Prefect — DAG, scheduling, мониторинг пайплайнов.

Оркестрация — чтобы пайплайн работал каждый день без тебя

Загрузка интерактивного виджета...

🗺️ Где это в общей картине

Оркестратор (Airflow, Dagster) — это диспетчер всего ML-стека. Он не обрабатывает данные сам — он командует: «в 6:00 запусти ETL-пайплайн → когда данные готовы, запусти PySpark для фичей → когда фичи посчитаны, обучи модель → когда модель готова, задеплой». Без оркестратора ты запускаешь каждый шаг руками.

Airflow — это cron на стероидах. Он НЕ обрабатывает данные — он планирует и запускает задачи. У cron нет зависимостей между задачами, ретраев, алертов, UI для мониторинга. У Airflow — есть. Ты описываешь DAG (граф зависимостей) Python-кодом, задаёшь расписание — и Airflow каждое утро выполняет цепочку шагов, перезапуская упавшие и отправляя алерты в Slack.

Конкретный пример: каждое утро в 6:00 Airflow запускает DAG: скачать данные из прод-базы → обработать PySpark-ом → посчитать фичи → обучить модель → проверить метрики → задеплоить → уведомить в Slack. Если шаг 3 упал — Airflow перезапустит только его, а не весь пайплайн.

Apache Airflow — стандарт индустрии

Airflow создан в Airbnb, сейчас Apache-проект. Используется повсеместно — от стартапов до крупных компаний. Если ты будешь работать с пайплайнами — Airflow встретится почти наверняка.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    schedule="0 6 * * *",          # каждый день в 6:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:

    extract = BashOperator(
        task_id="extract_data",
        bash_command="python scripts/extract.py --date {{ ds }}",
    )

    train = PythonOperator(
        task_id="train_model",
        python_callable=train_and_log_model,
    )

    deploy = BashOperator(
        task_id="deploy_model",
        bash_command="python scripts/deploy.py",
    )

    # Зависимости — кто после кого
    extract >> train >> deploy

Ключевые понятия Airflow

  • DAG — граф задач без циклов. Определяет ЧТО выполнять и В КАКОМ ПОРЯДКЕ
  • Task — единица работы: PythonOperator, BashOperator, SparkSubmitOperator
  • Schedule — cron-выражение: "0 6 * * *" = каждый день в 6:00
  • Sensor — ожидание события: файл появился в S3, данные готовы. Пайплайн не стартует раньше времени
  • XCom — передача маленьких данных между задачами (пути, ID, метрики)
  • Backfill — запустить пайплайн за прошлые даты при изменении логики

Dagster — современная альтернатива

Dagster — «Airflow нового поколения». Главное отличие: вместо задач (tasks) ты думаешь в терминах данных (assets). Не «задача A → задача B», а «dataset A → трансформация → dataset B». Dagster сам строит DAG из зависимостей между assets. Плюсы: встроенное тестирование, type checking, лучший developer experience. Если начинаешь с нуля — стоит рассмотреть.

import dagster as dg
import pandas as pd

@dg.asset(description="Raw events from production DB")
def raw_events() -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM events WHERE date = CURRENT_DATE", conn)

@dg.asset(description="User-level features for ML")
def user_features(raw_events: pd.DataFrame) -> pd.DataFrame:
    # Dagster видит зависимость: user_features нужен raw_events
    return raw_events.groupby("user_id").agg(
        total_events=("event_id", "count"),
        total_revenue=("amount", "sum"),
    ).reset_index()

# Dagster автоматически строит DAG: raw_events → user_features

Prefect — Python-first оркестрация

Prefect — самый простой API. Берёшь обычную Python-функцию, добавляешь декоратор @flow — и она становится пайплайном с ретраями и мониторингом. Хорош для небольших команд, которым не нужен тяжёлый Airflow.

Что выбрать

  • Airflow — стандарт. Огромная экосистема, 100+ операторов, все знают. Сложнее в настройке, но проверен годами
  • Dagster — для ML-команд. Asset-oriented, встроенное тестирование. Лучший DX, но экосистема меньше
  • Prefect — самый простой API. Для небольших команд и быстрого старта
  • Kubeflow Pipelines — если ML-платформа на Kubernetes и нужен inference на GPU

💡 Как это в реальной работе

В большинстве компаний уже стоит Airflow — ты подключаешься к нему и пишешь свои DAG-и. Типичный ML DAG: wait_for_data (sensor) → extract → validate → compute_features (PySpark) → train → evaluate → deploy → notify_slack. Всё видно в Web UI: какие таски прошли, какие упали, логи каждого шага.

🎯 На собесе

Как устроен ваш ML-пайплайн? Оркестратор (Airflow/Dagster), DAG из шагов: extract → validate → transform → train → evaluate → deploy. Расписание ежедневно. Retries + таймауты для устойчивости. Sensors для ожидания данных. Алерты в Slack при падении.