데이터 엔지니어링

GCP 환경에서의 Airflow Celery Executor 구축 및 운영 파이프라인

GCP 환경에서의 Airflow Celery Executor 구축 및 운영 파이프라인

본 포스팅에서는 Google Cloud Platform(GCP) 환경에서 Airflow Celery Executor 아키텍처를 구축한 과정을 공유합니다. Terraform을 이용한 인프라 프로비저닝부터 Airflow 설정, Systemd를 통한 프로세스 관리, 그리고 Cloud Build를 활용한 배포 자동화까지 전체적인 엔지니어링 파이프라인을 다룹니다.

 

airflow-architecture
출처: https://medium.com/sicara/using-airflow-with-celery-workers-54cb5212d405

1. 아키텍처 개요: 왜 Celery Executor인가?

Airflow는 기본적으로 순차 실행을 담당하는 Sequential Executor를 제공하지만, 프로덕션 환경에서는 병렬 처리와 확장성이 필수적입니다. Celery Executor는 메시지 브로커를 통해 작업을 분산 처리함으로써 다음과 같은 이점을 제공합니다.

  • 수평 확장(Horizontal Scaling): 워커 노드를 추가함으로써 시스템의 전체 처리량을 유연하게 증대시킬 수 있습니다.

  • 고가용성(High Availability): 여러 워커가 분산되어 있어 특정 노드의 장애가 전체 시스템의 중단으로 이어지지 않습니다.

  • 비동기 처리: 스케줄러와 워커가 메시지 큐를 통해 느슨하게 결합(Loosely Coupled)되어 효율적인 리소스 활용이 가능합니다.

데이터 흐름 (Data Flow)

시스템의 전반적인 데이터 흐름은 다음과 같습니다.

  1. Scheduler: DAG를 파싱하고 실행 조건이 충족된 Task를 식별하여 메시지 브로커(Redis)의 큐(Queue)로 전송합니다.

  2. Message Broker: 스케줄러와 워커 간의 통신을 중개합니다. Task 메시지를 큐에 저장하고 유휴 상태의 워커에게 전달합니다.

  3. Celery Worker: 브로커로부터 할당받은 Task를 실제 수행합니다.

  4. Result Backend: 워커가 수행한 Task의 실행 결과(성공, 실패, 반환값 등)를 저장합니다.

  5. Metadata DB: Airflow의 모든 상태 정보를 최종적으로 저장하며, 웹 서버와 스케줄러는 이를 참조하여 UI 및 다음 스케줄링에 반영합니다.

2. 인프라 구성 (Infrastructure as Code)

인프라 관리는 Terraform을 사용하여 코드(IaC)로 구현했습니다. 안정적인 운영을 위해 상태 저장소(DB)와 로깅 스토리지는 컴퓨팅 인스턴스와 분리했습니다.

주요 구성 요소

  • Compute Engine (VM): Airflow 웹서버, 스케줄러, 워커가 구동되는 런타임 환경 (Debian 12, e2-highmem-2).

  • Cloud SQL (PostgreSQL): Airflow 메타데이터 저장소. 관리형 서비스를 사용하여 백업 및 복구 용이성 확보.

  • Cloud Storage (GCS): Airflow 로그 저장소. VM 디스크 용량 절약 및 로그 영구 보존 목적.

  • Redis: VM 내부에 설치하여 메시지 브로커로 사용 (비용 효율성을 고려하여 VM 내부 구성, 필요시 Memorystore로 전환 가능).

  • Load Balancer: HTTPS 트래픽 처리 및 SSL 인증서 관리.

GCP 비용 관리와 관련된 내용은 본 글을 참고해주세요.

Terraform 구성 (main.tf 발췌)

# 주요 리소스 구성 예시

# 1. Compute Engine
resource "google_compute_instance" "airflow_vm" {
  name         = var.vm_name
  machine_type = var.vm_machine_type
  zone         = "${var.region}-a"
  # ... (생략)
}

# 2. Cloud SQL (PostgreSQL)
resource "google_sql_database_instance" "airflow_db" {
  name             = var.db_instance_name
  database_version = "POSTGRES_17"
  settings {
    tier = "db-custom-1-3840"
    backup_configuration {
      enabled = true
      point_in_time_recovery_enabled = true
    }
  }
}

# 3. GCS Bucket (Logging)
resource "google_storage_bucket" "airflow_logs" {
  name     = var.logs_bucket_name
  location = var.region
  lifecycle_rule {
    condition { age = 30 }
    action { type = "Delete" }
  }
}

# 4. Load Balancer & SSL
# Forwarding Rule, Target Proxy, URL Map 등을 통해 HTTPS 트래픽을 VM으로 라우팅

3. 환경 구성 및 미들웨어 설정

Python 및 Airflow 설치

  • Python: 3.11.13 (pyenv 활용)

  • Airflow: 2.10.5

    • 필수 Provider(google, celery, redis, postgres 등)를 포함하여 설치를 진행합니다.

Redis 설정 (Message Broker)

VM 내부에 Redis(7.0.15)를 설치하고 보안을 위해 비밀번호를 설정합니다. Airflow는 Redis를 브로커, 결과 백엔드, 그리고 웹서버의 Rate Limiting 저장소로 활용합니다.

