diff --git a/packages/backend/package.json b/packages/backend/package.json index 9b329cfd..daa5753a 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -60,6 +60,7 @@ "response-time": "^2.3.2", "seedrandom": "^3.0.5", "socket.io": "^4.6.2", + "socket.io-client": "^4.6.2", "ssh2": "^1.13.0", "string-hash": "^1.1.3", "string-length": "^6.0.0", diff --git a/packages/backend/src/CoreModule.js b/packages/backend/src/CoreModule.js index e8bcea17..cba3c855 100644 --- a/packages/backend/src/CoreModule.js +++ b/packages/backend/src/CoreModule.js @@ -239,6 +239,9 @@ const install = async ({ services, app, useapi }) => { const { ScriptService } = require('./services/ScriptService'); services.registerService('script', ScriptService); + + const { BroadcastService } = require('./services/BroadcastService'); + services.registerService('broadcast', BroadcastService); } const install_legacy = async ({ services }) => { diff --git a/packages/backend/src/services/BroadcastService.js b/packages/backend/src/services/BroadcastService.js new file mode 100644 index 00000000..af4af35c --- /dev/null +++ b/packages/backend/src/services/BroadcastService.js @@ -0,0 +1,131 @@ +const { AdvancedBase } = require("@heyputer/puter-js-common"); +const { Endpoint } = require("../util/expressutil"); +const { UserActorType } = require("./auth/Actor"); +const BaseService = require("./BaseService"); + +class Peer extends AdvancedBase { + static ONLINE = Symbol('ONLINE'); + static OFFLINE = Symbol('OFFLINE'); + + static MODULES = { + sioclient: require('socket.io-client'), + }; + + constructor (svc_broadcast, config) { + super(); + this.svc_broadcast = svc_broadcast; + this.log = this.svc_broadcast.log; + this.config = config; + } + + send (data) { + if ( ! this.socket ) return; + this.socket.send(data) + } + + get state () { + try { + if ( this.socket?.connected ) return this.constructor.ONLINE; + } catch (e) { + console.error('could not get peer state', e); + } + return this.constructor.OFFLINE; + } + + connect () { + const address = this.config.address; + const socket = this.modules.sioclient(address, { + transports: ['websocket'], + path: '/wssinternal', + reconnection: true, + extraHeaders: { + ...(this.config.host ? { + Host: this.config.host, + } : {}) + } + }); + socket.on('connect', () => { + this.log.info(`connected`, { + address: this.config.address + }); + }); + socket.on('disconnect', () => { + this.log.info(`disconnected`, { + address: this.config.address + }); + }); + socket.on('connect_error', e => { + this.log.info(`connection error`, { + address: this.config.address, + message: e.message, + }); + console.log(e); + }); + socket.on('error', e => { + this.log.info('error', { + message: e.message, + }); + }); + + this.socket = socket; + } +} + +class BroadcastService extends BaseService { + static MODULES = { + express: require('express'), + // ['socket.io']: require('socket.io'), + }; + + _construct () { + this.peers_ = []; + } + + async _init () { + for ( const peer_config of this.config.peers ) { + const peer = new Peer(this, peer_config); + this.peers_.push(peer); + peer.connect(); + } + + const svc_event = this.services.get('event'); + svc_event.on('outer.*', this.on_event.bind(this)); + } + + async on_event (key, data, meta) { + if ( meta.from_outside ) return; + + for ( const peer of this.peers_ ) { + if ( peer.state !== Peer.ONLINE ) continue; + peer.send({ key, data, meta }); + } + } + + async ['__on_install.websockets'] (_, { server }) { + const svc_event = this.services.get('event'); + + const io = require('socket.io')(server, { + cors: { origin: '*' }, + path: '/wssinternal', + }); + + io.on('connection', async socket => { + socket.on('message', ({ key, data, meta }) => { + if ( meta.from_outside ) { + this.log.noticeme('possible over-sending'); + return; + } + + meta.from_outside = true; + svc_event.emit(key, data, meta); + }); + }); + + + this.log.noticeme( + require('node:util').inspect(this.config) + ); + } +} + +module.exports = { BroadcastService }; diff --git a/packages/backend/src/services/EventService.js b/packages/backend/src/services/EventService.js index 0b07a08b..d668b992 100644 --- a/packages/backend/src/services/EventService.js +++ b/packages/backend/src/services/EventService.js @@ -36,9 +36,11 @@ class ScopedEventBus { class EventService extends BaseService { async _construct () { this.listeners_ = {}; + this.global_listeners_ = []; } - emit (key, data) { + emit (key, data, meta) { + meta = meta ?? {}; const parts = key.split('.'); for ( let i = 0; i < parts.length; i++ ) { const part = i === parts.length - 1 @@ -55,7 +57,7 @@ class EventService extends BaseService { // event dispatch. (async () => { try { - await callback(key, data); + await callback(key, data, meta); } catch (e) { this.errors.report('event-service.emit', { source: e, @@ -66,6 +68,22 @@ class EventService extends BaseService { })(); } } + + for ( const callback of this.global_listeners_ ) { + // IIAFE wrapper to catch errors without blocking + // event dispatch. + (async () => { + try { + await callback(key, data, meta); + } catch (e) { + this.errors.report('event-service.emit', { + source: e, + trace: true, + alarm: true, + }); + } + })(); + } } @@ -86,6 +104,10 @@ class EventService extends BaseService { return det; } + + on_all (callback) { + this.global_listeners_.push(callback); + } get_scoped (scope) { return new ScopedEventBus(this, scope); diff --git a/packages/backend/src/services/WSPushService.js b/packages/backend/src/services/WSPushService.js index ba06eb90..e563d2ff 100644 --- a/packages/backend/src/services/WSPushService.js +++ b/packages/backend/src/services/WSPushService.js @@ -36,6 +36,8 @@ class WSPushService extends AdvancedBase { this._on_upload_progress.bind(this)); this.svc_event.on('fs.storage.progress.*', this._on_upload_progress.bind(this)); + this.svc_event.on('outer.gui.*', + this._on_outer_gui.bind(this)); } async _on_fs_create (key, data) { @@ -70,6 +72,11 @@ class WSPushService extends AdvancedBase { for ( const user_id of user_id_list ) { io.to(user_id).emit('item.added', response); } + + this.svc_event.emit('outer.gui.item.added', { + user_id_list, + response, + }); } async _on_fs_update (key, data) { @@ -104,6 +111,11 @@ class WSPushService extends AdvancedBase { for ( const user_id of user_id_list ) { io.to(user_id).emit('item.updated', response); } + + this.svc_event.emit('outer.gui.item.updated', { + user_id_list, + response, + }); } async _on_fs_move (key, data) { @@ -139,6 +151,11 @@ class WSPushService extends AdvancedBase { for ( const user_id of user_id_list ) { io.to(user_id).emit('item.moved', response); } + + this.svc_event.emit('outer.gui.item.moved', { + user_id_list, + response, + }); } async _on_fs_pending (key, data) { @@ -172,6 +189,10 @@ class WSPushService extends AdvancedBase { for ( const user_id of user_id_list ) { io.to(user_id).emit('item.pending', response); } + this.svc_event.emit('outer.gui.item.pending', { + user_id_list, + response, + }); } async _on_upload_progress (key, data) { @@ -225,6 +246,19 @@ class WSPushService extends AdvancedBase { }) }) } + + async _on_outer_gui (key, { user_id_list, response }, meta) { + if ( ! meta.from_outside ) return; + + key = key.slice('outer.gui.'.length); + + const { socketio } = this.modules; + + const io = socketio.getio(); + for ( const user_id of user_id_list ) { + io.to(user_id).emit(key, response); + } + } } module.exports = { diff --git a/packages/backend/src/services/WebServerService.js b/packages/backend/src/services/WebServerService.js index 1a13571c..b92142ee 100644 --- a/packages/backend/src/services/WebServerService.js +++ b/packages/backend/src/services/WebServerService.js @@ -183,6 +183,8 @@ class WebServerService extends BaseService { socket.broadcast.to(socket.user.id).emit('trash.is_empty', msg); }); }); + + await this.services.emit('install.websockets', { server }); } async _init () {