Skip to content

Commit

Permalink
[CELEBORN-1821][CIP-14] Add controlMessages to cppClient
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR adds ControlMessages to cppClient.

### Why are the changes needed?
The ControlMessages are used to communicate with the CelebornServer and LifecycleManager.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes apache#3052 from HolyLow/issue/celeborn-1821-add-control-messages-to-cpp-client.

Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
HolyLow authored and FMX committed Jan 8, 2025
1 parent 2962c11 commit b74e05b
Show file tree
Hide file tree
Showing 10 changed files with 691 additions and 4 deletions.
3 changes: 2 additions & 1 deletion cpp/celeborn/protocol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ add_library(
protocol
STATIC
PartitionLocation.cpp
TransportMessage.cpp)
TransportMessage.cpp
ControlMessages.cpp)

target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})

Expand Down
182 changes: 182 additions & 0 deletions cpp/celeborn/protocol/ControlMessages.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "celeborn/protocol/ControlMessages.h"
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
namespace protocol {
// Serializes this request into a GET_REDUCER_FILE_GROUP transport message
// whose payload is the protobuf encoding of the shuffle id.
TransportMessage GetReducerFileGroup::toTransportMessage() const {
  PbGetReducerFileGroup request;
  request.set_shuffleid(shuffleId);
  return TransportMessage(
      GET_REDUCER_FILE_GROUP, request.SerializeAsString());
}

std::unique_ptr<GetReducerFileGroupResponse>
GetReducerFileGroupResponse::fromTransportMessage(
const TransportMessage& transportMessage) {
CELEBORN_CHECK(
transportMessage.type() == GET_REDUCER_FILE_GROUP_RESPONSE,
"transportMessageType mismatch");
auto payload = transportMessage.payload();
auto pbGetReducerFileGroupResponse =
utils::parseProto<PbGetReducerFileGroupResponse>(
reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
auto response = std::make_unique<GetReducerFileGroupResponse>();
response->status = toStatusCode(pbGetReducerFileGroupResponse->status());
auto fileGroups = pbGetReducerFileGroupResponse->filegroups();
for (auto& kv : fileGroups) {
auto& fileGroup = response->fileGroups[kv.first];
// Legacy mode is deprecated.
CELEBORN_CHECK_EQ(
kv.second.locations().size(),
0,
"legecy PartitionLocation pb is deprecated");
// Packed mode: must use packedPartitionLocations.
auto& pbPackedPartitionLocationsPair = kv.second.partitionlocationspair();
int inputLocationSize = pbPackedPartitionLocationsPair.inputlocationsize();
auto& pbPackedPartitionLocations =
pbPackedPartitionLocationsPair.locations();
std::vector<std::unique_ptr<PartitionLocation>> partialLocations;
auto& pbIds = pbPackedPartitionLocations.ids();
for (int idx = 0; idx < pbIds.size(); idx++) {
partialLocations.push_back(
PartitionLocation::fromPackedPb(pbPackedPartitionLocations, idx));
}
for (int idx = 0; idx < inputLocationSize; idx++) {
auto replicaIdx = pbPackedPartitionLocationsPair.peerindexes(idx);
// has peer
if (replicaIdx != INT_MAX) {
CELEBORN_CHECK_GE(replicaIdx, inputLocationSize);
CELEBORN_CHECK_LT(replicaIdx, partialLocations.size());
auto location = std::move(partialLocations[idx]);
auto peerLocation = std::move(partialLocations[replicaIdx]);
// make sure the location is primary and peer location is replica
if (location->mode == PartitionLocation::Mode::REPLICA) {
std::swap(location, peerLocation);
}
CELEBORN_CHECK(location->mode == PartitionLocation::Mode::PRIMARY);
CELEBORN_CHECK(peerLocation->mode == PartitionLocation::Mode::REPLICA);
location->replicaPeer = std::move(peerLocation);
fileGroup.insert(std::move(location));
}
// has no peer
else {
fileGroup.insert(std::move(partialLocations[idx]));
}
}
}
auto attempts = pbGetReducerFileGroupResponse->attempts();
response->attempts.reserve(attempts.size());
for (auto attempt : attempts) {
response->attempts.push_back(attempt);
}
auto partitionIds = pbGetReducerFileGroupResponse->partitionids();
for (auto partitionId : partitionIds) {
response->partitionIds.insert(partitionId);
}
return std::move(response);
}

OpenStream::OpenStream(
const std::string& shuffleKey,
const std::string& filename,
int32_t startMapIndex,
int32_t endMapIndex)
: shuffleKey(shuffleKey),
filename(filename),
startMapIndex(startMapIndex),
endMapIndex(endMapIndex) {}

// Serializes this request into an OPEN_STREAM transport message. The payload
// is a PbOpenStream carrying the shuffle key, the file name and the
// start/end map indexes.
TransportMessage OpenStream::toTransportMessage() const {
  PbOpenStream request;
  request.set_shufflekey(shuffleKey);
  request.set_filename(filename);
  request.set_startindex(startMapIndex);
  request.set_endindex(endMapIndex);
  return TransportMessage(OPEN_STREAM, request.SerializeAsString());
}

// Decodes a STREAM_HANDLER transport message into a StreamHandler.
//
// Copies the stream id, the chunk count, all chunk offsets and the full path
// out of the parsed PbStreamHandler. Fails a CELEBORN_CHECK when the message
// type is not STREAM_HANDLER.
std::unique_ptr<StreamHandler> StreamHandler::fromTransportMessage(
    const TransportMessage& transportMessage) {
  CELEBORN_CHECK(
      transportMessage.type() == STREAM_HANDLER,
      "transportMessageType should be STREAM_HANDLER");
  // A const reference avoids an extra payload copy if payload() returns a
  // reference, and extends the temporary's lifetime if it returns by value.
  const auto& payload = transportMessage.payload();
  auto pbStreamHandler = utils::parseProto<PbStreamHandler>(
      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
  auto streamHandler = std::make_unique<StreamHandler>();
  streamHandler->streamId = pbStreamHandler->streamid();
  streamHandler->numChunks = pbStreamHandler->numchunks();
  // Range-assign sizes the vector once instead of growing it per element.
  const auto& pbChunkOffsets = pbStreamHandler->chunkoffsets();
  streamHandler->chunkOffsets.assign(
      pbChunkOffsets.begin(), pbChunkOffsets.end());
  streamHandler->fullPath = pbStreamHandler->fullpath();
  // Returning the local by name lets the compiler elide or move it.
  return streamHandler;
}

// Converts this slice into a heap-allocated PbStreamChunkSlice carrying the
// stream id, chunk index, offset and length. The caller owns the result.
std::unique_ptr<PbStreamChunkSlice> StreamChunkSlice::toProto() const {
  auto pb = std::make_unique<PbStreamChunkSlice>();
  pb->set_streamid(streamId);
  pb->set_chunkindex(chunkIndex);
  pb->set_offset(offset);
  pb->set_len(len);
  // Return the local by name: wrapping it in std::move is redundant and
  // prevents copy elision.
  return pb;
}

// Decodes a StreamChunkSlice from `data`, consuming the stream id followed by
// the chunk index, the offset and the length, in that order.
//
// Fails a CELEBORN_CHECK when fewer bytes remain than the encoded size.
StreamChunkSlice StreamChunkSlice::decodeFrom(
    memory::ReadOnlyByteBuffer& data) {
  // Encoded layout: 8-byte stream id + three 4-byte integers (20 bytes).
  // Fixed-width types keep the number of bytes consumed independent of the
  // platform's `long` width.
  constexpr int kEncodedLength = sizeof(int64_t) + 3 * sizeof(int32_t);
  CELEBORN_CHECK_GE(data.remainingSize(), kEncodedLength);
  StreamChunkSlice slice;
  slice.streamId = data.read<int64_t>();
  slice.chunkIndex = data.read<int32_t>();
  slice.offset = data.read<int32_t>();
  slice.len = data.read<int32_t>();
  return slice;
}

// Hashes a slice by XOR-combining the std::hash of each field, with the
// chunk index, offset and length hashes shifted left by 1, 2 and 3 bits.
size_t StreamChunkSlice::Hasher::operator()(const StreamChunkSlice& lhs) const {
  auto combined = std::hash<long>()(lhs.streamId);
  combined ^= std::hash<int>()(lhs.chunkIndex) << 1;
  combined ^= std::hash<int>()(lhs.offset) << 2;
  combined ^= std::hash<int>()(lhs.len) << 3;
  return combined;
}

// Serializes this request into a CHUNK_FETCH_REQUEST transport message whose
// payload is a PbChunkFetchRequest wrapping the stream chunk slice.
TransportMessage ChunkFetchRequest::toTransportMessage() const {
  MessageType type = CHUNK_FETCH_REQUEST;
  PbChunkFetchRequest pb;
  // Hand ownership of the heap-allocated slice to `pb`. Neither object lives
  // on an arena, so the checked set_allocated_* accessor is sufficient and
  // avoids relying on the unsafe_arena_* contract.
  pb.set_allocated_streamchunkslice(streamChunkSlice.toProto().release());
  std::string payload = pb.SerializeAsString();
  return TransportMessage(type, std::move(payload));
}

// Serializes this notification into a BUFFER_STREAM_END transport message.
// The payload is a PbBufferStreamEnd with stream type ChunkStream and this
// object's stream id.
TransportMessage BufferStreamEnd::toTransportMessage() const {
  PbBufferStreamEnd notification;
  notification.set_streamtype(ChunkStream);
  notification.set_streamid(streamId);
  return TransportMessage(
      BUFFER_STREAM_END, notification.SerializeAsString());
}
} // namespace protocol
} // namespace celeborn
107 changes: 107 additions & 0 deletions cpp/celeborn/protocol/ControlMessages.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <map>
#include <set>

#include "celeborn/protocol/PartitionLocation.h"
#include "celeborn/protocol/StatusCode.h"
#include "celeborn/protocol/TransportMessage.h"

namespace celeborn {
namespace protocol {
// Request message that identifies a shuffle by its id.
struct GetReducerFileGroup {
// Id of the shuffle; written into the protobuf payload on serialization.
int shuffleId;

// Serializes this request into a TransportMessage.
TransportMessage toTransportMessage() const;
};

// Decoded response to a GetReducerFileGroup request.
struct GetReducerFileGroupResponse {
// Status code reported by the responder.
StatusCode status;
// Partition locations grouped by an integer key taken from the response's
// file-group map (presumably the partition id — TODO confirm).
std::map<int, std::set<std::shared_ptr<const PartitionLocation>>> fileGroups;
// Attempt values copied from the response, in message order.
std::vector<int> attempts;
// Partition ids copied from the response.
std::set<int> partitionIds;

// Builds a response from a TransportMessage; the caller owns the result.
static std::unique_ptr<GetReducerFileGroupResponse> fromTransportMessage(
const TransportMessage& transportMessage);
};

// Request message for opening a stream on a shuffle file.
struct OpenStream {
// Key of the shuffle the file belongs to.
std::string shuffleKey;
// Name of the file to open.
std::string filename;
// Start of the requested map index range.
int32_t startMapIndex;
// End of the requested map index range.
// NOTE(review): inclusive vs. exclusive is not visible here — confirm.
int32_t endMapIndex;

// Initializes every member from the argument of the same name.
OpenStream(
const std::string& shuffleKey,
const std::string& filename,
int32_t startMapIndex,
int32_t endMapIndex);

// Serializes this request into a TransportMessage.
TransportMessage toTransportMessage() const;
};

// Decoded description of a stream, built from a TransportMessage.
struct StreamHandler {
// Identifier of the stream.
int64_t streamId;
// Number of chunks reported for the stream.
int32_t numChunks;
// Chunk offsets, in the order they appear in the message.
std::vector<int64_t> chunkOffsets;
// Full path string reported in the message.
std::string fullPath;

// Builds a StreamHandler from a TransportMessage; the caller owns the result.
static std::unique_ptr<StreamHandler> fromTransportMessage(
const TransportMessage& transportMessage);
};

// Identifies a slice of one chunk of a stream by stream id, chunk index,
// offset and length. Usable as a hash-map key via operator== and Hasher.
struct StreamChunkSlice {
  // Fixed-width types match StreamHandler::streamId and keep the field sizes
  // independent of the platform's `long`/`int` widths. Every member has an
  // initializer so a default-constructed slice can be compared and hashed.
  int64_t streamId{0};
  int32_t chunkIndex{0};
  // Defaults describe a slice starting at offset 0 with length INT_MAX.
  int32_t offset{0};
  int32_t len{INT_MAX};

