Skip to content

Commit

Permalink
[native] Advance velox.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitkdutta committed Jan 25, 2025
1 parent f3342e8 commit 13cb729
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,14 @@ std::unique_ptr<exec::SerializedPage> waitForNextPage(
const std::shared_ptr<exec::ExchangeQueue>& queue) {
bool atEnd;
facebook::velox::ContinueFuture future;
auto pages = queue->dequeueLocked(1, &atEnd, &future);
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
auto pages = queue->dequeueLocked(0, 1, &atEnd, &future, &stalePromise);
EXPECT_LE(pages.size(), 1);
EXPECT_FALSE(atEnd);
if (pages.empty()) {
std::move(future).get();
pages = queue->dequeueLocked(1, &atEnd, &future);
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
pages = queue->dequeueLocked(0, 1, &atEnd, &future, &stalePromise);
EXPECT_EQ(pages.size(), 1);
}
return std::move(pages.front());
Expand All @@ -438,11 +440,13 @@ std::unique_ptr<exec::SerializedPage> waitForNextPage(
void waitForEndMarker(const std::shared_ptr<exec::ExchangeQueue>& queue) {
bool atEnd;
facebook::velox::ContinueFuture future;
auto pages = queue->dequeueLocked(1, &atEnd, &future);
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
auto pages = queue->dequeueLocked(0, 1, &atEnd, &future, &stalePromise);
ASSERT_TRUE(pages.empty());
if (!atEnd) {
std::move(future).get();
pages = queue->dequeueLocked(1, &atEnd, &future);
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
pages = queue->dequeueLocked(0, 1, &atEnd, &future, &stalePromise);
ASSERT_TRUE(pages.empty());
ASSERT_TRUE(atEnd);
}
Expand Down Expand Up @@ -525,7 +529,7 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam<Params> {
}

std::shared_ptr<exec::ExchangeQueue> makeSingleSourceQueue() {
auto queue = std::make_shared<exec::ExchangeQueue>();
auto queue = std::make_shared<exec::ExchangeQueue>(1, 0);
queue->addSourceLocked();
queue->noMoreSources();
return queue;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 130 files

0 comments on commit 13cb729

Please sign in to comment.