dev: ll_copy -> provider.copy_tree

This commit is contained in:
KernelDeimos 2025-01-03 14:27:20 -05:00
parent 8a9164d7c5
commit 71f221dd07
3 changed files with 188 additions and 147 deletions

View File

@ -11,6 +11,11 @@ const capabilityNames = [
'symlink',
'trash',
// Macro Capabilities
'copy-tree',
'move-tree',
'remove-tree',
// Behavior Capabilities
'case-sensitive',

View File

@ -24,6 +24,7 @@ const { NodeUIDSelector } = require('../node/selectors');
const { RESOURCE_STATUS_PENDING_CREATE } = require('../../modules/puterfs/ResourceService');
const { UploadProgressTracker } = require('../storage/UploadProgressTracker');
const { LLFilesystemOperation } = require('./definitions');
const fsCapabilities = require('../definitions/capabilities');
class LLCopy extends LLFilesystemOperation {
static MODULES = {
@ -74,154 +75,18 @@ class LLCopy extends LLFilesystemOperation {
}
}
const raw_fsentry = {
uuid,
is_dir: source.entry.is_dir,
...(source.entry.is_shortcut ? {
is_shortcut: source.entry.is_shortcut,
shortcut_to: source.entry.shortcut_to,
} :{}),
parent_uid: parent.uid,
name: target_name,
created: ts,
modified: ts,
path: _path.join(await parent.get('path'), target_name),
// if property exists but the value is undefined,
// it will still be included in the INSERT, causing
// an error
...(source.entry.thumbnail ?
{ thumbnail: source.entry.thumbnail } : {}),
user_id: user.id,
};
svc_event.emit('fs.pending.file', {
fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
context: this.context,
})
this.checkpoint('emitted fs.pending.file');
if ( await source.get('has-s3') ) {
Object.assign(raw_fsentry, {
size: source.entry.size,
associated_app_id: source.entry.associated_app_id,
bucket: source.entry.bucket,
bucket_region: source.entry.bucket_region,
});
await tracer.startActiveSpan(`fs:cp:storage-copy`, async span => {
let progress_tracker = new UploadProgressTracker();
svc_event.emit('fs.storage.progress.copy', {
upload_tracker: progress_tracker,
context: Context.get(),
meta: {
item_uid: uuid,
item_path: raw_fsentry.path,
}
});
this.checkpoint('emitted fs.storage.progress.copy');
// const storage = new PuterS3StorageStrategy({ services: svc });
const storage = Context.get('storage');
const state_copy = storage.create_copy();
await state_copy.run({
src_node: source,
dst_storage: {
key: uuid,
bucket: raw_fsentry.bucket,
bucket_region: raw_fsentry.bucket_region,
},
storage_api: { progress_tracker },
});
this.checkpoint('finished storage copy');
span.end();
const capabilities = source.provider.get_capabilities();
if ( capabilities.has(fsCapabilities.COPY_TREE) ) {
const result_node = await source.provider.copy_tree({
context,
source,
parent,
target_name,
});
return result_node;
} else {
throw new Error('only copy_tree is current supported by ll_copy');
}
{
const svc_size = svc.get('sizeService');
await svc_size.add_node_size(undefined, source, user);
this.checkpoint('added source size');
}
const svc_resource = svc.get('resourceService');
svc_resource.register({
uid: uuid,
status: RESOURCE_STATUS_PENDING_CREATE,
});
const svc_fsEntry = svc.get('fsEntryService');
this.log.info(`inserting entry: ` + uuid);
const entryOp = await svc_fsEntry.insert(raw_fsentry);
let node;
this.checkpoint('before parallel tasks');
const tasks = new ParallelTasks({ tracer, max: 4 });
await Context.arun(`fs:cp:parallel-portion`, async () => {
this.checkpoint('starting parallel tasks');
// Add child copy tasks if this is a directory
if ( source.entry.is_dir ) {
const fsEntryService = svc.get('fsEntryService');
const children = await fsEntryService.fast_get_direct_descendants(
source.uid
);
for ( const child_uuid of children ) {
tasks.add(`fs:cp:copy-child`, async () => {
const child_node = await fs.node(
new NodeUIDSelector(child_uuid)
);
const child_name = await child_node.get('name');
// TODO: this should be LLCopy instead
const ll_copy = new LLCopy();
await ll_copy.run({
source: await fs.node(
new NodeUIDSelector(child_uuid)
),
parent: await fs.node(
new NodeUIDSelector(uuid)
),
user,
target_name: child_name,
});
});
}
}
// Add task to await entry
tasks.add(`fs:cp:entry-op`, async () => {
await entryOp.awaitDone();
svc_resource.free(uuid);
this.log.info(`done inserting entry: ` + uuid);
const copy_fsNode = await fs.node(new NodeUIDSelector(uuid));
copy_fsNode.entry = raw_fsentry;
copy_fsNode.found = true;
copy_fsNode.path = raw_fsentry.path;
node = copy_fsNode;
svc_event.emit('fs.create.file', {
node,
context: this.context,
})
}, { force: true });
this.checkpoint('waiting for parallel tasks');
await tasks.awaitAll();
this.checkpoint('finishing up');
});
node = node || await fs.node(new NodeUIDSelector(uuid));
// TODO: What event do we emit? How do we know if we're overwriting?
return node;
}
}

View File

@ -5,8 +5,17 @@ const { TDetachable } = putility.traits;
const { NodeInternalIDSelector, NodeChildSelector, NodeUIDSelector, RootNodeSelector, NodePathSelector } = require("../../../filesystem/node/selectors");
const { Context } = require("../../../util/context");
const fsCapabilities = require('../../../filesystem/definitions/capabilities');
const { UploadProgressTracker } = require('../../../filesystem/storage/UploadProgressTracker');
const FSNodeContext = require('../../../filesystem/FSNodeContext');
const { RESOURCE_STATUS_PENDING_CREATE } = require('../ResourceService');
const { ParallelTasks } = require('../../../util/otelutil');
class PuterFSProvider extends putility.AdvancedBase {
static MODULES = {
_path: require('path'),
uuidv4: require('uuid').v4,
}
class PuterFSProvider {
get_capabilities () {
return new Set([
fsCapabilities.THUMBNAIL,
@ -14,6 +23,8 @@ class PuterFSProvider {
fsCapabilities.OPERATION_TRACE,
fsCapabilities.READDIR_UUID_MODE,
fsCapabilities.COPY_TREE,
fsCapabilities.READ,
fsCapabilities.WRITE,
fsCapabilities.CASE_SENSITIVE,
@ -121,6 +132,166 @@ class PuterFSProvider {
.fast_get_direct_descendants(uuid);
return child_uuids;
}
async copy_tree ({ context, source, parent, target_name }) {
return await this.copy_tree_(
{ context, source, parent, target_name });
}
async copy_tree_ ({ context, source, parent, target_name }) {
// Modules
const { _path, uuidv4 } = this.modules;
// Services
const services = context.get('services');
const svc_event = services.get('event');
const svc_trace = services.get('traceService');
const svc_size = services.get('sizeService');
const svc_resource = services.get('resourceService');
const svc_fsEntry = services.get('fsEntryService');
const svc_fs = services.get('filesystem');
// Context
const actor = Context.get('actor');
const user = actor.type.user;
const tracer = svc_trace.tracer;
const uuid = uuidv4();
const ts = Math.round(Date.now()/1000);
await parent.fetchEntry();
await source.fetchEntry({ thumbnail: true });
// New filesystem entry
const raw_fsentry = {
uuid,
is_dir: source.entry.is_dir,
...(source.entry.is_shortcut ? {
is_shortcut: source.entry.is_shortcut,
shortcut_to: source.entry.shortcut_to,
} :{}),
parent_uid: parent.uid,
name: target_name,
created: ts,
modified: ts,
path: _path.join(await parent.get('path'), target_name),
// if property exists but the value is undefined,
// it will still be included in the INSERT, causing
// an error
...(source.entry.thumbnail ?
{ thumbnail: source.entry.thumbnail } : {}),
user_id: user.id,
};
svc_event.emit('fs.pending.file', {
fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
context: context,
})
if ( await source.get('has-s3') ) {
Object.assign(raw_fsentry, {
size: source.entry.size,
associated_app_id: source.entry.associated_app_id,
bucket: source.entry.bucket,
bucket_region: source.entry.bucket_region,
});
await tracer.startActiveSpan(`fs:cp:storage-copy`, async span => {
let progress_tracker = new UploadProgressTracker();
svc_event.emit('fs.storage.progress.copy', {
upload_tracker: progress_tracker,
context,
meta: {
item_uid: uuid,
item_path: raw_fsentry.path,
}
});
// const storage = new PuterS3StorageStrategy({ services: svc });
const storage = context.get('storage');
const state_copy = storage.create_copy();
await state_copy.run({
src_node: source,
dst_storage: {
key: uuid,
bucket: raw_fsentry.bucket,
bucket_region: raw_fsentry.bucket_region,
},
storage_api: { progress_tracker },
});
span.end();
});
}
{
await svc_size.add_node_size(undefined, source, user);
}
svc_resource.register({
uid: uuid,
status: RESOURCE_STATUS_PENDING_CREATE,
});
const entryOp = await svc_fsEntry.insert(raw_fsentry);
let node;
const tasks = new ParallelTasks({ tracer, max: 4 });
await context.arun(`fs:cp:parallel-portion`, async () => {
// Add child copy tasks if this is a directory
if ( source.entry.is_dir ) {
const children = await svc_fsEntry.fast_get_direct_descendants(
source.uid
);
for ( const child_uuid of children ) {
tasks.add(`fs:cp:copy-child`, async () => {
const child_node = await svc_fs.node(
new NodeUIDSelector(child_uuid)
);
const child_name = await child_node.get('name');
// TODO: this should be LLCopy instead
await this.copy_tree_({
context,
source: await svc_fs.node(
new NodeUIDSelector(child_uuid)
),
parent: await svc_fs.node(
new NodeUIDSelector(uuid)
),
target_name: child_name,
});
});
}
}
// Add task to await entry
tasks.add(`fs:cp:entry-op`, async () => {
await entryOp.awaitDone();
svc_resource.free(uuid);
const copy_fsNode = await svc_fs.node(new NodeUIDSelector(uuid));
copy_fsNode.entry = raw_fsentry;
copy_fsNode.found = true;
copy_fsNode.path = raw_fsentry.path;
node = copy_fsNode;
svc_event.emit('fs.create.file', {
node,
context,
})
}, { force: true });
await tasks.awaitAll();
});
node = node || await svc_fs.node(new NodeUIDSelector(uuid));
// TODO: What event do we emit? How do we know if we're overwriting?
return node;
}
}
module.exports = {