Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](inverted index)Fix load data coredump when encounter exception #45733

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -801,10 +801,8 @@ bool DorisRAMCompoundDirectory::doDeleteFile(const char* name) {
SCOPED_LOCK_MUTEX(this->THIS_LOCK);
sizeInBytes -= itr->second->sizeInBytes;
filesMap->removeitr(itr);
return true;
} else {
return false;
}
return true;
}

bool DorisRAMCompoundDirectory::deleteDirectory() {
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,8 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

void close_on_error() override {
try {
if (_index_writer) {
_index_writer->close();
}
// delete index file must be done before close index writer
// because index writer will close the directory, the files in RAMDirectory will be cleared.
if (_dir) {
_dir->deleteDirectory();
io::Path cfs_path(_dir->getCfsDirName());
Expand All @@ -134,6 +133,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
DorisCompoundDirectory::COMPOUND_FILE_EXTENSION;
_dir->deleteFile(idx_name.c_str());
}
if (_index_writer) {
_index_writer->close();
}
} catch (CLuceneError& e) {
LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << e.what();
}
Expand Down Expand Up @@ -676,6 +678,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
std::unique_ptr<lucene::document::Document> _doc {};
lucene::document::Field* _field {};
bool _single_field = true;
// Since _index_writer's write.lock is created by _dir.lockFactory,
// _dir must destruct after _index_writer, so _dir must be defined before _index_writer.
std::unique_ptr<DorisCompoundDirectory> _dir;
std::unique_ptr<lucene::index::IndexWriter> _index_writer {};
std::unique_ptr<lucene::analysis::Analyzer> _analyzer {};
std::unique_ptr<lucene::util::Reader> _char_string_reader {};
Expand All @@ -687,7 +692,6 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
std::wstring _field_name;
std::unique_ptr<DorisCompoundDirectory> _dir;
uint32_t _ignore_above;
};

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,32 @@
suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup_compound"
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}

def check_config = { String key, String value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == key) {
assertEquals(value, ((List<String>) ele)[2])
}
}
}
}

def create_httplogs_dup_table = {testTablex ->
// multi-line sql
Expand Down Expand Up @@ -85,74 +111,111 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
}
}

try {
sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
def run_test = { String is_enable ->
boolean invertedIndexRAMDirEnable = false
boolean has_update_be_config = false
try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "inverted_index_ram_dir_enable") {
invertedIndexRAMDirEnable = Boolean.parseBoolean(((List<String>) ele)[2])
logger.info("inverted_index_ram_dir_enable: ${((List<String>) ele)[2]}")
}
}
set_be_config.call("inverted_index_ram_dir_enable", is_enable)
has_update_be_config = true
// check updated config
check_config.call("inverted_index_ram_dir_enable", is_enable);


sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

def res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_finalize_status_error")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._set_writer_close_status_error")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
assertEquals(702, res[0][0])
// try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
if (is_enable == "false") {
assertTrue(ex.toString().contains("failed to initialize storage reader"))
} else {
fail("unexpected exception: " + ex)
}
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
assertEquals(0, res[0][0])
// try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
assertEquals(0, res[0][0])
// try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
if (has_update_be_config) {
set_be_config.call("inverted_index_ram_dir_enable", invertedIndexRAMDirEnable.toString())
}
}
} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}

run_test.call("false")
run_test.call("true")
}
Loading