Attempt to make BetterReader buffer and cancel

This commit is contained in:
KernelDeimos 2024-04-19 01:48:07 -04:00
parent d1db4a11ef
commit 2735a79e6e
4 changed files with 100 additions and 15 deletions

View File

@ -27,21 +27,56 @@ export class BetterReader {
this.chunks_ = []; this.chunks_ = [];
} }
async read (opt_buffer) { _create_cancel_response () {
return {
chunk: null,
n_read: 0,
debug_meta: {
source: 'delegate',
returning: 'cancelled',
this_value_should_not_be_used: true,
},
};
}
async read_and_get_info (opt_buffer, cancel_state) {
if ( ! opt_buffer && this.chunks_.length === 0 ) { if ( ! opt_buffer && this.chunks_.length === 0 ) {
return await this.delegate.read(); const chunk = await this.delegate.read();
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
this.chunks_.push(chunk);
return this._create_cancel_response();
}
return {
chunk,
debug_meta: { source: 'delegate' },
};
} }
const chunk = await this.getChunk_(); const chunk = await this.getChunk_();
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
this.chunks_.push(chunk);
return this._create_cancel_response();
}
if ( ! opt_buffer || ! chunk ) { if ( ! opt_buffer ) {
return chunk; return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } };
}
if ( ! chunk ) {
return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } };
} }
this.chunks_.push(chunk); this.chunks_.push(chunk);
while ( this.getTotalBytesReady_() < opt_buffer.length ) { while ( this.getTotalBytesReady_() < opt_buffer.length ) {
const read_chunk = await this.getChunk_(); const read_chunk = await this.getChunk_();
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
this.chunks_.push(read_chunk);
return this._create_cancel_response();
}
if ( ! read_chunk ) { if ( ! read_chunk ) {
break; break;
} }
@ -63,14 +98,37 @@ export class BetterReader {
offset += item.length; offset += item.length;
} }
return offset; return {
n_read: offset,
debug_meta: { source: 'stored chunks', returning: 'byte count' },
};
}
read_with_cancel (opt_buffer) {
const cancel_state = { cancelled: false };
const promise = (async () => {
const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
console.log('this is outputting...', chunk, n_read, opt_buffer);
return opt_buffer ? n_read : chunk;
})();
return {
canceller: () => {
cancel_state.cancelled = true;
},
promise,
};
}
async read (opt_buffer) {
const { chunk, n_read } = await this.read_and_get_info(opt_buffer);
console.log('this is outputting...', chunk, n_read, opt_buffer);
return opt_buffer ? n_read : chunk;
} }
async getChunk_() { async getChunk_() {
if ( this.chunks_.length === 0 ) { if ( this.chunks_.length === 0 ) {
const { value } = await this.delegate.read().catch( ( err ) => { const { value } = await this.delegate.read();
return {}; console.log('got value?', value);
});
return value; return value;
} }
@ -84,6 +142,8 @@ export class BetterReader {
this.chunks_ = []; this.chunks_ = [];
console.log('returning merged', merged);
return merged; return merged;
} }

View File

@ -16,7 +16,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { TeePromise } from "../../promise.js"; import { TeePromise, raceCase } from "../../promise.js";
export class Coupler { export class Coupler {
static description = ` static description = `
@ -42,7 +42,6 @@ export class Coupler {
close () { close () {
if (this.debug) console.log('closing coupler. source is', this.source); if (this.debug) console.log('closing coupler. source is', this.source);
this.source.releaseLock();
this.closed_.resolve({ this.closed_.resolve({
done: true, done: true,
}); });
@ -51,11 +50,21 @@ export class Coupler {
async listenLoop_ () { async listenLoop_ () {
this.active = true; this.active = true;
for (;;) { for (;;) {
const { value, done } = await Promise.race([ let canceller = () => {};
this.closed_, let promise;
this.source.read(), if ( this.source.read_with_cancel !== undefined ) {
]); ({ canceller, promise } = this.source.read_with_cancel());
} else {
promise = this.source.read();
}
const [which, { value, done }] = await raceCase({
source: promise,
closed: this.closed_,
});
if ( done ) { if ( done ) {
if ( which === 'closed' ) {
canceller();
}
this.source = null; this.source = null;
this.target = null; this.target = null;
this.active = false; this.active = false;

View File

@ -57,9 +57,11 @@ const ReadlineProcessorBuilder = builder => builder
const { locals, externs } = ctx; const { locals, externs } = ctx;
const byteBuffer = new Uint8Array(1); const byteBuffer = new Uint8Array(1);
const bytesRead = await externs.in_.read(byteBuffer); const { n_read: bytesRead, debug_meta } = await externs.in_.read_and_get_info(byteBuffer);
if (bytesRead !== 1) { if (bytesRead !== 1) {
console.warn('Failed to read byte in get-byte state of readline'); console.warn('Failed to read byte in get-byte state of readline');
console.log('debug_meta', debug_meta); // so GC doesn't remove it
// debugger;
} }
locals.byteBuffer = byteBuffer; locals.byteBuffer = byteBuffer;
locals.byte = byteBuffer[0]; locals.byte = byteBuffer[0];

View File

@ -41,3 +41,17 @@ export class TeePromise {
return this.then(fn); return this.then(fn);
} }
} }
/**
* raceCase is like Promise.race except it takes an object instead of
* an array, and returns the key of the promise that resolves first
* as well as the value that it resolved to.
*
* @param {Object.<string, Promise>} promise_map
*
* @returns {Promise.<[string, any]>}
*/
export const raceCase = async (promise_map) => {
return Promise.race(Object.entries(promise_map).map(
([key, promise]) => promise.then(value => [key, value])));
};