  // Converts this slice into its protobuf form; the caller owns the result.
  std::unique_ptr<PbStreamChunkSlice> toProto() const;

  // Decodes a slice from the front of `data`.
  static StreamChunkSlice decodeFrom(memory::ReadOnlyByteBuffer& data);

  // Renders the slice as "streamId-chunkIndex-offset-len".
  std::string toString() const {
    return std::to_string(streamId) + "-" + std::to_string(chunkIndex) + "-" +
        std::to_string(offset) + "-" + std::to_string(len);
  }

  // Two slices are equal when all four fields are equal.
  bool operator==(const StreamChunkSlice& rhs) const {
    return streamId == rhs.streamId && chunkIndex == rhs.chunkIndex &&
        offset == rhs.offset && len == rhs.len;
  }

  // Hash functor for unordered containers keyed by StreamChunkSlice.
  struct Hasher {
    size_t operator()(const StreamChunkSlice& lhs) const;
  };
};

// Request message asking for the chunk slice described by streamChunkSlice.
struct ChunkFetchRequest {
// The slice to fetch.
StreamChunkSlice streamChunkSlice;

// Serializes this request into a TransportMessage.
TransportMessage toTransportMessage() const;
};

// Message signalling the end of the stream identified by streamId.
struct BufferStreamEnd {
  // 64-bit stream id; a fixed-width type matches StreamHandler::streamId and
  // does not depend on the platform's `long` width.
  int64_t streamId;

