Skip to content

Commit

Permalink
ARROW-6481: [C++] Avoid copying large ConvertOptions
Browse files Browse the repository at this point in the history
If you have N columns and a large subset of them are customized in ConvertOptions,
copying ConvertOptions in each Converter or ColumnBuilder produces a quadratic
explosion.

With this PR, the reproducer in ARROW-6481 drops down from 8 seconds to 100 ms (on my machine). And it doesn't consume 8 GB RAM anymore.

Closes apache#5334 from pitrou/ARROW-6481-large-convert-options and squashes the following commits:

967142e <Antoine Pitrou> ARROW-6481:  Avoid copying large ConvertOptions

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
pitrou authored and wesm committed Sep 10, 2019
1 parent 3f2a33f commit b1025c2
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 46 deletions.
8 changes: 6 additions & 2 deletions cpp/src/arrow/csv/column_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ class TypedColumnBuilder : public ColumnBuilder {

std::shared_ptr<DataType> type_;
int32_t col_index_;
ConvertOptions options_;
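// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
// thousands of columns), so avoid copying it in each TypedColumnBuilder.
// Because only a reference is stored, the caller must keep the
// ConvertOptions object alive for as long as this builder is in use.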
const ConvertOptions& options_;
MemoryPool* pool_;

std::shared_ptr<Converter> converter_;
Expand Down Expand Up @@ -231,7 +233,9 @@ class InferringColumnBuilder : public ColumnBuilder {
std::mutex mutex_;

int32_t col_index_;
ConvertOptions options_;
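// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
// thousands of columns), so avoid copying it in each InferringColumnBuilder.
// Because only a reference is stored, the caller must keep the
// ConvertOptions object alive for as long as this builder is in use.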
const ConvertOptions& options_;
MemoryPool* pool_;
std::shared_ptr<Converter> converter_;

Expand Down
92 changes: 49 additions & 43 deletions cpp/src/arrow/csv/column_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
ASSERT_OK((*out)->Validate());
}

static ConvertOptions default_options = ConvertOptions::Defaults();

//////////////////////////////////////////////////////////////////////////
// Tests for null column builder

Expand Down Expand Up @@ -123,10 +125,11 @@ TEST(NullColumnBuilder, InsertTyped) {
// Tests for fixed-type column builder

TEST(ColumnBuilder, Empty) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
ConvertOptions::Defaults(), tg, &builder));
ASSERT_OK(
ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {}, &actual);
Expand All @@ -136,10 +139,11 @@ TEST(ColumnBuilder, Empty) {
}

TEST(ColumnBuilder, Basics) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
ConvertOptions::Defaults(), tg, &builder));
ASSERT_OK(
ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"123", "-456"}}, &actual);
Expand All @@ -151,10 +155,11 @@ TEST(ColumnBuilder, Basics) {

TEST(ColumnBuilder, Insert) {
// Test ColumnBuilder::Insert()
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
ConvertOptions::Defaults(), tg, &builder));
ASSERT_OK(
ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder));

std::shared_ptr<BlockParser> parser;
std::shared_ptr<ChunkedArray> actual, expected;
Expand All @@ -171,10 +176,11 @@ TEST(ColumnBuilder, Insert) {
}

TEST(ColumnBuilder, MultipleChunks) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
ConvertOptions::Defaults(), tg, &builder));
ASSERT_OK(
ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual);
Expand All @@ -185,10 +191,11 @@ TEST(ColumnBuilder, MultipleChunks) {
}

TEST(ColumnBuilder, MultipleChunksParallel) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
ConvertOptions::Defaults(), tg, &builder));
ASSERT_OK(
ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
Expand All @@ -202,10 +209,10 @@ TEST(ColumnBuilder, MultipleChunksParallel) {
// Tests for type-inferring column builder

TEST(InferringColumnBuilder, Empty) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {}, &actual);
Expand All @@ -215,10 +222,10 @@ TEST(InferringColumnBuilder, Empty) {
}

TEST(InferringColumnBuilder, SingleChunkNull) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "NA"}}, &actual);
Expand All @@ -228,10 +235,10 @@ TEST(InferringColumnBuilder, SingleChunkNull) {
}

TEST(InferringColumnBuilder, MultipleChunkNull) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual);
Expand All @@ -241,10 +248,10 @@ TEST(InferringColumnBuilder, MultipleChunkNull) {
}

TEST(InferringColumnBuilder, SingleChunkInteger) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "123", "456"}}, &actual);
Expand All @@ -255,10 +262,10 @@ TEST(InferringColumnBuilder, SingleChunkInteger) {
}

TEST(InferringColumnBuilder, MultipleChunkInteger) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual);
Expand All @@ -270,10 +277,10 @@ TEST(InferringColumnBuilder, MultipleChunkInteger) {
}

TEST(InferringColumnBuilder, SingleChunkBoolean) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual);
Expand All @@ -285,10 +292,10 @@ TEST(InferringColumnBuilder, SingleChunkBoolean) {
}

TEST(InferringColumnBuilder, MultipleChunkBoolean) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual);
Expand All @@ -300,10 +307,10 @@ TEST(InferringColumnBuilder, MultipleChunkBoolean) {
}

TEST(InferringColumnBuilder, SingleChunkReal) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual);
Expand All @@ -315,10 +322,10 @@ TEST(InferringColumnBuilder, SingleChunkReal) {
}

TEST(InferringColumnBuilder, MultipleChunkReal) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual);
Expand All @@ -330,10 +337,10 @@ TEST(InferringColumnBuilder, MultipleChunkReal) {
}

TEST(InferringColumnBuilder, SingleChunkTimestamp) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, &actual);
Expand All @@ -346,10 +353,10 @@ TEST(InferringColumnBuilder, SingleChunkTimestamp) {
}

TEST(InferringColumnBuilder, MultipleChunkTimestamp) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual);
Expand All @@ -362,22 +369,21 @@ TEST(InferringColumnBuilder, MultipleChunkTimestamp) {
}

TEST(InferringColumnBuilder, SingleChunkString) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
std::shared_ptr<ChunkedArray> actual;
std::shared_ptr<ChunkedArray> expected;

// With valid UTF8
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));
AssertBuilding(builder, {{"", "foo", "baré"}}, &actual);

ChunkedArrayFromVector<StringType, std::string>({{true, true, true}},
{{"", "foo", "baré"}}, &expected);
AssertChunkedEqual(*expected, *actual);

// With invalid UTF8, non-checking
auto options = ConvertOptions::Defaults();
options.check_utf8 = false;
tg = TaskGroup::MakeSerial();
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));
Expand All @@ -389,14 +395,14 @@ TEST(InferringColumnBuilder, SingleChunkString) {
}

TEST(InferringColumnBuilder, SingleChunkBinary) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
std::shared_ptr<ChunkedArray> actual;
std::shared_ptr<ChunkedArray> expected;

// With invalid UTF8, checking
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));
AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual);

ChunkedArrayFromVector<BinaryType, std::string>({{true, true, true}},
Expand All @@ -405,10 +411,10 @@ TEST(InferringColumnBuilder, SingleChunkBinary) {
}

TEST(InferringColumnBuilder, MultipleChunkString) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré"}}, &actual);
Expand All @@ -420,10 +426,10 @@ TEST(InferringColumnBuilder, MultipleChunkString) {
}

TEST(InferringColumnBuilder, MultipleChunkBinary) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual);
Expand All @@ -438,10 +444,10 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) {
// (see python/pyarrow/tests/test_csv.py)

TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
&builder));
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/csv/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class ARROW_EXPORT Converter {

virtual Status Initialize() = 0;

const ConvertOptions options_;
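// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
// thousands of columns), so avoid copying it in each Converter.
// Because only a reference is stored, the caller must keep the
// ConvertOptions object alive for as long as this Converter is in use.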
const ConvertOptions& options_;
MemoryPool* pool_;
std::shared_ptr<DataType> type_;
};
Expand Down
32 changes: 32 additions & 0 deletions python/pyarrow/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import shutil
import string
import tempfile
import time
import unittest

import pytest
Expand Down Expand Up @@ -60,6 +61,13 @@ def make_random_csv(num_cols=2, num_rows=10, linesep=u'\r\n'):
return csv, expected


def make_empty_csv(column_names):
    """Build the bytes of a CSV file that has a header row and no data rows.

    The given column names are joined with commas, terminated by a single
    newline, and encoded with the default string encoding.
    """
    header_line = u",".join(column_names) + u"\n"
    return header_line.encode()


def test_read_options():
cls = ReadOptions
opts = cls()
Expand Down Expand Up @@ -693,6 +701,30 @@ def test_stress_block_sizes(self):
# Better error output
assert table.to_pydict() == expected.to_pydict()

def test_stress_convert_options_blowup(self):
    # ARROW-6481: reading with a convert_options that customizes a very
    # large number of columns must not blow up memory and CPU time.

    # Use time.thread_time when this Python provides it; otherwise fall
    # back to time.time.
    timer = getattr(time, "thread_time", time.time)

    n_cols = 10000
    names = ["K{0}".format(idx) for idx in range(n_cols)]
    data = make_empty_csv(names)

    started = timer()
    # Give an explicit string type to every other column.
    typed_names = names[::2]
    opts = ConvertOptions(
        column_types={name: pa.string() for name in typed_names})
    result = self.read_bytes(data, convert_options=opts)
    elapsed = timer() - started

    # The time bound is deliberately generous: the original author
    # reports under 300 ms in debug mode on their machine.
    assert elapsed <= 10.0

    # The table must expose every column, in order, and contain no rows.
    assert result.num_columns == n_cols
    assert result.num_rows == 0
    assert result.column_names == names


class TestSerialCSVRead(BaseTestCSVRead, unittest.TestCase):

Expand Down

0 comments on commit b1025c2

Please sign in to comment.