dev: migrate ExpectationService and PagerService

This commit is contained in:
KernelDeimos 2024-12-05 13:20:06 -05:00
parent a2072c5fac
commit 3df5f649a4
10 changed files with 138 additions and 183 deletions

View File

@ -86,9 +86,7 @@ const install = async ({ services, app, useapi, modapi }) => {
// call to services.registerService. We'll clean this up
// in a future PR.
const { PagerService } = require('./services/runtime-analysis/PagerService');
const { CommandService } = require('./services/CommandService');
const { ExpectationService } = require('./services/runtime-analysis/ExpectationService');
const { HTTPThumbnailService } = require('./services/thumbnails/HTTPThumbnailService');
const { PureJSThumbnailService } = require('./services/thumbnails/PureJSThumbnailService');
const { NAPIThumbnailService } = require('./services/thumbnails/NAPIThumbnailService');
@ -141,8 +139,6 @@ const install = async ({ services, app, useapi, modapi }) => {
services.registerService('__api-filesystem', FilesystemAPIService);
services.registerService('__api', PuterAPIService);
services.registerService('__gui', ServeGUIService);
services.registerService('expectations', ExpectationService);
services.registerService('pager', PagerService);
services.registerService('registry', RegistryService);
services.registerService('__registrant', RegistrantService);
services.registerService('fslock', FSLockService);
@ -352,7 +348,6 @@ const install = async ({ services, app, useapi, modapi }) => {
}
const install_legacy = async ({ services }) => {
const { ProcessEventService } = require('./services/runtime-analysis/ProcessEventService');
// const { FilesystemService } = require('./filesystem/FilesystemService');
const PerformanceMonitor = require('./monitor/PerformanceMonitor');
const { OperationTraceService } = require('./services/OperationTraceService');
@ -362,7 +357,6 @@ const install_legacy = async ({ services }) => {
const { FileCacheService } = require('./services/file-cache/FileCacheService');
// === Services which do not yet extend BaseService ===
services.registerService('process-event', ProcessEventService);
// services.registerService('filesystem', FilesystemService);
services.registerService('operationTrace', OperationTraceService);
services.registerService('file-cache', FileCacheService);

View File

@ -19,11 +19,11 @@
const { AdvancedBase } = require('@heyputer/putility');
const PathResolver = require('../../routers/filesystem_api/batch/PathResolver');
const commands = require('./commands').commands;
const { WorkUnit } = require('../../services/runtime-analysis/ExpectationService');
const APIError = require('../../api/APIError');
const { Context } = require('../../util/context');
const config = require('../../config');
const { TeePromise } = require('../../util/promise');
const { WorkUnit } = require('../../modules/core/lib/expect');
class BatchExecutor extends AdvancedBase {
constructor (x, { actor, log, errors }) {

View File

@ -27,6 +27,15 @@ class Core2Module extends AdvancedBase {
const { ErrorService } = require("./ErrorService.js");
services.registerService('error-service', ErrorService);
const { PagerService } = require("./PagerService.js");
services.registerService('pager', PagerService);
const { ExpectationService } = require("./ExpectationService.js");
services.registerService('expectations', ExpectationService);
const { ProcessEventService } = require("./ProcessEventService.js");
services.registerService('process-event', ProcessEventService);
}
}

View File

@ -18,82 +18,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { v4: uuidv4 } = require('uuid');
const { quot } = require('../../util/strutil');
const BaseService = require('../BaseService');
const BaseService = require('../../services/BaseService');
/**
* @class WorkUnit
* @description The WorkUnit class represents a unit of work that can be tracked and monitored for checkpoints.
* It includes methods to create instances, set checkpoints, and manage the state of the work unit.
*/
class WorkUnit {
/**
* Represents a unit of work with checkpointing capabilities.
*
* @class
*/
/**
* Creates and returns a new instance of WorkUnit.
*
* @static
* @returns {WorkUnit} A new instance of WorkUnit.
*/
static create () {
return new WorkUnit();
}
/**
* Creates a new instance of the WorkUnit class.
* @static
* @returns {WorkUnit} A new WorkUnit instance.
*/
constructor () {
this.id = uuidv4();
this.checkpoint_ = null;
}
checkpoint (label) {
console.log('CHECKPOINT', label);
this.checkpoint_ = label;
}
}
/**
* @class CheckpointExpectation
* @classdesc The CheckpointExpectation class is used to represent an expectation that a specific checkpoint
* will be reached during the execution of a work unit. It includes methods to check if the checkpoint has
* been reached and to report the results of this check.
*/
class CheckpointExpectation {
constructor (workUnit, checkpoint) {
this.workUnit = workUnit;
this.checkpoint = checkpoint;
}
/**
* Constructor for CheckpointExpectation class.
* Initializes the instance with a WorkUnit and a checkpoint label.
* @param {WorkUnit} workUnit - The work unit associated with the checkpoint.
* @param {string} checkpoint - The checkpoint label to be checked.
*/
check () {
// TODO: should be true if checkpoint was ever reached
return this.workUnit.checkpoint_ == this.checkpoint;
}
report (log) {
if ( this.check() ) return;
log.log(
`operation(${this.workUnit.id}): ` +
`expected ${quot(this.checkpoint)} ` +
`and got ${quot(this.workUnit.checkpoint_)}.`
);
}
}
/**
* This service helps diagnose errors involving the potentially
* complex relationships between asynchronous operations.
*/
/**
* @class ExpectationService
* @extends BaseService
@ -108,6 +34,10 @@ class CheckpointExpectation {
* runtime behaviors in a system.
*/
class ExpectationService extends BaseService {
static USE = {
expect: 'core.expect'
};
/**
* Constructs the ExpectationService and initializes its internal state.
* This method is intended to be called asynchronously.
@ -119,6 +49,29 @@ class ExpectationService extends BaseService {
this.expectations_ = [];
}
/**
* ExpectationService registers its commands at the consolidation phase because
* the '_init' method of CommandService may not have been called yet.
*/
['__on_boot.consolidation'] () {
const commands = this.services.get('commands');
commands.registerCommands('expectations', [
{
id: 'pending',
description: 'lists pending expectations',
handler: async (args, log) => {
this.purgeExpectations_();
if ( this.expectations_.length < 1 ) {
log.log(`there are none`);
return;
}
for ( const expectation of this.expectations_ ) {
expectation.report(log);
}
}
}
]);
}
/**
* Initializes the ExpectationService, setting up interval functions and registering commands.
@ -145,24 +98,6 @@ class ExpectationService extends BaseService {
setInterval(() => {
this.purgeExpectations_();
}, 1000);
const commands = services.get('commands');
commands.registerCommands('expectations', [
{
id: 'pending',
description: 'lists pending expectations',
handler: async (args, log) => {
this.purgeExpectations_();
if ( this.expectations_.length < 1 ) {
log.log(`there are none`);
return;
}
for ( const expectation of this.expectations_ ) {
expectation.report(log);
}
}
}
]);
}
@ -187,13 +122,12 @@ class ExpectationService extends BaseService {
}
expect_eventually ({ workUnit, checkpoint }) {
this.expectations_.push(new CheckpointExpectation(workUnit, checkpoint));
this.expectations_.push(new this.expect.CheckpointExpectation(workUnit, checkpoint));
}
}
module.exports = {
WorkUnit,
ExpectationService
};

View File

@ -18,9 +18,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const pdjs = require('@pagerduty/pdjs');
const BaseService = require('../BaseService');
const BaseService = require('../../services/BaseService');
const util = require('util');
const { Context } = require('../../util/context');
/**
@ -33,11 +32,24 @@ const { Context } = require('../../util/context');
* command registration.
*/
class PagerService extends BaseService {
static USE = {
Context: 'core.context',
}
async _construct () {
this.config = this.global_config.pager;
this.alertHandlers_ = [];
}
/**
* PagerService registers its commands at the consolidation phase because
* the '_init' method of CommandService may not have been called yet.
*/
['__on_boot.consolidation'] () {
this._register_commands(this.services.get('commands'));
}
/**
* Initializes the PagerService instance by setting the configuration and
* initializing an empty alert handler array.
@ -56,11 +68,8 @@ class PagerService extends BaseService {
}
this.onInit();
this._register_commands(services.get('commands'));
}
/**
* Initializes PagerDuty configuration and registers alert handlers.
* If PagerDuty is enabled in the configuration, it sets up an alert handler
@ -83,7 +92,7 @@ class PagerService extends BaseService {
server_id: this.global_config.server_id,
};
const ctx = Context.get(undefined, { allow_fallback: true });
const ctx = this.Context.get(undefined, { allow_fallback: true });
// Add request payload if any exists
const req = ctx.get('req');

View File

@ -17,8 +17,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { Context } = require("../../util/context");
const BaseService = require("../../services/BaseService");
/**
* Service class that handles process-wide events and errors.
@ -28,8 +28,13 @@ const { Context } = require("../../util/context");
*
* @class ProcessEventService
*/
class ProcessEventService {
constructor ({ services }) {
class ProcessEventService extends BaseService {
static USE = {
Context: 'core.context',
};
_init () {
const services = this.services;
const log = services.get('log-service').create('process-event-service');
const errors = services.get('error-service').create(log);
@ -44,7 +49,7 @@ class ProcessEventService {
* @param {string} origin - The origin of the uncaught exception
* @returns {Promise<void>}
*/
await Context.allow_fallback(async () => {
await this.Context.allow_fallback(async () => {
errors.report('process:uncaughtException', {
source: err,
origin,
@ -62,7 +67,7 @@ class ProcessEventService {
* @param {Promise} promise - The rejected promise
* @returns {Promise<void>} Resolves when error is reported
*/
await Context.allow_fallback(async () => {
await this.Context.allow_fallback(async () => {
errors.report('process:unhandledRejection', {
source: reason,
promise,

View File

@ -5,4 +5,5 @@ module.exports = {
identutil: require('./identifier.js'),
stdioutil: require('./stdio.js'),
},
expect: require('./expect.js'),
};

View File

@ -0,0 +1,73 @@
// METADATA // {"def":"core.expect"}
/**
* @class WorkUnit
* @description The WorkUnit class represents a unit of work that can be tracked and monitored for checkpoints.
* It includes methods to create instances, set checkpoints, and manage the state of the work unit.
*/
class WorkUnit {
/**
* Represents a unit of work with checkpointing capabilities.
*
* @class
*/
/**
* Creates and returns a new instance of WorkUnit.
*
* @static
* @returns {WorkUnit} A new instance of WorkUnit.
*/
static create () {
return new WorkUnit();
}
/**
* Creates a new instance of the WorkUnit class.
* @static
* @returns {WorkUnit} A new WorkUnit instance.
*/
constructor () {
this.id = uuidv4();
this.checkpoint_ = null;
}
checkpoint (label) {
console.log('CHECKPOINT', label);
this.checkpoint_ = label;
}
}
/**
* @class CheckpointExpectation
* @classdesc The CheckpointExpectation class is used to represent an expectation that a specific checkpoint
* will be reached during the execution of a work unit. It includes methods to check if the checkpoint has
* been reached and to report the results of this check.
*/
class CheckpointExpectation {
constructor (workUnit, checkpoint) {
this.workUnit = workUnit;
this.checkpoint = checkpoint;
}
/**
* Constructor for CheckpointExpectation class.
* Initializes the instance with a WorkUnit and a checkpoint label.
* @param {WorkUnit} workUnit - The work unit associated with the checkpoint.
* @param {string} checkpoint - The checkpoint label to be checked.
*/
check () {
// TODO: should be true if checkpoint was ever reached
return this.workUnit.checkpoint_ == this.checkpoint;
}
report (log) {
if ( this.check() ) return;
log.log(
`operation(${this.workUnit.id}): ` +
`expected ${JSON.stringify(this.checkpoint)} ` +
`and got ${JSON.stringify(this.workUnit.checkpoint_)}.`
);
}
}
module.exports = {
WorkUnit,
CheckpointExpectation,
};

View File

@ -20,7 +20,6 @@ const APIError = require("../../../api/APIError");
const eggspress = require("../../../api/eggspress");
const config = require("../../../config");
const PathResolver = require("./PathResolver");
const { WorkUnit } = require("../../../services/runtime-analysis/ExpectationService");
const { Context } = require("../../../util/context");
const Busboy = require('busboy');
const { BatchExecutor } = require("../../../filesystem/batch/BatchExecutor");

View File

@ -1,69 +0,0 @@
// METADATA // {"ai-commented":{"service":"openai-completion","model":"gpt-4o"}}
/*
* Copyright (C) 2024 Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const memwatch = require('@airbnb/node-memwatch');
/**
* The HeapMonService class monitors the application's memory usage,
* utilizing the memwatch library to detect heap memory leaks and
* gather heap statistics at specified intervals. It interfaces with
* logging and alarm services to report memory conditions and
* trigger alerts as necessary.
*/
class HeapMonService {
constructor ({ services, my_config }) {
this.log = services.get('log-service').create('heap-monitor');
this.alarm = services.get('alarm');
let hd, hd_ts;
if ( my_config.heapdiff ) {
hd = new memwatch.HeapDiff();
hd_ts = Date.now();
}
let heapdiff_interval = my_config.heapdiff_interval ?? 1;
heapdiff_interval *= 1000;
memwatch.on('stats', (stats) => {
this.log.info('stats', stats);
(() => {
if ( ! my_config.heapdiff ) return
const now = Date.now();
if ( (now - hd_ts) < heapdiff_interval ) return;
const diff = hd.end();
this.log.info('heapdiff', diff);
hd = new memwatch.HeapDiff();
hd_ts = now;
})();
});
memwatch.on('leak', (info) => {
this.log.error('leak', info);
this.alarm.create('heap-leak', 'memory leak detected', info);
});
}
}
module.exports = { HeapMonService };