کار با استریم ها در Node.js
استریمها (Streams) یکی از پایههای پردازش داده در Node.js هستند. آنها بهجای بارگذاری کامل داده در حافظه، آن را به قطعات (chunk) تقسیم میکنند و بهتدریج پردازش میکنند. این موضوع برای فایلهای بزرگ، شبکه، فشردهسازی و کار با پایگاهداده بسیار حیاتی است؛ چون مصرف حافظه را پایین میآورد و تأخیر را کاهش میدهد.
انواع استریمها
در Node.js چهار نوع اصلی وجود دارد:
- Readable: فقط میخواند (مثلاً fs.createReadStream)
- Writable: فقط مینویسد (مثلاً fs.createWriteStream)
- Duplex: همخواندنی و همنوشتنی (مثلاً TCP sockets)
- Transform: نوعی Duplex که روی دادهها تبدیل انجام میدهد (مثلاً فشردهسازی)
| نوع | کاربرد | مثال |
|---|---|---|
| Readable | خواندن داده | fs.createReadStream |
| Writable | نوشتن داده | fs.createWriteStream |
| Duplex | ارسال/دریافت همزمان | net.Socket |
| Transform | تبدیل جریان داده | zlib.createGzip |
سادهترین مثال: کپی فایل با pipeline
const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipe = promisify(pipeline);
async function copyFile(src, dest) {
await pipe(
fs.createReadStream(src),
fs.createWriteStream(dest)
);
}
copyFile('large-input.bin', 'large-output.bin')
.then(()=> console.log('Copied'))
.catch(err => console.error('Error:', err));این کد از stream.pipeline استفاده شده (با promisify) تا خطاها مدیریت و منابع بهدرستی آزاد شوند. pipeline خودکاراً در صورت رخداد خطا جریانهای مرتبط را Destroy میکند و از نشت حافظه جلوگیری میکند.
مدیریت backpressure
در استریمها ممکن است تولیدکننده سریعتر از مصرفکننده داده تولید کند. Writable.write مقدار بولی برمیگرداند: اگر false باشد، باید منتظر رخداد ‘drain’ بمانید تا ادامه دهید.
const ws = fs.createWriteStream('out.dat');
function writeLots(stream, data) {
let i = 100000;
function write() {
let ok = true;
while (i-- > 0 && ok) {
ok = stream.write(data);
}
if (!ok) {
stream.once('drain', write);
} else {
stream.end();
}
}
write();
}
writeLots(ws, Buffer.alloc(1024));در این مثال اگر write بازگرداند false، تابع منتظر ‘drain’ میشود تا دوباره نوشتن را ادامه دهد. این الگو مانع از پر شدن حافظه میشود.
مثال Transform برای تبدیل متن به حروف بزرگ
const { Transform } = require('stream');
class UpperCase extends Transform {
_transform(chunk, encoding, callback) {
try {
const data = chunk.toString().toUpperCase();
callback(null, Buffer.from(data));
} catch (err) {
callback(err);
}
}
}
const upper = new UpperCase();
process.stdin.pipe(upper).pipe(process.stdout);این کلاس Transform هر chunk را دریافت میکند، به حروف بزرگ تبدیل میکند و آن را عبور میدهد. استفاده از Transform برای پردازشهای خطی (مانند فشردهسازی، رمزنگاری یا فیلتر خطوط) بسیار مناسب است.
استفاده از async iterator برای خواندن استریم
const fs = require('fs');
async function readChunks(path) {
const stream = fs.createReadStream(path);
for await (const chunk of stream) {
console.log('Got chunk of size', chunk.length);
}
}
readChunks('large-file.txt').catch(console.error);این روش با for await…of ساده و خوانا است. مناسب زمانیست که بخواهیم هر chunk را مرحلهبهمرحله پردازش کنیم بدون استفاده از callback یا events.
فشردهسازی و شبکه — نمونه pipeline واقعی
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipe = promisify(pipeline);
async function compress(src, dest) {
const gzip = zlib.createGzip();
await pipe(
fs.createReadStream(src),
gzip,
fs.createWriteStream(dest)
);
}در اینجا از zlib.createGzip برای فشردهسازی در حین خواندن فایل استفاده شده. pipeline تضمین میکند که در صورت خطا همه جریانها بسته شوند. این الگو برای ارسال پاسخ HTTP فشرده یا ذخیرهسازی بهصورت بلادرنگ بسیار کاراست.
نکات عملکردی و پیشرفته
- highWaterMark را برای کنترل اندازه بافر تنظیم کنید تا trade-off بین I/O و مصرف حافظه مناسب شود.
- در objectMode میتوانید اشیاء جاوااسکریپتی را بهعنوان chunks منتقل کنید؛ مثلاً خواندن ردیفهای JSON.
- از stream.finished یا require(‘stream’).promises.finished برای اطمینان از بستهشدن کامل استفاده کنید.
- همیشه خطاها را روی هر استریم گوش بدهید یا از pipeline (یا stream.promises.pipeline) استفاده کنید تا مدیریت خطا متمرکز باشد.
- برای عملیاتهای موازی کوچک، مراقب blocking loop باشید؛ از جریانها و صفها (queues) استفاده کنید.
مثال بهینهسازی: استفاده از highWaterMark در Readable
const fs = require('fs');
const rs = fs.createReadStream('video.mp4', { highWaterMark: 64 * 1024 }); // 64KB
rs.on('data', chunk => {
// پردازش chunk
});
با تنظیم highWaterMark میتوانید اندازه chunkها را تنظیم کنید. اگر I/O شما ضعیف است، افزایش اندازه ممکن است کارایی را بالا ببرد؛ اما مصرف حافظه هم افزایش مییابد. مقدار پیشفرض برای فایلها معمولاً مناسب است اما در موارد خاص این تنظیم مفید است.
موارد کاربردی (Use Cases)
- ارسال و دریافت فایلهای بزرگ از طریق HTTP بدون بارگذاری کامل در رم
- فشردهسازی و رمزنگاری بلادرنگ
- پردازش جریان دادههای لاگ یا CSV و تبدیل/فیلتر به قطعات کوچک
- جریاندهی نتایج پایگاهداده یا تولید گزارشهای streaming-friendly
خلاصه و پیشنهادات
استریمها ابزار قدرتمندی برای مدیریت دادههای بزرگ و کمهزینهاند. از pipeline و stream.promises برای مدیریت خطا و پاکسازی منابع استفاده کنید. backpressure را در نظر بگیرید و highWaterMark یا objectMode را متناسب با نیاز تنظیم کنید. در نهایت، با تمرین روی کیسهای واقعی (فشردهسازی، HTTP، فایل) بهینهترین الگوها را خواهید یافت.
آیا این مطلب برای شما مفید بود ؟




