ویژگی تصویر

معرفی کتابخانه Celery در پایتون

  /  پایتون   /  کتابخانه celery در پایتون
بنر تبلیغاتی الف

Celery یک کتابخانهٔ توزیع‌شده برای اجرای وظایف (tasks) به صورت ناهمگام (asynchronous) و برنامه‌ریزی‌شده (scheduled) در اکوسیستم پایتون است. این ابزار به شما امکان می‌دهد کارهای سنگین، زمان‌بر یا تکراری را خارج از فرایند اصلی اپلیکیشن اجرا کنید تا پاسخ‌دهی سریع‌تر، مقیاس‌پذیری بهتر و مدیریت خطاهای پیشرفته فراهم شود.

کاربردها و مزایا

  • اجرای کارهای زمان‌بر مانند ارسال ایمیل، پردازش تصویر یا محاسبات سنگین.
  • اجرای دوره‌ای (cron-like) با استفاده از Beat یا schedulerهای داخلی.
  • پشتیبانی از چینش (chains), گروه‌ها (groups), و chordها برای گردش کارهای پیچیده.
  • قابلیت مقیاس‌گذاری افقی با اضافه کردن workerهای بیشتر.

نقشهٔ کلی معماری

معمولاً Celery شامل سه جز اصلی است: پروسهٔ اپلیکیشن (producer) که پیام ایجاد می‌کند، بروکر (مثل Redis یا RabbitMQ) که پیام‌ها را حمل می‌کند و workerها که پیام‌ها را دریافت و اجرا می‌کنند. نتیجهٔ کار می‌تواند در backend ذخیره شود (مثلاً Redis, database, یا RPC).

راه‌اندازی اولیه (مثال پایه)

from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    enable_utc=True,
)

در این کد، یک شیء Celery ساخته می‌شود که از Redis به‌عنوان broker و backend استفاده می‌کند. پیکربندی‌هایی مثل serializer و timezone نیز تنظیم شده‌اند تا سازگاری و امنیت داده‌ها افزایش یابد.

تعریف و فراخوانی یک Task ساده

from .celery import app

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to_address, subject, body):
    try:
        # فرض کنید تابع send_actual_email پیاده‌سازی شده است
        send_actual_email(to_address, subject, body)
    except Exception as exc:
        raise self.retry(exc=exc)

در این مثال، یک تسک ارسال ایمیل تعریف شده است. گزینه bind=True دسترسی به نمونهٔ task را فراهم می‌کند تا از متد retry استفاده کنیم. max_retries و default_retry_delay مدیریت تلاش‌های مجدد را بر عهده دارند. اگر خطایی رخ دهد، متد retry باعث صف‌بندی مجدد تسک می‌شود.

اجرای worker و استفاده از CLI

# اجرای worker با نام myworker و دو پردازش همزمان
celery -A myapp worker --loglevel=info --concurrency=2

# اجرای scheduler (beat) برای تسک‌های زمان‌بندی شده
celery -A myapp beat --loglevel=info

برای اجرای worker از دستور celery -A worker استفاده می‌شود. پارامتر concurrency تعداد پردازش‌های همزمان (process pool) را تعیین می‌کند. برای وظایف زمان‌بندی‌شده باید Beat را نیز اجرا کنید که برنامه‌ها را طبق schedule اجرا می‌کند.

نمونه‌ای از Periodic Task (Beat)

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-temp-files-every-night': {
        'task': 'myapp.tasks.cleanup_temp_files',
        'schedule': crontab(hour=3, minute=0),
    },
}

این پیکربندی باعث می‌شود تسک cleanup_temp_files هر شب در ساعت 03:00 اجرا شود. Beat مسئول قرار دادن پیام‌ها در صف در زمان‌های مشخص است.

قابلیت‌های پیشرفته: Canvas، Chains و Chords

Celery Canvas ابزاری برای ترکیب تسک‌ها است: chain برای اجرای ترتیبی، group برای اجرای موازی و chord برای اجرای callback پس از اتمام گروه. این قابلیت‌ها برای ساخت گردش‌کارهای پیچیده بسیار مفیدند.

بهینه‌سازی و نکات عملی برای تولید (Production)

  • استفاده از بروکر مناسب: RabbitMQ برای موارد با پیام‌های پیچیده و تضمین delivery بهتر است؛ Redis ساده‌تر و سریع‌تر است.
  • تنظیمات acks_late و visibility_timeout برای جلوگیری از از دست رفتن پیام‌ها.
  • پیکربندی worker_max_tasks_per_child برای جلوگیری از نشت حافظه طولانی‌مدت.
  • استفاده از monitor مثل Flower برای مشاهدهٔ وضعیت تسک‌ها و workerها.
  • انتخاب pool مناسب: prefork برای CPU-bound، gevent/asyncio برای I/O-bound با توجه به نیاز.

مقایسهٔ کوتاه بروکرها

Brokerمزایامعایب
Redisساده، سریع، تنظیم آسانمحدودیت در بعضی Guaranteeها، مناسب حجم متوسط
RabbitMQقابلیت‌های قوی در routing و تضمین deliveryپیچیده‌تر و نیازمند نگهداری بیشتر

نمونهٔ تسک با ویژگی‌های امنیتی و مدیریت زمان

@app.task(bind=True, acks_late=True, time_limit=300, soft_time_limit=240)
def process_video(self, video_id):
    try:
        # پردازش و تبدیل ویدئو
        do_transcode(video_id)
    except Exception as exc:
        # لاگ و تلاش مجدد با backoff
        raise self.retry(exc=exc, countdown=60 * 2)

در این کد، acks_late=True باعث می‌شود acknowledgement فقط پس از پایان موفق تسک ارسال شود تا در صورت کرش worker، تسک مجدداً اجرا شود. time_limit و soft_time_limit برای جلوگیری از گیر کردن تسک‌ها کاربرد دارند.

تست، مانیتورینگ و خطاگیری

برای تست تسک‌ها از اجرای محلی با workerهای با concurrency کم و broker تستی استفاده کنید. برای مانیتورینگ از Flower یا ابزارهای مبتنی بر Prometheus استفاده کنید تا متریک‌ها و اخطارها را ببینید. لاگ‌گذاری و جمع‌آوری استثناها به تشخیص الگوهای شکست کمک می‌کند.

نکات کارشناسی

  • تسک‌ها را ایتمپوتنت (idempotent) پیاده‌سازی کنید تا اجرای مجدد باعث رفتار غیرمنتظره نشود.
  • از serializer ایمن مانند JSON استفاده کنید تا حملات احتمالی از بین برود.
  • برای حجم بالا، نرخ مصرف پیام را با prefetch_multiplier و rate_limit کنترل کنید.
  • در صورت استفاده با Django، از پیکربندی app.autodiscover_tasks استفاده کنید تا تسک‌ها خودکار بارگذاری شوند.

جمع‌بندی

Celery یک ابزار بالغ و قدرتمند برای پردازش ناهمگام در پایتون است که با پیکربندی صحیح می‌تواند عملکرد و قابلیت اطمینان اپلیکیشن شما را به‌طور چشمگیری افزایش دهد. انتخاب بروکر مناسب، رعایت نکات ایمنی، و مانیتورینگ دقیق از کلیدهای موفقیت در تولید هستند.

آیا این مطلب برای شما مفید بود ؟

خیر
بله
موضوعات شما در انجمن: