fix: add stream limit

This commit is contained in:
KernelDeimos 2024-06-19 17:20:09 -04:00 committed by Eric Dubé
parent 2008db0852
commit ceba309dbd
3 changed files with 34 additions and 1 deletions

View File

@ -41,6 +41,7 @@ class LocalDiskUploadStrategy extends BaseOperation {
await this.parent.svc_localDiskStorage.store_stream({
key: uid,
stream: file.stream,
size: file.size,
on_progress: evt => {
progress_tracker.set_total(file.size);
progress_tracker.set(evt.uploaded);

View File

@ -18,7 +18,7 @@
*/
const { LocalDiskStorageStrategy } = require("../filesystem/strategies/storage_a/LocalDiskStorageStrategy");
const { TeePromise } = require("../util/promise");
const { progress_stream } = require("../util/streamutil");
const { progress_stream, size_limit_stream } = require("../util/streamutil");
const BaseService = require("./BaseService");
class LocalDiskStorageService extends BaseService {
@ -58,6 +58,10 @@ class LocalDiskStorageService extends BaseService {
total: size,
progress_callback: on_progress,
});
stream = size_limit_stream(stream, {
limit: size,
});
const writePromise = new TeePromise();

View File

@ -310,6 +310,33 @@ const progress_stream = (source, { total, progress_callback }) => {
return stream;
}
class SizeLimitingStream extends Transform {
constructor(options, { limit }) {
super(options);
this.limit = limit;
this.loaded = 0;
}
_transform(chunk, encoding, callback) {
this.loaded += chunk.length;
if ( this.loaded > this.limit ) {
const excess = this.loaded - this.limit;
chunk = chunk.slice(0, chunk.length - excess);
}
this.push(chunk);
if ( this.loaded >= this.limit ) {
this.end();
}
callback();
}
}
const size_limit_stream = (source, { limit }) => {
const stream = new SizeLimitingStream({}, { limit });
source.pipe(stream);
return stream;
}
class StuckDetectorStream extends Transform {
constructor(options, {
timeout,
@ -459,6 +486,7 @@ module.exports = {
logging_stream,
offset_write_stream,
progress_stream,
size_limit_stream,
stuck_detector_stream,
string_to_stream,
chunk_stream,