puter/packages/pty/exports.js

383 lines
10 KiB
JavaScript
Raw Normal View History

2024-04-13 08:53:44 +08:00
/*
* Copyright (C) 2024 Puter Technologies Inc.
*
* This file is part of Phoenix Shell.
*
* Phoenix Shell is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
2024-06-08 13:07:42 +08:00
import { libs } from '@heyputer/puter-js-common';
const { TeePromise, raceCase } = libs.promise;
2024-04-13 08:53:44 +08:00
const encoder = new TextEncoder();
const CHAR_LF = '\n'.charCodeAt(0);
const CHAR_CR = '\r'.charCodeAt(0);
2024-04-20 11:24:32 +08:00
const DONE = Symbol('done');
class Channel {
constructor () {
this.chunks_ = [];
globalThis.chnl = this;
const events = ['write','consume','change'];
for ( const event of events ) {
this[`on_${event}_`] = [];
this[`emit_${event}_`] = () => {
for ( const listener of this[`on_${event}_`] ) {
listener();
}
};
}
this.on('write', () => { this.emit_change_(); });
this.on('consume', () => { this.emit_change_(); });
}
on (event, listener) {
this[`on_${event}_`].push(listener);
}
off (event, listener) {
const index = this[`on_${event}_`].indexOf(listener);
if ( index !== -1 ) {
this[`on_${event}_`].splice(index, 1);
}
}
get () {
const cancel = new TeePromise();
const data = new TeePromise();
const done = new TeePromise();
let called = 0;
const on_data = () => {
if ( this.chunks_.length > 0 ) {
if ( called > 0 ) {
throw new Error('called more than once');
}
called++;
const chunk = this.chunks_.shift();
( chunk === DONE ? done : data ).resolve(chunk);
this.off('write', on_data);
this.emit_consume_();
}
};
this.on('write', on_data);
on_data();
const to_return = {
cancel: () => {
this.off('write', on_data);
cancel.resolve();
},
promise: raceCase({
cancel,
data,
done,
}),
};
return to_return;
}
write (chunk) {
this.chunks_.push(chunk);
this.emit_write_();
}
pushback (...chunks) {
for ( let i = chunks.length - 1; i >= 0; i-- ) {
this.chunks_.unshift(chunks[i]);
}
this.emit_write_();
}
is_empty () {
return this.chunks_.length === 0;
}
}
2024-04-13 08:53:44 +08:00
export class BetterReader {
constructor ({ delegate }) {
this.delegate = delegate;
this.chunks_ = [];
2024-04-20 11:24:32 +08:00
this.channel_ = new Channel();
this._init();
}
_init () {
let working = Promise.resolve();
this.channel_.on('consume', async () => {
await working;
working = new TeePromise();
if ( this.channel_.is_empty() ) {
await this.intake_();
}
working.resolve();
});
this.intake_();
2024-04-13 08:53:44 +08:00
}
2024-04-20 11:24:32 +08:00
async intake_ () {
const { value, done } = await this.delegate.read();
if ( done ) {
this.channel_.write(DONE);
return;
}
this.channel_.write(value);
}
_create_cancel_response () {
return {
chunk: null,
n_read: 0,
debug_meta: {
source: 'delegate',
returning: 'cancelled',
this_value_should_not_be_used: true,
},
};
}
2024-04-20 11:24:32 +08:00
read_and_get_info (opt_buffer, cancel_state) {
if ( ! opt_buffer ) {
const { promise, cancel } = this.channel_.get();
return {
cancel,
promise: promise.then(([which, chunk]) => {
if ( which !== 'data' ) {
return { done: true, value: null };
}
return { value: chunk };
}),
};
}
const final_promise = new TeePromise();
let current_cancel_ = () => {};
(async () => {
let n_read = 0;
const chunks = [];
while ( n_read < opt_buffer.length ) {
const { promise, cancel } = this.channel_.get();
current_cancel_ = cancel;
let [which, chunk] = await promise;
if ( which === 'done' ) {
break;
}
if ( which === 'cancel' ) {
this.channel_.pushback(...chunks);
2024-06-08 13:07:42 +08:00
return
2024-04-20 11:24:32 +08:00
}
if ( n_read + chunk.length > opt_buffer.length ) {
const diff = opt_buffer.length - n_read;
this.channel_.pushback(chunk.subarray(diff));
chunk = chunk.subarray(0, diff);
}
chunks.push(chunk);
opt_buffer.set(chunk, n_read);
n_read += chunk.length;
}
final_promise.resolve({ n_read });
})();
return {
cancel: () => {
current_cancel_();
},
promise: final_promise,
};
}
read_with_cancel (opt_buffer) {
2024-04-20 11:24:32 +08:00
const o = this.read_and_get_info(opt_buffer);
const { cancel, promise } = o;
// const promise = (async () => {
// const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
// return opt_buffer ? n_read : chunk;
// })();
return {
2024-04-20 11:24:32 +08:00
cancel,
promise,
};
}
async read (opt_buffer) {
2024-04-20 11:24:32 +08:00
const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise;
return opt_buffer ? n_read : chunk;
2024-04-13 08:53:44 +08:00
}
async getChunk_() {
if ( this.chunks_.length === 0 ) {
// Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read.
const delegate_read = this.delegate.read();
const [which, result] = await raceCase({
delegate: delegate_read,
buffer_not_empty: this.waitUntilDataAvailable(),
});
if (which === 'delegate') {
2024-04-20 11:24:32 +08:00
return result;
}
2024-04-20 11:24:32 +08:00
// There's a chunk in the buffer now, so we can use the regular path.
// But first, make sure that once the delegate read completes, we save the chunk.
2024-04-20 11:24:32 +08:00
this.chunks_.push(result);
2024-04-13 08:53:44 +08:00
}
const len = this.getTotalBytesReady_();
const merged = new Uint8Array(len);
let offset = 0;
for ( const item of this.chunks_ ) {
merged.set(item, offset);
offset += item.length;
}
this.chunks_ = [];
return merged;
}
getTotalBytesReady_ () {
2024-04-20 11:24:32 +08:00
return this.chunks_.reduce((sum, chunk) => {
return sum + chunk.value.length
}, 0);
2024-04-13 08:53:44 +08:00
}
canRead() {
return this.getTotalBytesReady_() > 0;
}
async waitUntilDataAvailable() {
let resolve_promise;
let reject_promise;
const promise = new Promise((resolve, reject) => {
resolve_promise = resolve;
reject_promise = reject;
});
const check = () => {
if (this.canRead()) {
resolve_promise();
} else {
setTimeout(check, 0);
}
};
setTimeout(check, 0);
await promise;
}
2024-04-13 08:53:44 +08:00
}
/**
* PTT: pseudo-terminal target; called "slave" in POSIX
*/
export class PTT {
constructor(pty) {
this.readableStream = new ReadableStream({
start: controller => {
this.readController = controller;
}
});
this.writableStream = new WritableStream({
start: controller => {
this.writeController = controller;
},
write: chunk => {
if (typeof chunk === 'string') {
chunk = encoder.encode(chunk);
}
if ( pty.outputModeflags?.outputNLCR ) {
chunk = pty.LF_to_CRLF(chunk);
}
pty.readController.enqueue(chunk);
}
});
this.out = this.writableStream.getWriter();
this.in = this.readableStream.getReader();
}
}
/**
* PTY: pseudo-terminal
2024-06-08 13:07:42 +08:00
*
2024-04-13 08:53:44 +08:00
* This implements the PTY device driver.
*/
export class PTY {
constructor () {
this.outputModeflags = {
outputNLCR: true
};
this.readableStream = new ReadableStream({
start: controller => {
this.readController = controller;
}
});
this.writableStream = new WritableStream({
start: controller => {
this.writeController = controller;
},
write: chunk => {
if ( typeof chunk === 'string' ) {
chunk = encoder.encode(chunk);
}
for ( const target of this.targets ) {
target.readController.enqueue(chunk);
}
}
});
this.out = this.writableStream.getWriter();
this.in = this.readableStream.getReader();
this.targets = [];
}
getPTT () {
const target = new PTT(this);
this.targets.push(target);
return target;
}
LF_to_CRLF (input) {
let lfCount = 0;
for (let i = 0; i < input.length; i++) {
if (input[i] === 0x0A) {
lfCount++;
}
}
const output = new Uint8Array(input.length + lfCount);
let outputIndex = 0;
for (let i = 0; i < input.length; i++) {
// If LF is encountered, insert CR (0x0D) before LF (0x0A)
if (input[i] === 0x0A) {
output[outputIndex++] = 0x0D;
}
output[outputIndex++] = input[i];
}
return output;
}
}