ویژگی تصویر

کار با استریم‌ها در Node.js — مقدمه‌ای عملی

  /  Node.js   /  کار با استریم ها در Node.js
بنر تبلیغاتی الف
NodeJS - Node.js

استریم‌ها (Streams) یکی از پایه‌‌های پردازش داده در Node.js هستند. آن‌ها به‌جای بارگذاری کامل داده در حافظه، آن را به قطعات (chunk) تقسیم می‌کنند و به‌تدریج پردازش می‌کنند. این موضوع برای فایل‌های بزرگ، شبکه، فشرده‌سازی و کار با پایگاه‌داده بسیار حیاتی است؛ چون مصرف حافظه را پایین می‌آورد و تأخیر را کاهش می‌دهد.

انواع استریم‌ها

در Node.js چهار نوع اصلی وجود دارد:

  • Readable: فقط می‌خواند (مثلاً fs.createReadStream)
  • Writable: فقط می‌نویسد (مثلاً fs.createWriteStream)
  • Duplex: هم‌خواندنی و هم‌نوشتنی (مثلاً TCP sockets)
  • Transform: نوعی Duplex که روی داده‌ها تبدیل انجام می‌دهد (مثلاً فشرده‌سازی)
نوعکاربردمثال
Readableخواندن دادهfs.createReadStream
Writableنوشتن دادهfs.createWriteStream
Duplexارسال/دریافت همزمانnet.Socket
Transformتبدیل جریان دادهzlib.createGzip

ساده‌ترین مثال: کپی فایل با pipeline

const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipe = promisify(pipeline);

async function copyFile(src, dest) {
  await pipe(
    fs.createReadStream(src),
    fs.createWriteStream(dest)
  );
}

copyFile('large-input.bin', 'large-output.bin')
  .then(()=> console.log('Copied'))
  .catch(err => console.error('Error:', err));

این کد از stream.pipeline استفاده شده (با promisify) تا خطاها مدیریت و منابع به‌درستی آزاد شوند. pipeline خودکاراً در صورت رخداد خطا جریان‌های مرتبط را Destroy می‌کند و از نشت حافظه جلوگیری می‌کند.

مدیریت backpressure

در استریم‌ها ممکن است تولیدکننده سریع‌تر از مصرف‌کننده داده تولید کند. Writable.write مقدار بولی برمی‌گرداند: اگر false باشد، باید منتظر رخداد ‘drain’ بمانید تا ادامه دهید.

const ws = fs.createWriteStream('out.dat');

function writeLots(stream, data) {
  let i = 100000;
  function write() {
    let ok = true;
    while (i-- > 0 && ok) {
      ok = stream.write(data);
    }
    if (!ok) {
      stream.once('drain', write);
    } else {
      stream.end();
    }
  }
  write();
}

writeLots(ws, Buffer.alloc(1024));

در این مثال اگر write بازگرداند false، تابع منتظر ‘drain’ می‌شود تا دوباره نوشتن را ادامه دهد. این الگو مانع از پر شدن حافظه می‌شود.

مثال Transform برای تبدیل متن به حروف بزرگ

const { Transform } = require('stream');

class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const data = chunk.toString().toUpperCase();
      callback(null, Buffer.from(data));
    } catch (err) {
      callback(err);
    }
  }
}

const upper = new UpperCase();
process.stdin.pipe(upper).pipe(process.stdout);

این کلاس Transform هر chunk را دریافت می‌کند، به حروف بزرگ تبدیل می‌کند و آن را عبور می‌دهد. استفاده از Transform برای پردازش‌های خطی (مانند فشرده‌سازی، رمزنگاری یا فیلتر خطوط) بسیار مناسب است.

استفاده از async iterator برای خواندن استریم

const fs = require('fs');

async function readChunks(path) {
  const stream = fs.createReadStream(path);
  for await (const chunk of stream) {
    console.log('Got chunk of size', chunk.length);
  }
}

readChunks('large-file.txt').catch(console.error);

این روش با for await…of ساده و خوانا است. مناسب زمانی‌ست که بخواهیم هر chunk را مرحله‌به‌مرحله پردازش کنیم بدون استفاده از callback یا events.

فشرده‌سازی و شبکه — نمونه pipeline واقعی

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipe = promisify(pipeline);

async function compress(src, dest) {
  const gzip = zlib.createGzip();
  await pipe(
    fs.createReadStream(src),
    gzip,
    fs.createWriteStream(dest)
  );
}

در اینجا از zlib.createGzip برای فشرده‌سازی در حین خواندن فایل استفاده شده. pipeline تضمین می‌کند که در صورت خطا همه جریان‌ها بسته شوند. این الگو برای ارسال پاسخ HTTP فشرده یا ذخیره‌سازی به‌صورت بلادرنگ بسیار کاراست.

نکات عملکردی و پیشرفته

  • highWaterMark را برای کنترل اندازه بافر تنظیم کنید تا trade-off بین I/O و مصرف حافظه مناسب شود.
  • در objectMode می‌توانید اشیاء جاوااسکریپتی را به‌عنوان chunks منتقل کنید؛ مثلاً خواندن ردیف‌های JSON.
  • از stream.finished یا require(‘stream’).promises.finished برای اطمینان از بسته‌شدن کامل استفاده کنید.
  • همیشه خطاها را روی هر استریم گوش بدهید یا از pipeline (یا stream.promises.pipeline) استفاده کنید تا مدیریت خطا متمرکز باشد.
  • برای عملیات‌های موازی کوچک، مراقب blocking loop باشید؛ از جریان‌ها و صف‌ها (queues) استفاده کنید.

مثال بهینه‌سازی: استفاده از highWaterMark در Readable

const fs = require('fs');

const rs = fs.createReadStream('video.mp4', { highWaterMark: 64 * 1024 }); // 64KB
rs.on('data', chunk => {
  // پردازش chunk
});

با تنظیم highWaterMark می‌توانید اندازه chunkها را تنظیم کنید. اگر I/O شما ضعیف است، افزایش اندازه ممکن است کارایی را بالا ببرد؛ اما مصرف حافظه هم افزایش می‌یابد. مقدار پیش‌فرض برای فایل‌ها معمولاً مناسب است اما در موارد خاص این تنظیم مفید است.

موارد کاربردی (Use Cases)

  • ارسال و دریافت فایل‌های بزرگ از طریق HTTP بدون بارگذاری کامل در رم
  • فشرده‌سازی و رمزنگاری بلادرنگ
  • پردازش جریان داده‌های لاگ یا CSV و تبدیل/فیلتر به قطعات کوچک
  • جریان‌دهی نتایج پایگاه‌داده یا تولید گزارش‌های streaming-friendly

خلاصه و پیشنهادات

استریم‌ها ابزار قدرتمندی برای مدیریت داده‌های بزرگ و کم‌هزینه‌اند. از pipeline و stream.promises برای مدیریت خطا و پاک‌سازی منابع استفاده کنید. backpressure را در نظر بگیرید و highWaterMark یا objectMode را متناسب با نیاز تنظیم کنید. در نهایت، با تمرین روی کیس‌های واقعی (فشرده‌سازی، HTTP، فایل) بهینه‌ترین الگوها را خواهید یافت.

آیا این مطلب برای شما مفید بود ؟

خیر
بله
موضوعات شما در انجمن: