From b1025c23be6f6304a8894829ebe62a515703b4d7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 10 Sep 2019 09:41:02 -0500 Subject: [PATCH] ARROW-6481: [C++] Avoid copying large ConvertOptions If you have N columns and a large subset of them are customized in ConvertOptions, copying ConvertOptions in each Converter or ColumnBuilder produces a quadratic explosion. With this PR, the reproducer in ARROW-6481 drops down from 8 seconds to 100 ms (on my machine). And it doesn't consume 8 GB RAM anymore. Closes #5334 from pitrou/ARROW-6481-large-convert-options and squashes the following commits: 967142e23 ARROW-6481: Avoid copying large ConvertOptions Authored-by: Antoine Pitrou Signed-off-by: Wes McKinney --- cpp/src/arrow/csv/column_builder.cc | 8 ++- cpp/src/arrow/csv/column_builder_test.cc | 92 +++++++++++++----------- cpp/src/arrow/csv/converter.h | 4 +- python/pyarrow/tests/test_csv.py | 32 +++++++++ 4 files changed, 90 insertions(+), 46 deletions(-) diff --git a/cpp/src/arrow/csv/column_builder.cc b/cpp/src/arrow/csv/column_builder.cc index 6bb0dbedb5c7e..da65602077335 100644 --- a/cpp/src/arrow/csv/column_builder.cc +++ b/cpp/src/arrow/csv/column_builder.cc @@ -153,7 +153,9 @@ class TypedColumnBuilder : public ColumnBuilder { std::shared_ptr type_; int32_t col_index_; - ConvertOptions options_; + // CAUTION: ConvertOptions can grow large (if it customizes hundreds or + // thousands of columns), so avoid copying it in each TypedColumnBuilder. + const ConvertOptions& options_; MemoryPool* pool_; std::shared_ptr converter_; @@ -231,7 +233,9 @@ class InferringColumnBuilder : public ColumnBuilder { std::mutex mutex_; int32_t col_index_; - ConvertOptions options_; + // CAUTION: ConvertOptions can grow large (if it customizes hundreds or + // thousands of columns), so avoid copying it in each InferringColumnBuilder. + const ConvertOptions& options_; MemoryPool* pool_; std::shared_ptr converter_; diff --git a/cpp/src/arrow/csv/column_builder_test.cc b/cpp/src/arrow/csv/column_builder_test.cc index e92de3d79c1d9..540b1bc48825b 100644 --- a/cpp/src/arrow/csv/column_builder_test.cc +++ b/cpp/src/arrow/csv/column_builder_test.cc @@ -52,6 +52,8 @@ void AssertBuilding(const std::shared_ptr& builder, ASSERT_OK((*out)->Validate()); } +static ConvertOptions default_options = ConvertOptions::Defaults(); + ////////////////////////////////////////////////////////////////////////// // Tests for null column builder @@ -123,10 +125,11 @@ TEST(NullColumnBuilder, InsertTyped) { // Tests for fixed-type column builder TEST(ColumnBuilder, Empty) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, - ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {}, &actual); @@ -136,10 +139,11 @@ TEST(ColumnBuilder, Empty) { } TEST(ColumnBuilder, Basics) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, - ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"123", "-456"}}, &actual); @@ -151,10 +155,11 @@ TEST(ColumnBuilder, Basics) { TEST(ColumnBuilder, Insert) { // Test ColumnBuilder::Insert() + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, - ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); std::shared_ptr parser; std::shared_ptr actual, expected; @@ -171,10 +176,11 @@ TEST(ColumnBuilder, Insert) { } TEST(ColumnBuilder, MultipleChunks) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, - ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual); @@ -185,10 +191,11 @@ TEST(ColumnBuilder, MultipleChunks) { } TEST(ColumnBuilder, MultipleChunksParallel) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, - ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); @@ -202,10 +209,10 @@ TEST(ColumnBuilder, MultipleChunksParallel) { // Tests for type-inferring column builder TEST(InferringColumnBuilder, Empty) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {}, &actual); @@ -215,10 +222,10 @@ TEST(InferringColumnBuilder, Empty) { } TEST(InferringColumnBuilder, SingleChunkNull) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "NA"}}, &actual); @@ -228,10 +235,10 @@ TEST(InferringColumnBuilder, SingleChunkNull) { } TEST(InferringColumnBuilder, MultipleChunkNull) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual); @@ -241,10 +248,10 @@ TEST(InferringColumnBuilder, MultipleChunkNull) { } TEST(InferringColumnBuilder, SingleChunkInteger) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "123", "456"}}, &actual); @@ -255,10 +262,10 @@ TEST(InferringColumnBuilder, SingleChunkInteger) { } TEST(InferringColumnBuilder, MultipleChunkInteger) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual); @@ -270,10 +277,10 @@ TEST(InferringColumnBuilder, MultipleChunkInteger) { } TEST(InferringColumnBuilder, SingleChunkBoolean) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual); @@ -285,10 +292,10 @@ TEST(InferringColumnBuilder, SingleChunkBoolean) { } TEST(InferringColumnBuilder, MultipleChunkBoolean) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual); @@ -300,10 +307,10 @@ TEST(InferringColumnBuilder, MultipleChunkBoolean) { } TEST(InferringColumnBuilder, SingleChunkReal) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual); @@ -315,10 +322,10 @@ TEST(InferringColumnBuilder, SingleChunkReal) { } TEST(InferringColumnBuilder, MultipleChunkReal) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual); @@ -330,10 +337,10 @@ TEST(InferringColumnBuilder, MultipleChunkReal) { } TEST(InferringColumnBuilder, SingleChunkTimestamp) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, &actual); @@ -346,10 +353,10 @@ TEST(InferringColumnBuilder, SingleChunkTimestamp) { } TEST(InferringColumnBuilder, MultipleChunkTimestamp) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual); @@ -362,14 +369,14 @@ TEST(InferringColumnBuilder, MultipleChunkTimestamp) { } TEST(InferringColumnBuilder, SingleChunkString) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; std::shared_ptr actual; std::shared_ptr expected; // With valid UTF8 - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); AssertBuilding(builder, {{"", "foo", "baré"}}, &actual); ChunkedArrayFromVector({{true, true, true}}, @@ -377,7 +384,6 @@ TEST(InferringColumnBuilder, SingleChunkString) { AssertChunkedEqual(*expected, *actual); // With invalid UTF8, non-checking - auto options = ConvertOptions::Defaults(); options.check_utf8 = false; tg = TaskGroup::MakeSerial(); ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); @@ -389,14 +395,14 @@ TEST(InferringColumnBuilder, SingleChunkString) { } TEST(InferringColumnBuilder, SingleChunkBinary) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; std::shared_ptr actual; std::shared_ptr expected; // With invalid UTF8, checking - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual); ChunkedArrayFromVector({{true, true, true}}, @@ -405,10 +411,10 @@ TEST(InferringColumnBuilder, SingleChunkBinary) { } TEST(InferringColumnBuilder, MultipleChunkString) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré"}}, &actual); @@ -420,10 +426,10 @@ TEST(InferringColumnBuilder, MultipleChunkString) { } TEST(InferringColumnBuilder, MultipleChunkBinary) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual); @@ -438,10 +444,10 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) { // (see python/pyarrow/tests/test_csv.py) TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) { + auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, - &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); std::shared_ptr actual; AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); diff --git a/cpp/src/arrow/csv/converter.h b/cpp/src/arrow/csv/converter.h index d64fe695d0a26..fa2f0865a3fdd 100644 --- a/cpp/src/arrow/csv/converter.h +++ b/cpp/src/arrow/csv/converter.h @@ -57,7 +57,9 @@ class ARROW_EXPORT Converter { virtual Status Initialize() = 0; - const ConvertOptions options_; + // CAUTION: ConvertOptions can grow large (if it customizes hundreds or + // thousands of columns), so avoid copying it in each Converter. + const ConvertOptions& options_; MemoryPool* pool_; std::shared_ptr type_; }; diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index a43412f3ace15..1607f0c8b5363 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -25,6 +25,7 @@ import shutil import string import tempfile +import time import unittest import pytest @@ -60,6 +61,13 @@ def make_random_csv(num_cols=2, num_rows=10, linesep=u'\r\n'): return csv, expected +def make_empty_csv(column_names): + csv = io.StringIO() + csv.write(u",".join(column_names)) + csv.write(u"\n") + return csv.getvalue().encode() + + def test_read_options(): cls = ReadOptions opts = cls() @@ -693,6 +701,30 @@ def test_stress_block_sizes(self): # Better error output assert table.to_pydict() == expected.to_pydict() + def test_stress_convert_options_blowup(self): + # ARROW-6481: A convert_options with a very large number of columns + # should not blow memory and CPU time. + try: + clock = time.thread_time + except AttributeError: + clock = time.time + num_columns = 10000 + col_names = ["K{0}".format(i) for i in range(num_columns)] + csv = make_empty_csv(col_names) + t1 = clock() + convert_options = ConvertOptions( + column_types={k: pa.string() for k in col_names[::2]}) + table = self.read_bytes(csv, convert_options=convert_options) + dt = clock() - t1 + # Check that processing time didn't blow up. + # This is a conservative check (it takes less than 300 ms + # in debug mode on my local machine). + assert dt <= 10.0 + # Check result + assert table.num_columns == num_columns + assert table.num_rows == 0 + assert table.column_names == col_names + class TestSerialCSVRead(BaseTestCSVRead, unittest.TestCase):