کتابخانه confluent-kafka در پایتون
کتابخانه confluent-kafka برای پایتون، پیوندی سریع و کارآمد بین برنامههای پایتون و کلاینت Apache Kafka فراهم میکند. این کتابخانه که توسط Confluent توسعه یافته، بر پایه librdkafka (C/C++) نوشته شده و عملکرد بالا، پایداری و قابلیتهای پیشرفتهای مانند ادغام با Schema Registry را ارائه میدهد.
چرا confluent-kafka؟
- کارایی بالا و تاخیر کم به دلیل پیادهسازی در C/C++.
- پشتیبانی کامل از امکانات Kafka: تولیدکننده، مصرفکننده، تراکنشها، idempotence و غیره.
- یکپارچگی با Confluent Schema Registry و انواع سریالایزرها.
- مستندات و جامعه پشتیبانی قوی.
نصب
نصب ساده با pip انجام میشود. اگر نیاز به Avro / Schema Registry دارید، بستهٔ اضافی را هم نصب کنید:
pip install confluent-kafka
pip install confluent-kafka[avro] # در صورت نیاز به Avro
در اینجا نسخهٔ پایه نصب میشود. بستهٔ avro شامل کتابخانههای مورد نیاز برای ارتباط با Schema Registry است.
نمونه ساده: تولیدکننده (Producer)
from confluent_kafka import Producer
conf = {'bootstrap.servers': 'localhost:9092', 'linger.ms': 5, 'compression.type': 'lz4'}
p = Producer(conf)
def delivery_report(err, msg):
if err is not None:
print(f"Delivery failed: {err}")
else:
print(f"Delivered message to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
for i in range(10):
p.produce('my_topic', key=str(i), value=f'message {i}', callback=delivery_report)
p.flush()
در این کد یک Producer ساخته شده و با استفاده از تابع callback وضعیت تحویل بررسی میشود. گزینههای linger.ms و compression.type برای افزایش تراکم و زمانبندی ارسال بستهها بهمنظور بهبود کارایی به کار رفتهاند. در انتها با p.flush() تضمین میکنیم تمام پیامها ارسال شوند.
نمونه ساده: مصرفکننده (Consumer)
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
c = Consumer(conf)
c.subscribe(['my_topic'])
try:
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f"Error: {msg.error()}")
break
print(f"Received: {msg.key()} -> {msg.value()}")
c.commit(asynchronous=False)
except KeyboardInterrupt:
pass
finally:
c.close()
در این Verbrauch/Consumer نمونه، offsetها بهصورت دستی commit میشوند (synchronous) تا کنترل دقیقتری روی پردازش پیامها داشته باشیم. گزینه auto.offset.reset کنترل میکند که در صورت عدم وجود offset شناختهشده چه اتفاقی بیافتد.
یکپارچگی با Schema Registry و Avro
برای دادههای ساختیافته استفاده از Schema Registry و Avro (یا Protobuf/JSON Schema) مزایای زیادی دارد: تطابق schema، تکامل schema و ذخیرهی مرکزی تعاریف. نمونهٔ Avro بهصورت زیر است:
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{
"namespace": "example",
"name": "User",
"type": "record",
"fields" : [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""
conf = {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081'}
avro_prod = AvroProducer(conf, default_value_schema=value_schema_str)
avro_prod.produce(topic='users', value={'name': 'Ali', 'age': 30})
avro_prod.flush()
این مثال نشان میدهد چگونه میتوان یک schema تعریف کرد و با AvroProducer پیامهایی را ثبت کرد که با Schema Registry مطابقت دارند.
پیکربندیهای حیاتی و نکات بهینهسازی
برخی از تنظیمات مهم که در تولید و مصرف برای رسیدن به عملکرد مطلوب باید توجه شوند:
| کلید | شرح |
|---|---|
| bootstrap.servers | آدرس بروکرها |
| acks | معینکننده تضمین تحویل (0,1,all) |
| compression.type | lz4، gzip یا snappy برای کاهش پهنایباند |
| linger.ms | بافر کردن پیامها قبل از ارسال برای batching |
| enable.idempotence | فعال کردن idempotent producer برای جلوگیری از Duplicate |
| transactional.id | برای تراکنشها و exactly-once |
خطاها، rebalance و مدیریت موقعیتها
در مصرفکنندهها، rebalanceها ممکن است موجب انتقال پارتیشنها بین اعضای گروه شوند. باید از callbacks مربوط به rebalance در مواقع حساس استفاده کنید تا commit یا cleanup مناسب انجام گیرد. مدیریت درست offset و retry منطقی از اهمیت بالایی برخوردار است.
امنیت و احراز هویت (SASL/SSL)
confluent-kafka از SASL/PLAIN، SASL/SCRAM، و SSL پشتیبانی میکند. نمونهٔ پیکربندی SASL:
conf = {
'bootstrap.servers': 'broker:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'user',
'sasl.password': 'pass'
}
این پیکربندی بهراحتی در Producer یا Consumer قرار میگیرد و ارتباط امن را برقرار میکند.
نکات حرفهای و توصیهها
- برای تاخیر کمتر در تولید واقعی، مقدار linger.ms را بسیار کوچک و batch.size و compression را بهینه کنید.
- در پردازشهای بحرانی از commit دستی استفاده کنید تا از از دست رفتن یا پردازش دوباره جلوگیری شود.
- در حجم بالا از monitoring استفاده کنید: نرخ تولید/مصرف، lag، latency و خطاها را ثبت کنید.
- برای تراکنشهای بین چند topic از transactional.id و API تراکنشها استفاده کنید تا exactly-once semantics بدست آید.
مانیتورینگ و ابزارها
برای پایش سلامت باید متریکهای librdkafka را بخوانید یا از ابزارهایی مثل Confluent Control Center، Prometheus و Grafana بهره ببرید. متریکهایی مانند broker latency، request_rate، consumer_lag و retry_count مهماند.
جمعبندی
confluent-kafka در پایتون یک انتخاب قدرتمند برای ساخت سیستمهای مبتنی بر رویداد است. این کتابخانه برای پروژههایی که نیاز به عملکرد بالا، قابلیت اطمینان و یکپارچگی با اکوسیستم Confluent دارند مناسب است. با درک درست پیکربندیها، مدیریت offsetها و استفاده از Schema Registry میتوانید راهکارهای مقیاسپذیر و قابل اعتمادی پیادهسازی کنید.
آیا این مطلب برای شما مفید بود ؟