  // Serializes this message into a TransportMessage.
  TransportMessage toTransportMessage() const;
};
} // namespace protocol
} // namespace celeborn
31 changes: 31 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ std::unique_ptr<const PartitionLocation> PartitionLocation::fromPb(
return std::move(result);
}


// Builds the PartitionLocation stored at position `idx` of a packed
// (column-wise) PbPackedPartitionLocations message.
//
// The worker id and mount point are stored as indexes into shared string
// tables (`workerIdsSet`, `mountPointsSet`). A non-empty file path is stored
// relative to its mount point and is re-prefixed with it here. The returned
// location has no replica peer attached; the caller owns the result.
std::unique_ptr<PartitionLocation> PartitionLocation::fromPackedPb(
    const PbPackedPartitionLocations& pb,
    int idx) {
  const auto& workerIdStr = pb.workeridsset(pb.workerids(idx));
  // Parts are read as host followed by the rpc, push, fetch and replicate
  // ports (presumably the helper returns host + 4 ports — TODO confirm).
  auto workerIdParts = utils::parseColonSeparatedHostPorts(workerIdStr, 4);
  // Resolve the mount point once; it is needed for both the file path and
  // the storage info.
  const auto& mountPoint = pb.mountpointsset(pb.mountpoints(idx));
  std::string filePath = pb.filepaths(idx);
  if (!filePath.empty()) {
    filePath = mountPoint + filePath;
  }

  auto result = std::make_unique<PartitionLocation>();
  result->id = pb.ids(idx);
  result->epoch = pb.epoches(idx);
  result->host = workerIdParts[0];
  result->rpcPort = utils::strv2val<int>(workerIdParts[1]);
  result->pushPort = utils::strv2val<int>(workerIdParts[2]);
  result->fetchPort = utils::strv2val<int>(workerIdParts[3]);
  result->replicatePort = utils::strv2val<int>(workerIdParts[4]);
  result->mode = static_cast<Mode>(pb.modes(idx));
  result->replicaPeer = nullptr;
  result->storageInfo = std::make_unique<StorageInfo>();
  result->storageInfo->type = static_cast<StorageInfo::Type>(pb.types(idx));
  result->storageInfo->mountPoint = mountPoint;
  result->storageInfo->finalResult = pb.finalresult(idx);
  result->storageInfo->filePath = std::move(filePath);
  result->storageInfo->availableStorageTypes = pb.availablestoragetypes(idx);

  // Return the local by name: wrapping it in std::move is redundant and
  // prevents copy elision.
  return result;
}

PartitionLocation::PartitionLocation(const PartitionLocation& other)
: id(other.id),
epoch(other.epoch),
Expand Down
4 changes: 4 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ struct PartitionLocation {
static std::unique_ptr<const PartitionLocation> fromPb(
const PbPartitionLocation& pb);

static std::unique_ptr<PartitionLocation> fromPackedPb(
const PbPackedPartitionLocations& pb,
int idx);

PartitionLocation() = default;

PartitionLocation(const PartitionLocation& other);
Expand Down
3 changes: 2 additions & 1 deletion cpp/celeborn/protocol/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
add_executable(
celeborn_protocol_test
PartitionLocationTest.cpp
TransportMessageTest.cpp)
TransportMessageTest.cpp
ControlMessagesTest.cpp)

add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)

Expand Down
Loading

0 comments on commit b74e05b

Please sign in to comment.