mirror of
https://github.com/HeyPuter/puter.git
synced 2025-02-02 14:18:43 +08:00
fix: buffer incomplete JSON objects from AI stream
- simplify ndjson stream to get complete lines instead of chunks - add buffering for incomplete lines
This commit is contained in:
parent
af3d732fc4
commit
60eef2fc67
@ -282,7 +282,14 @@ async function driverCall_(
|
|||||||
let signal_stream_update = null;
|
let signal_stream_update = null;
|
||||||
let lastLength = 0;
|
let lastLength = 0;
|
||||||
let response_complete = false;
|
let response_complete = false;
|
||||||
const parts_received = [];
|
|
||||||
|
let buffer = '';
|
||||||
|
|
||||||
|
// NOTE: linked-list technically would perform better,
|
||||||
|
// but in practice there are at most 2-3 lines
|
||||||
|
// buffered so this does not matter.
|
||||||
|
const lines_received = [];
|
||||||
|
|
||||||
xhr.onreadystatechange = () => {
|
xhr.onreadystatechange = () => {
|
||||||
if ( xhr.readyState === 2 ) {
|
if ( xhr.readyState === 2 ) {
|
||||||
if ( xhr.getResponseHeader("Content-Type") !==
|
if ( xhr.getResponseHeader("Content-Type") !==
|
||||||
@ -295,13 +302,10 @@ async function driverCall_(
|
|||||||
signal_stream_update = tp.resolve.bind(tp);
|
signal_stream_update = tp.resolve.bind(tp);
|
||||||
await tp;
|
await tp;
|
||||||
if ( response_complete ) break;
|
if ( response_complete ) break;
|
||||||
while ( parts_received.length > 0 ) {
|
while ( lines_received.length > 0 ) {
|
||||||
const value = parts_received.pop();
|
const line = lines_received.shift();
|
||||||
const parts = value.split('\n');
|
if ( line.trim() === '' ) continue;
|
||||||
for ( const part of parts ) {
|
yield JSON.parse(line);
|
||||||
if ( part.trim() === '' ) continue;
|
|
||||||
yield JSON.parse(part);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -322,8 +326,19 @@ async function driverCall_(
|
|||||||
const newText = xhr.responseText.slice(lastLength);
|
const newText = xhr.responseText.slice(lastLength);
|
||||||
lastLength = xhr.responseText.length; // Update lastLength to the current length
|
lastLength = xhr.responseText.length; // Update lastLength to the current length
|
||||||
|
|
||||||
parts_received.push(newText);
|
let hasUpdates = false;
|
||||||
signal_stream_update();
|
for ( let i = 0; i < newText.length; i++ ) {
|
||||||
|
buffer += newText[i];
|
||||||
|
if ( newText[i] === '\n' ) {
|
||||||
|
hasUpdates = true;
|
||||||
|
lines_received.push(buffer);
|
||||||
|
buffer = '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( hasUpdates ) {
|
||||||
|
signal_stream_update();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// ========================
|
// ========================
|
||||||
|
Loading…
Reference in New Issue
Block a user