NSFS | versioning | calculate multipart upload version by creation time
Signed-off-by: nadav mizrahi <[email protected]>
nadavMiz committed Jan 19, 2025
1 parent 52887eb commit 5bcf21d
Showing 6 changed files with 189 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/api/common_api.js
@@ -1201,6 +1201,9 @@ module.exports = {
},
safeunlink: {
$ref: 'common_api#/definitions/op_stats_val'
},
utimensat: {
$ref: 'common_api#/definitions/op_stats_val'
}
}
},
46 changes: 46 additions & 0 deletions src/native/fs/fs_napi.cpp
@@ -17,6 +17,7 @@
#include <sys/fcntl.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/xattr.h>
@@ -1351,6 +1352,48 @@ struct GetPwName : public FSWorker
}
};

/**
* Utimensat is an fs op that changes file timestamps (access and modification times)
* with nanosecond precision
* see https://man7.org/linux/man-pages/man3/utimensat.3p.html
*/
struct Utimensat : public FSWorker
{
std::string _path;
struct timespec _times[2];
Utimensat(const Napi::CallbackInfo& info)
: FSWorker(info)
{
_path = info[1].As<Napi::String>();
Napi::Object napi_times = info[2].As<Napi::Object>();
std::string new_atime;
get_time_from_info(napi_times, "atime", _times[0], new_atime);
std::string new_mtime;
get_time_from_info(napi_times, "mtime", _times[1], new_mtime);
Begin(XSTR() << "utimensat " << DVAL(_path) << DVAL(new_atime) << DVAL(new_mtime));
}
virtual void Work()
{
SYSCALL_OR_RETURN(utimensat(AT_FDCWD, _path.c_str(), _times, 0));
}

private:
void get_time_from_info(const Napi::Object& napi_times, const std::string& time_type, struct timespec& timeval, std::string& log_str) {
if(napi_times.Get(time_type).IsBigInt()) {
const Napi::BigInt bigint = napi_times.Get(time_type).As<Napi::BigInt>();
//TODO: handle lossless
bool lossless = true;
const int64_t time = bigint.Int64Value(&lossless);
timeval.tv_sec = time / int64_t(1e9);
timeval.tv_nsec = time % int64_t(1e9);
log_str = std::to_string(time);
} else {
timeval.tv_nsec = UTIME_OMIT;
log_str = "UTIME_OMIT";
}
}
};

struct FileWrap : public Napi::ObjectWrap<FileWrap>
{
std::string _path;
@@ -1374,6 +1417,7 @@ struct FileWrap : public Napi::ObjectWrap<FileWrap>
InstanceMethod<&FileWrap::flock>("flock"),
InstanceMethod<&FileWrap::fcntllock>("fcntllock"),
InstanceAccessor<&FileWrap::getfd>("fd"),
InstanceMethod<&FileWrap::utimensat>("utimensat"),
}));
constructor.SuppressDestruct();
}
@@ -1403,6 +1447,7 @@ struct FileWrap : public Napi::ObjectWrap<FileWrap>
Napi::Value getfd(const Napi::CallbackInfo& info);
Napi::Value flock(const Napi::CallbackInfo& info);
Napi::Value fcntllock(const Napi::CallbackInfo& info);
Napi::Value utimensat(const Napi::CallbackInfo& info);
};

Napi::FunctionReference FileWrap::constructor;
@@ -2320,6 +2365,7 @@ fs_napi(Napi::Env env, Napi::Object exports)
exports_fs["getsinglexattr"] = Napi::Function::New(env, api<GetSingleXattr>);
exports_fs["getpwname"] = Napi::Function::New(env, api<GetPwName>);
exports_fs["symlink"] = Napi::Function::New(env, api<Symlink>);
exports_fs["utimensat"] = Napi::Function::New(env, api<Utimensat>);

FileWrap::init(env);
exports_fs["open"] = Napi::Function::New(env, api<FileOpen>);
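For orientation, a minimal JS-side sketch of how the new binding is called (mirroring the declaration added to nb.d.ts and the unit test below); the require path and values are illustrative assumptions, not part of the commit:

// sketch only: nb_native is required the same way the SDK code does (path may differ per caller)
const nb_native = require('../util/nb_native');

async function set_mtime_to_creation_time(fs_context, file_path, creation_mtime_ns) {
    // times are nanosecond bigints; an omitted field is left unchanged (UTIME_OMIT in the native worker)
    await nb_native().fs.utimensat(fs_context, file_path, { mtime: creation_mtime_ns });
}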
31 changes: 29 additions & 2 deletions src/sdk/namespace_fs.js
@@ -1505,6 +1505,16 @@ class NamespaceFS {
await this._delete_null_version_from_versions_directory(key, fs_context);
}
}
//in case the new version is not the latest, move it straight to the .versions dir. this can happen for multipart uploads
const prev_version_info = latest_ver_info || await this.find_max_version_past(fs_context, key);
if (this._is_versioning_enabled() && prev_version_info &&
this._is_version_more_recent(prev_version_info, new_ver_info)) {
const new_versioned_path = this._get_version_path(key, new_ver_info.version_id_str);
dbg.log1('NamespaceFS._move_to_dest_version version ID of key is not the latest, moving to .versions');
await native_fs_utils.safe_move(fs_context, new_ver_tmp_path, new_versioned_path, new_ver_info,
gpfs_options?.move_src_to_versions, bucket_tmp_dir_path);
break; //since we move straight to .versions, there is no need to change the latest version
}
if (latest_ver_info &&
((this._is_versioning_enabled()) ||
(this._is_versioning_suspended() && latest_ver_info.version_id_str !== NULL_VERSION_ID))) {
@@ -1865,9 +1875,14 @@ }
}
if (!target_file) target_file = await native_fs_utils.open_file(fs_context, this.bucket_path, upload_path, open_mode);

const create_object_upload_path = path.join(params.mpu_path, 'create_object_upload');
const create_object_upload_stat = await nb_native().fs.stat(fs_context, create_object_upload_path);
//according to AWS, a multipart upload's version time should be calculated based on its creation time rather than its completion time.
//change the file's mtime to match the creation time
await nb_native().fs.utimensat(fs_context, upload_path, {modtime: create_object_upload_stat.mtimeNsBigint});
const { data: create_params_buffer } = await nb_native().fs.readFile(
fs_context,
path.join(params.mpu_path, 'create_object_upload')
create_object_upload_path
);

const upload_params = { fs_context, upload_path, open_mode, file_path, params, target_file };
@@ -2725,6 +2740,17 @@ class NamespaceFS {
return { mtimeNsBigint: size_utils.string_to_bigint(arr[0], 36), ino: parseInt(arr[1], 36) };
}

/**
* _is_version_more_recent compares two version info objects by mtime
* returns true if ver1 is more recent than ver2, otherwise returns false
* @param {Object} ver1 version_info
* @param {Object} ver2 version_info
* @returns {Boolean}
*/
_is_version_more_recent(ver1, ver2) {
return ver1 && ver2 && ver1.mtimeNsBigint > ver2.mtimeNsBigint;
}

_get_version_id_by_xattr(stat) {
return (stat && stat.xattr[XATTR_VERSION_ID]) || 'null';
}
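
To illustrate the ordering decision (a sketch with hypothetical values, not taken from the commit): recency is decided purely by mtimeNsBigint, so resetting the completed MPU file's mtime to its creation time lets an object that was PUT mid-upload stay the latest version:

// hypothetical version_info objects for illustration only
const put_version = { mtimeNsBigint: 1700000000500000000n, ino: 11 }; // PUT that completed while the MPU was in progress
const mpu_version = { mtimeNsBigint: 1700000000100000000n, ino: 12 }; // mtime reset to the MPU creation time via utimensat
// _is_version_more_recent(put_version, mpu_version) === true, so _move_to_dest_version
// moves the completed MPU straight to the .versions directory instead of making it the latest version.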
@@ -3230,7 +3256,8 @@ }
}
return {
move_to_versions: { src_file: dst_file, dir_file },
move_to_dst: { src_file, dst_file, dir_file}
move_to_dst: { src_file, dst_file, dir_file},
move_src_to_versions: {src_file, dir_file}
};
} catch (err) {
dbg.warn('NamespaceFS._open_files_gpfs couldn\'t open files', err);
4 changes: 4 additions & 0 deletions src/sdk/nb.d.ts
@@ -953,6 +953,10 @@ interface NativeFS {
checkAccess(fs_context: NativeFSContext, path: string): Promise<void>;
getsinglexattr(fs_context: NativeFSContext, path: string, key: string): Promise<string>;
getpwname(fs_context: NativeFSContext, user: string): Promise<NativeFSUserObject>;
utimensat(fs_context: NativeFSContext, path: string, times: {
mtime?: bigint,
atime?: bigint
}): Promise<void>;

readFile(
fs_context: NativeFSContext,
80 changes: 80 additions & 0 deletions src/test/unit_tests/test_bucketspace_versioning.js
@@ -407,7 +407,87 @@ mocha.describe('bucketspace namespace_fs - versioning', function() {
const exist = await version_file_exists(full_path, mpu_key1, '', prev_version_id);
assert.ok(exist);
});

mocha.it('mpu object - versioning enabled - put object before running mpu complete. mpu version should move to .versions dir and not latest', async function() {
const mpu_res = await s3_uid6.createMultipartUpload({ Bucket: bucket_name, Key: mpu_key1 });
const upload_id = mpu_res.UploadId;
const part1 = await s3_uid6.uploadPart({
Bucket: bucket_name, Key: mpu_key1, Body: body1, UploadId: upload_id, PartNumber: 1 });
const res_put_object = await s3_uid6.putObject({Bucket: bucket_name, Key: mpu_key1, Body: body1});
const res = await s3_uid6.completeMultipartUpload({
Bucket: bucket_name,
Key: mpu_key1,
UploadId: upload_id,
MultipartUpload: {
Parts: [{
ETag: part1.ETag,
PartNumber: 1
}]
}
});
const exist = await version_file_exists(full_path, mpu_key1, '', res.VersionId);
assert.ok(exist);
const comp_res = await compare_version_ids(full_path, mpu_key1, res_put_object.VersionId, res.VersionId);
assert.ok(comp_res);

const res_head_object = await s3_uid6.headObject({Bucket: bucket_name, Key: mpu_key1});
assert.equal(res_head_object.VersionId, res_put_object.VersionId);
});

mocha.it('mpu object - versioning enabled - create delete marker before running mpu complete. mpu should move to .versions dir and not latest', async function() {
const mpu_res = await s3_uid6.createMultipartUpload({ Bucket: bucket_name, Key: mpu_key1 });
const upload_id = mpu_res.UploadId;
const part1 = await s3_uid6.uploadPart({
Bucket: bucket_name, Key: mpu_key1, Body: body1, UploadId: upload_id, PartNumber: 1 });
await s3_uid6.deleteObject({Bucket: bucket_name, Key: mpu_key1});
const res = await s3_uid6.completeMultipartUpload({
Bucket: bucket_name,
Key: mpu_key1,
UploadId: upload_id,
MultipartUpload: {
Parts: [{
ETag: part1.ETag,
PartNumber: 1
}]
}
});
const exist = await version_file_exists(full_path, mpu_key1, '', res.VersionId);
assert.ok(exist);
const latest_version_dont_exist = await fs_utils.file_not_exists(path.join(full_path, mpu_key1));
assert.ok(latest_version_dont_exist);
});

mocha.it('mpu object - versioning enabled - put null object before running mpu complete. mpu version should move to .versions dir and not latest', async function() {
const mpu_res = await s3_uid6.createMultipartUpload({ Bucket: bucket_name, Key: mpu_key1 });
const upload_id = mpu_res.UploadId;
const part1 = await s3_uid6.uploadPart({
Bucket: bucket_name, Key: mpu_key1, Body: body1, UploadId: upload_id, PartNumber: 1 });

//change to suspended mode to put null version id
await s3_uid6.putBucketVersioning({ Bucket: bucket_name, VersioningConfiguration: { MFADelete: 'Disabled', Status: 'Suspended' } });
await s3_uid6.putObject({Bucket: bucket_name, Key: mpu_key1, Body: body1});

//enable versioning so mpu will have version id
await s3_uid6.putBucketVersioning({ Bucket: bucket_name, VersioningConfiguration: { MFADelete: 'Disabled', Status: 'Enabled' } });
const res = await s3_uid6.completeMultipartUpload({
Bucket: bucket_name,
Key: mpu_key1,
UploadId: upload_id,
MultipartUpload: {
Parts: [{
ETag: part1.ETag,
PartNumber: 1
}]
}
});
const exist = await version_file_exists(full_path, mpu_key1, '', res.VersionId);
assert.ok(exist);

const res_head_object = await s3_uid6.headObject({Bucket: bucket_name, Key: mpu_key1});
assert.equal(res_head_object.VersionId, 'null');
});
});

});

// The res of putBucketVersioning is different depends on the versioning state:
27 changes: 27 additions & 0 deletions src/test/unit_tests/test_nb_native_fs.js
@@ -444,6 +444,33 @@ mocha.describe('nb_native fs', async function() {
await fs_utils.file_delete(tmp_mv_path);
}
});

mocha.describe('Utimensat', async function() {
mocha.it('Utimensat - change both atime and mtime', async function() {
const { utimensat } = nb_native().fs;
const PATH1 = `/tmp/utimensat${Date.now()}_1`;
await create_file(PATH1);
const new_time = BigInt(Date.now());
await utimensat(DEFAULT_FS_CONFIG, PATH1, {mtime: new_time, atime: new_time});
const res = await nb_native().fs.stat(DEFAULT_FS_CONFIG, PATH1);
assert.equal(res.atimeNsBigint, new_time);
assert.equal(res.mtimeNsBigint, new_time);
});

mocha.it('Utimensat - change only mtime', async function() {
const { utimensat } = nb_native().fs;
const PATH1 = `/tmp/utimensat${Date.now()}_1`;
await create_file(PATH1);
const new_time = BigInt(Date.now());
const res0 = await nb_native().fs.stat(DEFAULT_FS_CONFIG, PATH1);
await utimensat(DEFAULT_FS_CONFIG, PATH1, {mtime: new_time});
const res1 = await nb_native().fs.stat(DEFAULT_FS_CONFIG, PATH1);
//check that time really changed
assert.notEqual(res1.mtimeNsBigint, res0.mtimeNsBigint);
assert.equal(res1.atimeNsBigint, res0.atimeNsBigint);
assert.equal(res1.mtimeNsBigint, new_time);
});
});
});
});
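
A side note on units (a sketch, not part of the commit): the native worker splits the supplied bigint into tv_sec = t / 1e9 and tv_nsec = t % 1e9, i.e. the value is interpreted as nanoseconds since the epoch. Building such a value from a millisecond clock could look like this, reusing the test's DEFAULT_FS_CONFIG; the path variable is a placeholder:

// sketch: convert a millisecond timestamp to the nanosecond bigint utimensat expects
const now_ns = BigInt(Date.now()) * 1000000n;
await nb_native().fs.utimensat(DEFAULT_FS_CONFIG, some_path, { mtime: now_ns, atime: now_ns });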

