From 271954911a53f7e66cd116b01f6e842e17769c89 Mon Sep 17 00:00:00 2001 From: Guga Cavalieri Date: Sat, 5 Nov 2022 13:09:03 -0600 Subject: [PATCH 1/3] Add Buffer Size Parameter to dump create --- replibyte/src/cli.rs | 2 ++ replibyte/src/commands/dump.rs | 12 ++++++------ replibyte/src/tasks/full_dump.rs | 6 ++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/replibyte/src/cli.rs b/replibyte/src/cli.rs index 50910299..108ca593 100644 --- a/replibyte/src/cli.rs +++ b/replibyte/src/cli.rs @@ -104,6 +104,8 @@ pub struct DumpCreateArgs { /// import dump from stdin #[clap(name = "input", short, long, requires = "source_type")] pub input: bool, + #[clap(name = "buffer_size", short, long, default_value_t = 100)] + pub buffer_size: usize, #[clap(short, long, parse(from_os_str), value_name = "dump file")] /// dump file pub file: Option, diff --git a/replibyte/src/commands/dump.rs b/replibyte/src/commands/dump.rs index 22eb765a..a2dd8b6a 100644 --- a/replibyte/src/commands/dump.rs +++ b/replibyte/src/commands/dump.rs @@ -145,7 +145,7 @@ where password.as_str(), ); - let task = FullDumpTask::new(postgres, datastore, options); + let task = FullDumpTask::new(postgres, datastore, options, args.buffer_size); task.run(progress_callback)? } ConnectionUri::Mysql(host, port, username, password, database) => { @@ -157,13 +157,13 @@ where password.as_str(), ); - let task = FullDumpTask::new(mysql, datastore, options); + let task = FullDumpTask::new(mysql, datastore, options, args.buffer_size); task.run(progress_callback)? } ConnectionUri::MongoDB(uri, database) => { let mongodb = MongoDB::new(uri.as_str(), database.as_str()); - let task = FullDumpTask::new(mongodb, datastore, options); + let task = FullDumpTask::new(mongodb, datastore, options, args.buffer_size); task.run(progress_callback)? } }, @@ -177,7 +177,7 @@ where } let postgres = PostgresStdin::default(); - let task = FullDumpTask::new(postgres, datastore, options); + let task = FullDumpTask::new(postgres, datastore, options, args.buffer_size); task.run(progress_callback)? } Some(v) if v == "mysql" => { @@ -189,7 +189,7 @@ where } let mysql = MysqlStdin::default(); - let task = FullDumpTask::new(mysql, datastore, options); + let task = FullDumpTask::new(mysql, datastore, options, args.buffer_size); task.run(progress_callback)? } Some(v) if v == "mongodb" => { @@ -201,7 +201,7 @@ where } let mongodb = MongoDBStdin::default(); - let task = FullDumpTask::new(mongodb, datastore, options); + let task = FullDumpTask::new(mongodb, datastore, options, args.buffer_size); task.run(progress_callback)? } Some(v) => { diff --git a/replibyte/src/tasks/full_dump.rs b/replibyte/src/tasks/full_dump.rs index 907d271a..e337edcd 100644 --- a/replibyte/src/tasks/full_dump.rs +++ b/replibyte/src/tasks/full_dump.rs @@ -18,17 +18,19 @@ where source: S, datastore: Box, options: SourceOptions<'a>, + arg_buffer_size: usize, } impl<'a, S> FullDumpTask<'a, S> where S: Source, { - pub fn new(source: S, datastore: Box, options: SourceOptions<'a>) -> Self { + pub fn new(source: S, datastore: Box, options: SourceOptions<'a>, arg_buffer_size: usize) -> Self { FullDumpTask { source, datastore, options, + arg_buffer_size, } } } @@ -70,7 +72,7 @@ where }); // buffer of 100MB in memory to use and re-use to upload data into datastore - let buffer_size = 100 * 1024 * 1024; + let buffer_size = self.arg_buffer_size * 1024 * 1024; let mut queries = vec![]; let mut consumed_buffer_size = 0usize; let mut total_transferred_bytes = 0usize; From dc83b0aa9ce0aaafe2152b408425e236b121e54d Mon Sep 17 00:00:00 2001 From: Guga Cavalieri Date: Sat, 5 Nov 2022 13:31:20 -0600 Subject: [PATCH 2/3] Add help to buffer_size argument --- replibyte/src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replibyte/src/cli.rs b/replibyte/src/cli.rs index 108ca593..884ba276 100644 --- a/replibyte/src/cli.rs +++ b/replibyte/src/cli.rs @@ -104,7 +104,7 @@ pub struct DumpCreateArgs { /// import dump from stdin #[clap(name = "input", short, long, requires = "source_type")] pub input: bool, - #[clap(name = "buffer_size", short, long, default_value_t = 100)] + #[clap(name = "buffer_size", short, long, default_value_t = 100, value_name = "buffer_size", help = "Buffer Size in MB")] pub buffer_size: usize, #[clap(short, long, parse(from_os_str), value_name = "dump file")] /// dump file From 0ecd245cddbfabda58e7be301d6394f2e80ba62d Mon Sep 17 00:00:00 2001 From: Guga Cavalieri Date: Sun, 20 Nov 2022 12:57:34 -0700 Subject: [PATCH 3/3] add dump_chunk_size to SourceOptions --- replibyte/src/cli.rs | 4 ++-- replibyte/src/commands/dump.rs | 13 +++++++------ replibyte/src/source/mod.rs | 1 + replibyte/src/source/mongodb.rs | 7 +++++++ replibyte/src/source/mysql.rs | 7 +++++++ replibyte/src/source/postgres.rs | 11 +++++++++++ replibyte/src/tasks/full_dump.rs | 6 ++---- 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/replibyte/src/cli.rs b/replibyte/src/cli.rs index 884ba276..b76b5964 100644 --- a/replibyte/src/cli.rs +++ b/replibyte/src/cli.rs @@ -104,8 +104,8 @@ pub struct DumpCreateArgs { /// import dump from stdin #[clap(name = "input", short, long, requires = "source_type")] pub input: bool, - #[clap(name = "buffer_size", short, long, default_value_t = 100, value_name = "buffer_size", help = "Buffer Size in MB")] - pub buffer_size: usize, + #[clap(name = "dump_chunk_size", short, long, default_value_t = 100, value_name = "dump_chunk_size", help = "Dump Chunk Size in MB")] + pub dump_chunk_size: usize, #[clap(short, long, parse(from_os_str), value_name = "dump file")] /// dump file pub file: Option, diff --git a/replibyte/src/commands/dump.rs b/replibyte/src/commands/dump.rs index a2dd8b6a..c218a55a 100644 --- a/replibyte/src/commands/dump.rs +++ b/replibyte/src/commands/dump.rs @@ -132,6 +132,7 @@ where skip_config: &skip_config, database_subset: &source.database_subset, only_tables: &only_tables_config, + dump_chunk_size: &args.dump_chunk_size, }; match args.source_type.as_ref().map(|x| x.as_str()) { @@ -145,7 +146,7 @@ where password.as_str(), ); - let task = FullDumpTask::new(postgres, datastore, options, args.buffer_size); + let task = FullDumpTask::new(postgres, datastore, options); task.run(progress_callback)? } ConnectionUri::Mysql(host, port, username, password, database) => { @@ -157,13 +158,13 @@ where password.as_str(), ); - let task = FullDumpTask::new(mysql, datastore, options, args.buffer_size); + let task = FullDumpTask::new(mysql, datastore, options); task.run(progress_callback)? } ConnectionUri::MongoDB(uri, database) => { let mongodb = MongoDB::new(uri.as_str(), database.as_str()); - let task = FullDumpTask::new(mongodb, datastore, options, args.buffer_size); + let task = FullDumpTask::new(mongodb, datastore, options); task.run(progress_callback)? } }, @@ -177,7 +178,7 @@ where } let postgres = PostgresStdin::default(); - let task = FullDumpTask::new(postgres, datastore, options, args.buffer_size); + let task = FullDumpTask::new(postgres, datastore, options); task.run(progress_callback)? } Some(v) if v == "mysql" => { @@ -189,7 +190,7 @@ where } let mysql = MysqlStdin::default(); - let task = FullDumpTask::new(mysql, datastore, options, args.buffer_size); + let task = FullDumpTask::new(mysql, datastore, options); task.run(progress_callback)? } Some(v) if v == "mongodb" => { @@ -201,7 +202,7 @@ where } let mongodb = MongoDBStdin::default(); - let task = FullDumpTask::new(mongodb, datastore, options, args.buffer_size); + let task = FullDumpTask::new(mongodb, datastore, options); task.run(progress_callback)? } Some(v) => { diff --git a/replibyte/src/source/mod.rs b/replibyte/src/source/mod.rs index 694e3447..d8883597 100644 --- a/replibyte/src/source/mod.rs +++ b/replibyte/src/source/mod.rs @@ -29,4 +29,5 @@ pub struct SourceOptions<'a> { pub skip_config: &'a Vec, pub database_subset: &'a Option, pub only_tables: &'a Vec, + pub dump_chunk_size: &'a usize, } diff --git a/replibyte/src/source/mongodb.rs b/replibyte/src/source/mongodb.rs index 5857febd..adbbf0e9 100644 --- a/replibyte/src/source/mongodb.rs +++ b/replibyte/src/source/mongodb.rs @@ -357,6 +357,10 @@ mod tests { ) } + fn get_default_dump_chunk_size() -> usize { + 100 + } + #[test] fn connect() { let p = get_mongodb(); @@ -368,6 +372,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p.read(source_options, |_, _| {}).is_ok()); @@ -380,6 +385,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p.read(source_options, |_, _| {}).is_err()); @@ -395,6 +401,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; p.read(source_options, |original_query, query| { diff --git a/replibyte/src/source/mysql.rs b/replibyte/src/source/mysql.rs index 04dd820f..82f50a40 100644 --- a/replibyte/src/source/mysql.rs +++ b/replibyte/src/source/mysql.rs @@ -444,6 +444,10 @@ mod tests { Mysql::new("127.0.0.1", 3306, "world", "root", "wrong_password") } + fn get_default_dump_chunk_size() -> usize { + 100 + } + #[test] fn connect() { let mut p = get_mysql(); @@ -456,6 +460,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p.read(source_options, |_original_query, _query| {}).is_ok()); @@ -468,6 +473,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p .read(source_options, |_original_query, _query| {}) @@ -484,6 +490,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let _ = p.read(source_options, |original_query, query| { assert!(original_query.data().len() > 0); diff --git a/replibyte/src/source/postgres.rs b/replibyte/src/source/postgres.rs index 60ebae72..98f677d4 100644 --- a/replibyte/src/source/postgres.rs +++ b/replibyte/src/source/postgres.rs @@ -571,6 +571,10 @@ mod tests { Postgres::new("localhost", 5432, "root", "root", "wrongpassword") } + fn get_default_dump_chunk_size() -> usize { + 100 + } + #[test] fn connect() { let p = get_postgres(); @@ -581,6 +585,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p.read(source_options, |original_query, query| {}).is_ok()); @@ -593,6 +598,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; assert!(p.read(source_options, |original_query, query| {}).is_err()); @@ -608,6 +614,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let _ = p.read(source_options, |original_query, query| { @@ -734,6 +741,7 @@ mod tests { skip_config: &vec![], database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let _ = p.read(source_options, |original_query, query| { @@ -775,6 +783,7 @@ mod tests { skip_config: &skip_config, database_subset: &None, only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let _ = p.read(source_options, |_original_query, query| { @@ -826,6 +835,7 @@ mod tests { passthrough_tables: None, }), only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let mut rows_percent_50 = vec![]; @@ -863,6 +873,7 @@ mod tests { passthrough_tables: None, }), only_tables: &vec![], + dump_chunk_size: &get_default_dump_chunk_size(), }; let mut rows_percent_30 = vec![]; diff --git a/replibyte/src/tasks/full_dump.rs b/replibyte/src/tasks/full_dump.rs index e337edcd..0803497b 100644 --- a/replibyte/src/tasks/full_dump.rs +++ b/replibyte/src/tasks/full_dump.rs @@ -18,19 +18,17 @@ where source: S, datastore: Box, options: SourceOptions<'a>, - arg_buffer_size: usize, } impl<'a, S> FullDumpTask<'a, S> where S: Source, { - pub fn new(source: S, datastore: Box, options: SourceOptions<'a>, arg_buffer_size: usize) -> Self { + pub fn new(source: S, datastore: Box, options: SourceOptions<'a>) -> Self { FullDumpTask { source, datastore, options, - arg_buffer_size, } } } @@ -72,7 +70,7 @@ where }); // buffer of 100MB in memory to use and re-use to upload data into datastore - let buffer_size = self.arg_buffer_size * 1024 * 1024; + let buffer_size = self.options.dump_chunk_size * 1024 * 1024; let mut queries = vec![]; let mut consumed_buffer_size = 0usize; let mut total_transferred_bytes = 0usize;