ویژگی تصویر

کتابخانه confluent-kafka در پایتون — مرجع جامع

  /  پایتون   /  کتابخانه confluent-kafka در پایتون
بنر تبلیغاتی الف

کتابخانه confluent-kafka برای پایتون، پیوندی سریع و کارآمد بین برنامه‌های پایتون و کلاینت Apache Kafka فراهم می‌کند. این کتابخانه که توسط Confluent توسعه یافته، بر پایه librdkafka (C/C++) نوشته شده و عملکرد بالا، پایداری و قابلیت‌های پیشرفته‌ای مانند ادغام با Schema Registry را ارائه می‌دهد.

چرا confluent-kafka؟

  • کارایی بالا و تاخیر کم به دلیل پیاده‌سازی در C/C++.
  • پشتیبانی کامل از امکانات Kafka: تولیدکننده، مصرف‌کننده، تراکنش‌ها، idempotence و غیره.
  • یکپارچگی با Confluent Schema Registry و انواع سریالایزرها.
  • مستندات و جامعه پشتیبانی قوی.

نصب

نصب ساده با pip انجام می‌شود. اگر نیاز به Avro / Schema Registry دارید، بستهٔ اضافی را هم نصب کنید:

pip install confluent-kafka
pip install confluent-kafka[avro]  # در صورت نیاز به Avro

در اینجا نسخهٔ پایه نصب می‌شود. بستهٔ avro شامل کتابخانه‌های مورد نیاز برای ارتباط با Schema Registry است.

نمونه ساده: تولیدکننده (Producer)

from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092', 'linger.ms': 5, 'compression.type': 'lz4'}
p = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered message to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

for i in range(10):
    p.produce('my_topic', key=str(i), value=f'message {i}', callback=delivery_report)

p.flush()

در این کد یک Producer ساخته شده و با استفاده از تابع callback وضعیت تحویل بررسی می‌شود. گزینه‌های linger.ms و compression.type برای افزایش تراکم و زمان‌بندی ارسال بسته‌ها به‌منظور بهبود کارایی به کار رفته‌اند. در انتها با p.flush() تضمین می‌کنیم تمام پیام‌ها ارسال شوند.

نمونه ساده: مصرف‌کننده (Consumer)

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}
c = Consumer(conf)
c.subscribe(['my_topic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break
        print(f"Received: {msg.key()} -> {msg.value()}")
        c.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    c.close()

در این Verbrauch/Consumer نمونه، offsetها به‌صورت دستی commit می‌شوند (synchronous) تا کنترل دقیق‌تری روی پردازش پیام‌ها داشته باشیم. گزینه auto.offset.reset کنترل می‌کند که در صورت عدم وجود offset شناخته‌شده چه اتفاقی بیافتد.

یکپارچگی با Schema Registry و Avro

برای داده‌های ساخت‌یافته استفاده از Schema Registry و Avro (یا Protobuf/JSON Schema) مزایای زیادی دارد: تطابق schema، تکامل schema و ذخیره‌ی مرکزی تعاریف. نمونهٔ Avro به‌صورت زیر است:

from confluent_kafka.avro import AvroProducer
value_schema_str = """
{
   "namespace": "example",
   "name": "User",
   "type": "record",
   "fields" : [
     {"name": "name", "type": "string"},
     {"name": "age", "type": "int"}
   ]
}
"""
conf = {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081'}
avro_prod = AvroProducer(conf, default_value_schema=value_schema_str)
avro_prod.produce(topic='users', value={'name': 'Ali', 'age': 30})
avro_prod.flush()

این مثال نشان می‌دهد چگونه می‌توان یک schema تعریف کرد و با AvroProducer پیام‌هایی را ثبت کرد که با Schema Registry مطابقت دارند.

پیکربندی‌های حیاتی و نکات بهینه‌سازی

برخی از تنظیمات مهم که در تولید و مصرف برای رسیدن به عملکرد مطلوب باید توجه شوند:

کلیدشرح
bootstrap.serversآدرس بروکرها
acksمعین‌کننده تضمین تحویل (0,1,all)
compression.typelz4، gzip یا snappy برای کاهش پهنای‌باند
linger.msبافر کردن پیام‌ها قبل از ارسال برای batching
enable.idempotenceفعال کردن idempotent producer برای جلوگیری از Duplicate
transactional.idبرای تراکنش‌ها و exactly-once

خطاها، rebalance و مدیریت موقعیت‌ها

در مصرف‌کننده‌ها، rebalanceها ممکن است موجب انتقال پارتیشن‌ها بین اعضای گروه شوند. باید از callbacks مربوط به rebalance در مواقع حساس استفاده کنید تا commit یا cleanup مناسب انجام گیرد. مدیریت درست offset و retry منطقی از اهمیت بالایی برخوردار است.

امنیت و احراز هویت (SASL/SSL)

confluent-kafka از SASL/PLAIN، SASL/SCRAM، و SSL پشتیبانی می‌کند. نمونهٔ پیکربندی SASL:

conf = {
  'bootstrap.servers': 'broker:9093',
  'security.protocol': 'SASL_SSL',
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': 'user',
  'sasl.password': 'pass'
}

این پیکربندی به‌راحتی در Producer یا Consumer قرار می‌گیرد و ارتباط امن را برقرار می‌کند.

نکات حرفه‌ای و توصیه‌ها

  • برای تاخیر کمتر در تولید واقعی، مقدار linger.ms را بسیار کوچک و batch.size و compression را بهینه کنید.
  • در پردازش‌های بحرانی از commit دستی استفاده کنید تا از از دست رفتن یا پردازش دوباره جلوگیری شود.
  • در حجم بالا از monitoring استفاده کنید: نرخ تولید/مصرف، lag، latency و خطاها را ثبت کنید.
  • برای تراکنش‌های بین چند topic از transactional.id و API تراکنش‌ها استفاده کنید تا exactly-once semantics بدست آید.

مانیتورینگ و ابزارها

برای پایش سلامت باید متریک‌های librdkafka را بخوانید یا از ابزارهایی مثل Confluent Control Center، Prometheus و Grafana بهره ببرید. متریک‌هایی مانند broker latency، request_rate، consumer_lag و retry_count مهم‌اند.

جمع‌بندی

confluent-kafka در پایتون یک انتخاب قدرتمند برای ساخت سیستم‌های مبتنی بر رویداد است. این کتابخانه برای پروژه‌هایی که نیاز به عملکرد بالا، قابلیت اطمینان و یکپارچگی با اکوسیستم Confluent دارند مناسب است. با درک درست پیکربندی‌ها، مدیریت offsetها و استفاده از Schema Registry می‌توانید راهکارهای مقیاس‌پذیر و قابل اعتمادی پیاده‌سازی کنید.

آیا این مطلب برای شما مفید بود ؟

خیر
بله
موضوعات شما در انجمن: