ساخت میکرو سرویس چت بلادرنگ در پایتون با WebSocket
در دنیای امروز، پیامرسانی بلادرنگ (Real-time messaging) یکی از الزامات کلیدی برای توسعه سیستمهای وب پویا است. میکروسرویسها در این زمینه نقش بسیار مهمی ایفا میکنند، بهویژه وقتی که نیاز به مقیاسپذیری و سادگی توسعه داریم. در این مقاله به ساخت یک میکروسرویس چت بلادرنگ با استفاده از پایتون و WebSocket خواهیم پرداخت.
چرا از WebSocket استفاده میکنیم؟
WebSocket یک پروتکل ارتباطی دوطرفه است که امکان ارسال و دریافت دادهها بدون نیاز به ارسال درخواستهای مداوم (polling) فراهم میکند. این موضوع باعث میشود که ارتباط بین کلاینت و سرور بهصورت مستقیم و سریع روند کند.
چه چیزهایی نیاز داریم؟
- پایتون (نسخه 3.7 یا بالاتر)
- FastAPI برای ساخت API و مدیریت HTTP
- WebSocket برای پیادهسازی ارتباط بلادرنگ
- Redis برای ذخیرهسازی اطلاعات موقت کاربران
مرحله اول: ساخت محیط توسعه
ابتدا یک پروژه جدید در پایتون ایجاد کرده و وابستگیهای مورد نیاز را نصب میکنیم:
pip install fastapi uvicorn websockets redisاین دستورات تمام کتابخانههای لازم برای توسعه یک سرویس چت بلادرنگ را نصب میکند. FastAPI به ما اجازه میدهد APIهای قوی و مستند شده بسازیم، و uvicorn به عنوان سرور ASGI عمل میکند.
مرحله دوم: طراحی ساختار پروژه
ساختار پروژه به صورت زیر است:
| پوشه/فایل | شرح |
|---|---|
| main.py | فایل اصلی سرور |
| websocket_manager.py | مدیریت اتصالات WebSocket |
| models.py | مدلهای داده |
مرحله سوم: پیادهسازی سرور اصلی
در فایل main.py، یک سرور FastAPI ایجاد میکنیم که به WebSocket متصل شود:
from fastapi import FastAPI, WebSocket
from websocket_manager import WebSocketManager
app = FastAPI()
manager = WebSocketManager()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await manager.connect(websocket, user_id)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"User {user_id}: {data}")
except Exception as e:
print(f"Error: {e}")
finally:
await manager.disconnect(websocket, user_id)در این کد، ما یک endpoint WebSocket با مسیر /ws/{user_id} تعریف کردهایم. وقتی یک کاربر متصل شود، آن را در لیست اتصالات ذخیره میکنیم و پیامها را به تمامی کاربران دیگر ارسال میکنیم.
مرحله چهارم: مدیریت اتصالات WebSocket
در فایل websocket_manager.py، ساختاری برای مدیریت اتصالات ایجاد میکنیم:
from typing import Dict, Set
from fastapi import WebSocket
class WebSocketManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.active_connections[user_id] = websocket
def disconnect(self, websocket: WebSocket, user_id: str):
if user_id in self.active_connections:
del self.active_connections[user_id]
async def broadcast(self, message: str):
for connection in self.active_connections.values():
await connection.send_text(message)این کلاس یک دیکشنری از اتصالات فعال نگه میدارد. هر بار که یک کاربر متصل شود، آن را در این دیکشنری ذخیره میکند و وقتی پیامی دریافت شد، آن را به تمامی کاربران ارسال میکند.
بهینهسازی و امنیت
در یک سیستم واقعی، نیاز به اعتبارسنجی کاربر و مدیریت جلسات داریم. میتوانیم از JWT استفاده کنیم تا اطمینان حاصل کنیم که فقط کاربران معتبر متصل شوند.
چالشهای رایج و راهحلها
- خطای دسترسی: در صورتی که کاربر بدون احراز هویت متصل شود، میتوانیم اتصال را قطع کنیم.
- پر کردن حافظه: با استفاده از Redis میتوانیم اطلاعات کاربران را ذخیره کنیم و از حافظه سرور پاک کنیم.
پیادهسازی پیشرفته با Redis
برای مدیریت دادهها در حالت تکپردازنده و مقیاسپذیر، میتوانیم از Redis استفاده کنیم:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def save_user_connection(user_id: str, websocket_id: str):
r.hset("connections", user_id, websocket_id)
def get_user_connection(user_id: str):
return r.hget("connections", user_id)این قسمت به ما امکان میدهد که اتصالات را در Redis ذخیره کنیم و در مواقع نیاز به بازیابی آنها اقدام کنیم.
تست و تستهای عملی
برای تست سرویس چت، میتوانید از کلاینت WebSocket در مرورگر استفاده کنید:
const ws = new WebSocket("ws://localhost:8000/ws/user1");
ws.onmessage = function(event) {
console.log("Message from server:", event.data);
};این کد یک اتصال به سرور میگذارد و هر پیامی از سرور دریافت شود، آن را در کنسول نمایش میدهد.
جمعبندی
در این مقاله یاد گرفتیم چگونه یک میکروسرویس چت بلادرنگ با استفاده از پایتون و WebSocket بسازیم. ما با FastAPI، مدیریت اتصالات WebSocket، و استفاده از Redis برای ذخیرهسازی اطلاعات آشنا شدیم.
این ساختار قابل توسعه و مقیاسپذیر است و میتوانید آن را با فیچرهای بیشتر مانند اعتبارسنجی، لاگگیری، یا مدیریت دسترسی توسعه دهید.
آیا این مطلب برای شما مفید بود ؟




