diff --git a/packages/backend/src/filesystem/strategies/storage_a/LocalDiskStorageStrategy.js b/packages/backend/src/filesystem/strategies/storage_a/LocalDiskStorageStrategy.js index ae7470f4..fa01bddd 100644 --- a/packages/backend/src/filesystem/strategies/storage_a/LocalDiskStorageStrategy.js +++ b/packages/backend/src/filesystem/strategies/storage_a/LocalDiskStorageStrategy.js @@ -41,6 +41,7 @@ class LocalDiskUploadStrategy extends BaseOperation { await this.parent.svc_localDiskStorage.store_stream({ key: uid, stream: file.stream, + size: file.size, on_progress: evt => { progress_tracker.set_total(file.size); progress_tracker.set(evt.uploaded); diff --git a/packages/backend/src/services/LocalDiskStorageService.js b/packages/backend/src/services/LocalDiskStorageService.js index 14b6d300..0004fbcb 100644 --- a/packages/backend/src/services/LocalDiskStorageService.js +++ b/packages/backend/src/services/LocalDiskStorageService.js @@ -18,7 +18,7 @@ */ const { LocalDiskStorageStrategy } = require("../filesystem/strategies/storage_a/LocalDiskStorageStrategy"); const { TeePromise } = require("../util/promise"); -const { progress_stream } = require("../util/streamutil"); +const { progress_stream, size_limit_stream } = require("../util/streamutil"); const BaseService = require("./BaseService"); class LocalDiskStorageService extends BaseService { @@ -58,6 +58,10 @@ class LocalDiskStorageService extends BaseService { total: size, progress_callback: on_progress, }); + + stream = size_limit_stream(stream, { + limit: size, + }); const writePromise = new TeePromise(); diff --git a/packages/backend/src/util/streamutil.js b/packages/backend/src/util/streamutil.js index bd0e92dc..2f75f8fb 100644 --- a/packages/backend/src/util/streamutil.js +++ b/packages/backend/src/util/streamutil.js @@ -310,6 +310,33 @@ const progress_stream = (source, { total, progress_callback }) => { return stream; } +class SizeLimitingStream extends Transform { + constructor(options, { limit }) { + super(options); + this.limit = limit; + this.loaded = 0; + } + + _transform(chunk, encoding, callback) { + this.loaded += chunk.length; + if ( this.loaded > this.limit ) { + const excess = this.loaded - this.limit; + chunk = chunk.slice(0, chunk.length - excess); + } + this.push(chunk); + if ( this.loaded >= this.limit ) { + this.end(); + } + callback(); + } +} + +const size_limit_stream = (source, { limit }) => { + const stream = new SizeLimitingStream({}, { limit }); + source.pipe(stream); + return stream; +} + class StuckDetectorStream extends Transform { constructor(options, { timeout, @@ -459,6 +486,7 @@ module.exports = { logging_stream, offset_write_stream, progress_stream, + size_limit_stream, stuck_detector_stream, string_to_stream, chunk_stream,