# /etc/redis/redis.conf 수정
requirepass {secure_password}
bind 127.0.0.1

4. Airflow 주요 설정

Celery Executor의 성능 최적화와 안정적인 운영을 위해 airflow.cfg를 조정했습니다.

Core & Database

[core]
executor = CeleryExecutor

[database]
# Cloud SQL 연결 정보 설정
sql_alchemy_conn = postgresql+psycopg2://{user}:{pw}@{ip}/{db}

Logging (Remote Logging)

로컬 디스크 의존성을 제거하기 위해 GCS를 원격 로그 저장소로 지정했습니다.

[logging]
remote_logging = True
remote_log_conn_id = {gcs_connection_id}
remote_log_folder = gs://{bucket_name}/logs
delete_local_logs = True # 전송 후 로컬 로그 삭제

Celery & Performance Tuning

[celery]
broker_url = redis://{pw}@localhost:6379/0
result_backend = redis://{pw}@localhost:6379/2

# Worker Concurrency: 단일 워커가 동시에 처리할 수 있는 Task 수
worker_concurrency = 4

# Prefetch Multiplier: 워커가 미리 가져올 Task 수 (Throughput 최적화)
worker_prefetch_multiplier = 1

# Task Acks Late: Task 완료 후 Ack 전송 (데이터 무결성 보장)
task_acks_late = True

# 모니터링 강화 설정
task_track_started = True  # Running 상태 추적
task_send_sent_event = True
  • task_acks_late = True: 워커가 작업을 완료한 후에만 메시지 큐에 처리 완료(Ack)를 보냅니다. 작업 도중 워커가 중단되더라도 다른 워커가 해당 작업을 다시 가져갈 수 있어 데이터 유실을 방지합니다.

  • worker_concurrency: I/O Bound 작업이 많은 특성을 고려하여 CPU 코어 수 대비 여유 있게 설정했습니다.

5. 프로세스 관리 (Systemd)

안정적인 데몬 관리를 위해 Systemd Service를 등록했습니다. 특히 단일 VM에서 여러 워커 프로세스를 띄우거나 고유한 호스트네임을 부여하기 위해 airflow-worker@.service 템플릿을 활용했습니다.

Celery Worker Service 템플릿

# /etc/systemd/system/airflow-worker@.service
[Unit]
Description=Airflow Celery Worker %i
After=network.target postgresql.service redis-server.service

[Service]
User={username}
EnvironmentFile={$AIRFLOW_HOME}/environment
# %i를 활용해 고유 hostname 부여 (예: worker1@hostname)
ExecStart=/path/to/celery -A airflow.providers.celery.executors.celery_executor.app worker \
    --loglevel INFO -E --hostname 'worker%i@%%H' --queues default
Restart=always

[Install]
WantedBy=multi-user.target

이 외에도 webserver, scheduler, flower 모니터링 도구 또한 개별 서비스로 등록하여 관리합니다.

6. 배포 자동화 (CI/CD)

GitHub의 Main 브랜치에 변경 사항이 푸시되면 GCP Cloud Build가 트리거되어 배포를 수행합니다.

배포 전략

  • Worker Pool & Private IP: 보안을 위해 외부 IP가 아닌 내부 IP(Private IP)로 VM에 접근합니다. 이를 위해 Cloud Build의 Private Worker Pool을 사용했습니다.

  • Secret Manager: SSH 키와 같은 민감 정보는 Secret Manager에서 런타임에 안전하게 로드합니다.

Cloud Build 구성 (yaml)

steps:
  - name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        # 1. Secret Manager에서 SSH Key 로드
        gcloud secrets versions access latest --secret="{ssh_key_secret}" > /root/.ssh/id_rsa
        chmod 600 /root/.ssh/id_rsa

        # 2. VM 내부 IP 조회
        VM_INTERNAL_IP=$$(gcloud compute instances describe ${_VM_NAME} --format="value(networkInterfaces[0].networkIP)")

        # 3. SSH 접속 및 배포 스크립트 실행
        ssh -i /root/.ssh/id_rsa ${_VM_USER}@$$VM_INTERNAL_IP '
          cd ${_AIRFLOW_HOME}
          git pull origin main
          
          # 서비스 재시작 (Graceful Restart 고려)
          sudo systemctl restart airflow-webserver airflow-scheduler airflow-worker
        '
options:
  pool:
    name: 'projects/{project}/locations/{region}/workerPools/{pool_name}'

마무리

본 포스팅에서는 단일 VM과 관리형 DB 서비스를 활용하여 Celery Executor 기반의 Airflow 환경을 구축하는 방법을 살펴보았습니다. 이 아키텍처는 초기 구축 비용을 효율적으로 관리하면서도, 향후 워크로드 증가 시 워커 노드를 수평적으로 확장하여 유연하게 대응할 수 있는 기반을 제공합니다.

참고: 본 문서는 PostgreSQL 17, Airflow 2.10.5, Redis 7.0 버전을 기준으로 작성되었습니다.

Share this post

About the author

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다