From e2f35b4844b10df8dabe14a9d32ca0b9969c3618 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Mon, 5 Aug 2024 18:48:49 -0400 Subject: [PATCH] dev: CLink and SLink classes --- doc/contributors/comment_prefixes.md | 5 +- .../src/modules/broadcast/BroadcastService.js | 270 +----------------- .../modules/broadcast/connection/BaseLink.js | 34 +++ .../src/modules/broadcast/connection/CLink.js | 103 +++++++ .../broadcast/connection/KeyPairHelper.js | 72 +++++ .../src/modules/broadcast/connection/SLink.js | 87 ++++++ src/backend/src/traits/ChannelFeature.js | 46 +++ 7 files changed, 360 insertions(+), 257 deletions(-) create mode 100644 src/backend/src/modules/broadcast/connection/BaseLink.js create mode 100644 src/backend/src/modules/broadcast/connection/CLink.js create mode 100644 src/backend/src/modules/broadcast/connection/KeyPairHelper.js create mode 100644 src/backend/src/modules/broadcast/connection/SLink.js create mode 100644 src/backend/src/traits/ChannelFeature.js diff --git a/doc/contributors/comment_prefixes.md b/doc/contributors/comment_prefixes.md index eefb0e4a..7a24afd1 100644 --- a/doc/contributors/comment_prefixes.md +++ b/doc/contributors/comment_prefixes.md @@ -27,4 +27,7 @@ This document will be updated on an _as-needed_ basis. [track-comments.md](../devmeta/track-comments.md) - `wet:` is usesd to track anything that doesn't adhere to the DRY principle; the following message should describe - where similar code is \ No newline at end of file + where similar code is +- `compare():` is used to note differences between other + implementations of a similar idea +- `name:` pedantic commentary on the name of something diff --git a/src/backend/src/modules/broadcast/BroadcastService.js b/src/backend/src/modules/broadcast/BroadcastService.js index 0b393dff..dd7d6634 100644 --- a/src/backend/src/modules/broadcast/BroadcastService.js +++ b/src/backend/src/modules/broadcast/BroadcastService.js @@ -18,257 +18,8 @@ */ const { AdvancedBase } = require("@heyputer/puter-js-common"); const BaseService = require("../../services/BaseService"); - -class KeyPairHelper extends AdvancedBase { - static MODULES = { - tweetnacl: require('tweetnacl'), - }; - - constructor ({ - kpublic, - ksecret, - }) { - super(); - this.kpublic = kpublic; - this.ksecret = ksecret; - this.nonce_ = 0; - } - - to_nacl_key_ (key) { - console.log('WUT', key); - const full_buffer = Buffer.from(key, 'base64'); - - // Remove version byte (assumed to be 0x31 and ignored for now) - const buffer = full_buffer.slice(1); - - return new Uint8Array(buffer); - } - - get naclSecret () { - return this.naclSecret_ ?? ( - this.naclSecret_ = this.to_nacl_key_(this.ksecret)); - } - get naclPublic () { - return this.naclPublic_ ?? ( - this.naclPublic_ = this.to_nacl_key_(this.kpublic)); - } - - write (text) { - const require = this.require; - const nacl = require('tweetnacl'); - - const nonce = nacl.randomBytes(nacl.box.nonceLength); - const message = {}; - - const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8')); - const encryptedText = nacl.box( - textUint8, nonce, - this.naclPublic, this.naclSecret - ); - message.text = Buffer.from(encryptedText); - message.nonce = Buffer.from(nonce); - - return message; - } - - read (message) { - const require = this.require; - const nacl = require('tweetnacl'); - - const arr = nacl.box.open( - new Uint8Array(message.text), - new Uint8Array(message.nonce), - this.naclPublic, - this.naclSecret, - ); - - return Buffer.from(arr).toString('utf-8'); - } -} - -class Peer extends AdvancedBase { - static AUTHENTICATING = Symbol('AUTHENTICATING'); - static ONLINE = Symbol('ONLINE'); - static OFFLINE = Symbol('OFFLINE'); - - static MODULES = { - sioclient: require('socket.io-client'), - crypto: require('crypto'), - }; - - constructor (svc_broadcast, config) { - super(); - this.svc_broadcast = svc_broadcast; - this.log = this.svc_broadcast.log; - this.config = config; - } - - send (data) { - if ( ! this.socket ) return; - const require = this.require; - const crypto = require('crypto'); - const iv = crypto.randomBytes(16); - const cipher = crypto.createCipheriv( - 'aes-256-cbc', - this.aesKey, - iv, - ); - const jsonified = JSON.stringify(data); - let buffers = []; - buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8'))); - buffers.push(cipher.final()); - const buffer = Buffer.concat(buffers); - this.socket.send({ - iv, - message: buffer, - }); - } - - get state () { - try { - if ( this.socket?.connected ) return this.constructor.ONLINE; - } catch (e) { - console.error('could not get peer state', e); - } - return this.constructor.OFFLINE; - } - - connect () { - const address = this.config.address; - const socket = this.modules.sioclient(address, { - transports: ['websocket'], - path: '/wssinternal', - reconnection: true, - extraHeaders: { - ...(this.config.host ? { - Host: this.config.host, - } : {}) - } - }); - socket.on('connect', () => { - this.log.info(`connected`, { - address: this.config.address - }); - - const require = this.require; - const crypto = require('crypto'); - this.aesKey = crypto.randomBytes(32); - - const kp_helper = new KeyPairHelper({ - kpublic: this.config.key, - ksecret: this.svc_broadcast.config.keys.secret, - }); - socket.send({ - $: 'take-my-key', - key: this.svc_broadcast.config.keys.public, - message: kp_helper.write( - this.aesKey.toString('base64') - ), - }); - }); - socket.on('disconnect', () => { - this.log.info(`disconnected`, { - address: this.config.address - }); - }); - socket.on('connect_error', e => { - this.log.info(`connection error`, { - address: this.config.address, - message: e.message, - }); - }); - socket.on('error', e => { - this.log.info('error', { - message: e.message, - }); - }); - - this.socket = socket; - } -} - -class Connection extends AdvancedBase { - static MODULES = { - crypto: require('crypto'), - } - - static AUTHENTICATING = { - on_message (data) { - if ( data.$ !== 'take-my-key' ) { - this.disconnect(); - return; - } - - const hasKey = this.svc_broadcast.trustedPublicKeys_[data.key]; - if ( ! hasKey ) { - this.disconnect(); - return; - } - - const is_trusted = - this.svc_broadcast.trustedPublicKeys_ - .hasOwnProperty(data.key) - if ( ! is_trusted ) { - this.disconnect(); - return; - } - - const kp_helper = new KeyPairHelper({ - kpublic: data.key, - ksecret: this.svc_broadcast.config.keys.secret, - }); - - const message = kp_helper.read(data.message); - this.aesKey = Buffer.from(message, 'base64'); - - this.state = this.constructor.ONLINE; - } - } - static ONLINE = { - on_message (data) { - if ( ! this.on_message ) return; - - const require = this.require; - const crypto = require('crypto'); - const decipher = crypto.createDecipheriv( - 'aes-256-cbc', - this.aesKey, - data.iv, - ) - const buffers = []; - buffers.push(decipher.update(data.message)); - buffers.push(decipher.final()); - - const rawjson = Buffer.concat(buffers).toString('utf-8'); - - const output = JSON.parse(rawjson); - - this.on_message(output); - } - } - static OFFLINE = { - on_message () { - throw new Error('unexpected message'); - } - } - - constructor (svc_broadcast, socket) { - super(); - this.state = this.constructor.AUTHENTICATING; - this.svc_broadcast = svc_broadcast; - this.log = this.svc_broadcast.log; - this.socket = socket; - - socket.on('message', data => { - this.state.on_message.call(this, data); - }); - } - - disconnect () { - this.socket.disconnect(true); - this.state = this.constructor.OFFLINE; - } -} +const { CLink } = require("./connection/CLink"); +const { SLink } = require("./connection/SLink"); class BroadcastService extends BaseService { static MODULES = { @@ -286,7 +37,11 @@ class BroadcastService extends BaseService { const peers = this.config.peers ?? []; for ( const peer_config of peers ) { this.trustedPublicKeys_[peer_config.key] = true; - const peer = new Peer(this, peer_config); + const peer = new CLink({ + keys: this.config.keys, + config: peer_config, + log: this.log, + }); this.peers_.push(peer); peer.connect(); } @@ -301,7 +56,6 @@ class BroadcastService extends BaseService { if ( meta.from_outside ) return; for ( const peer of this.peers_ ) { - if ( peer.state !== Peer.ONLINE ) continue; peer.send({ key, data, meta }); } } @@ -318,10 +72,14 @@ class BroadcastService extends BaseService { }); io.on('connection', async socket => { - const conn = new Connection(this, socket); + const conn = new SLink({ + keys: this.config.keys, + trustedKeys: this.trustedPublicKeys_, + socket, + }); this.connections_.push(conn); - conn.on_message = ({ key, data, meta }) => { + conn.channels.message.on(({ key, data, meta }) => { if ( meta.from_outside ) { this.log.noticeme('possible over-sending'); return; @@ -335,7 +93,7 @@ class BroadcastService extends BaseService { meta.from_outside = true; svc_event.emit(key, data, meta); - }; + }); }); diff --git a/src/backend/src/modules/broadcast/connection/BaseLink.js b/src/backend/src/modules/broadcast/connection/BaseLink.js new file mode 100644 index 00000000..0f391886 --- /dev/null +++ b/src/backend/src/modules/broadcast/connection/BaseLink.js @@ -0,0 +1,34 @@ +const { AdvancedBase } = require("@heyputer/puter-js-common"); +const { ChannelFeature } = require("../../../traits/ChannelFeature"); + +class BaseLink extends AdvancedBase { + static FEATURES = [ + new ChannelFeature(), + ]; + static CHANNELS = ['message']; + + static MODULES = { + crypto: require('crypto'), + }; + + static AUTHENTICATING = {}; + static ONLINE = {}; + static OFFLINE = {}; + + send (data) { + if ( this.state !== this.constructor.ONLINE ) { + return false; + } + + return this._send(data); + } + + constructor () { + super(); + this.state = this.constructor.AUTHENTICATING; + } +} + +module.exports = { + BaseLink, +}; diff --git a/src/backend/src/modules/broadcast/connection/CLink.js b/src/backend/src/modules/broadcast/connection/CLink.js new file mode 100644 index 00000000..e6675546 --- /dev/null +++ b/src/backend/src/modules/broadcast/connection/CLink.js @@ -0,0 +1,103 @@ +const { BaseLink } = require("./BaseLink"); +const { KeyPairHelper } = require("./KeyPairHelper"); + +class CLink extends BaseLink { + static MODULES = { + sioclient: require('socket.io-client'), + }; + + _send (data) { + if ( ! this.socket ) return; + const require = this.require; + const crypto = require('crypto'); + const iv = crypto.randomBytes(16); + const cipher = crypto.createCipheriv( + 'aes-256-cbc', + this.aesKey, + iv, + ); + const jsonified = JSON.stringify(data); + let buffers = []; + buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8'))); + buffers.push(cipher.final()); + const buffer = Buffer.concat(buffers); + this.socket.send({ + iv, + message: buffer, + }); + } + + constructor ({ + keys, + log, + config, + }) { + super(); + // keys of client (local) + this.keys = keys; + // keys of server (remote) + this.config = config; + this.log = log; + } + + connect () { + const address = this.config.address; + const socket = this.modules.sioclient(address, { + transports: ['websocket'], + path: '/wssinternal', + reconnection: true, + extraHeaders: { + ...(this.config.host ? { + Host: this.config.host, + } : {}) + } + }); + socket.on('connect', () => { + this.log.info(`connected`, { + address: this.config.address + }); + + const require = this.require; + const crypto = require('crypto'); + this.aesKey = crypto.randomBytes(32); + + const kp_helper = new KeyPairHelper({ + kpublic: this.config.key, + ksecret: this.keys.secret, + }); + socket.send({ + $: 'take-my-key', + key: this.keys.public, + message: kp_helper.write( + this.aesKey.toString('base64') + ), + }); + this.state = this.constructor.ONLINE; + }); + socket.on('disconnect', () => { + this.log.info(`disconnected`, { + address: this.config.address + }); + }); + socket.on('connect_error', e => { + this.log.info(`connection error`, { + address: this.config.address, + message: e.message, + }); + }); + socket.on('error', e => { + this.log.info('error', { + message: e.message, + }); + }); + socket.on('message', data => { + if ( this.state.on_message ) { + this.state.on_message.call(this, data); + } + }); + + this.socket = socket; + } +} + +module.exports = { CLink }; diff --git a/src/backend/src/modules/broadcast/connection/KeyPairHelper.js b/src/backend/src/modules/broadcast/connection/KeyPairHelper.js new file mode 100644 index 00000000..ab508168 --- /dev/null +++ b/src/backend/src/modules/broadcast/connection/KeyPairHelper.js @@ -0,0 +1,72 @@ +const { AdvancedBase } = require('@heyputer/puter-js-common'); + +class KeyPairHelper extends AdvancedBase { + static MODULES = { + tweetnacl: require('tweetnacl'), + }; + + constructor ({ + kpublic, + ksecret, + }) { + super(); + this.kpublic = kpublic; + this.ksecret = ksecret; + this.nonce_ = 0; + } + + to_nacl_key_ (key) { + console.log('WUT', key); + const full_buffer = Buffer.from(key, 'base64'); + + // Remove version byte (assumed to be 0x31 and ignored for now) + const buffer = full_buffer.slice(1); + + return new Uint8Array(buffer); + } + + get naclSecret () { + return this.naclSecret_ ?? ( + this.naclSecret_ = this.to_nacl_key_(this.ksecret)); + } + get naclPublic () { + return this.naclPublic_ ?? ( + this.naclPublic_ = this.to_nacl_key_(this.kpublic)); + } + + write (text) { + const require = this.require; + const nacl = require('tweetnacl'); + + const nonce = nacl.randomBytes(nacl.box.nonceLength); + const message = {}; + + const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8')); + const encryptedText = nacl.box( + textUint8, nonce, + this.naclPublic, this.naclSecret + ); + message.text = Buffer.from(encryptedText); + message.nonce = Buffer.from(nonce); + + return message; + } + + read (message) { + const require = this.require; + const nacl = require('tweetnacl'); + + const arr = nacl.box.open( + new Uint8Array(message.text), + new Uint8Array(message.nonce), + this.naclPublic, + this.naclSecret, + ); + + return Buffer.from(arr).toString('utf-8'); + } +} + +module.exports = { + KeyPairHelper, +}; diff --git a/src/backend/src/modules/broadcast/connection/SLink.js b/src/backend/src/modules/broadcast/connection/SLink.js new file mode 100644 index 00000000..1bf0c589 --- /dev/null +++ b/src/backend/src/modules/broadcast/connection/SLink.js @@ -0,0 +1,87 @@ +const { BaseLink } = require("./BaseLink"); +const { KeyPairHelper } = require("./KeyPairHelper"); + +class SLink extends BaseLink { + static AUTHENTICATING = { + on_message (data) { + if ( data.$ !== 'take-my-key' ) { + this.disconnect(); + return; + } + + const trustedKeys = this.trustedKeys; + + const hasKey = trustedKeys[data.key]; + if ( ! hasKey ) { + this.disconnect(); + return; + } + + const is_trusted = trustedKeys.hasOwnProperty(data.key) + if ( ! is_trusted ) { + this.disconnect(); + return; + } + + const kp_helper = new KeyPairHelper({ + kpublic: data.key, + ksecret: this.keys.secret, + }); + + const message = kp_helper.read(data.message); + this.aesKey = Buffer.from(message, 'base64'); + + this.state = this.constructor.ONLINE; + } + }; + static ONLINE = { + on_message (data) { + const require = this.require; + const crypto = require('crypto'); + const decipher = crypto.createDecipheriv( + 'aes-256-cbc', + this.aesKey, + data.iv, + ) + const buffers = []; + buffers.push(decipher.update(data.message)); + buffers.push(decipher.final()); + + const rawjson = Buffer.concat(buffers).toString('utf-8'); + + const output = JSON.parse(rawjson); + + this.channels.message.emit(output); + } + } + static OFFLINE = { + on_message () { + throw new Error('unexpected message'); + } + } + + _send () { + // TODO: implement as a fallback + throw new Error('cannot send via SLink yet'); + } + + constructor ({ + keys, + trustedKeys, + socket, + }) { + super(); + this.state = this.constructor.AUTHENTICATING; + // Keys of server (local) + this.keys = keys; + // Allowed client keys (remote) + this.trustedKeys = trustedKeys; + this.socket = socket; + + socket.on('message', data => { + this.state.on_message.call(this, data); + }); + } +} + +module.exports = { SLink }; diff --git a/src/backend/src/traits/ChannelFeature.js b/src/backend/src/traits/ChannelFeature.js new file mode 100644 index 00000000..a78435d8 --- /dev/null +++ b/src/backend/src/traits/ChannelFeature.js @@ -0,0 +1,46 @@ +// name: 'Channel' does not behave the same as Golang's channel construct; it +// behaves more like an EventEmitter. +class Channel { + constructor () { + this.listeners_ = []; + } + + // compare(EventService): EventService has an 'on' method, + // but it accepts a 'selector' argument to narrow the scope of events + on (callback) { + // wet: EventService also creates an object like this + const det = { + detach: () => { + const idx = this.listeners_.indexOf(callback); + if ( idx !== -1 ) { + this.listeners_.splice(idx, 1); + } + } + }; + + this.listeners_.push(callback); + + return det; + } + + emit (...a) { + for ( const lis of this.listeners_ ) { + lis(...a); + } + } +} + +class ChannelFeature { + install_in_instance (instance) { + const channels = instance._get_merged_static_array('CHANNELS'); + + instance.channels = {}; + for ( const name of channels ) { + instance.channels[name] = new Channel(name); + } + } +} + +module.exports = { + ChannelFeature, +};