Skip to content

Commit

Permalink
Change proto
Browse files Browse the repository at this point in the history
  • Loading branch information
drbasic committed Feb 7, 2025
1 parent 1079c06 commit a53ab35
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 81 deletions.
12 changes: 7 additions & 5 deletions cloud/blockstore/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ message TLocation

////////////////////////////////////////////////////////////////////////////////

enum EChecksumFlags
message TChecksumFlags
{
CHECKSUM_FLAGS_NONE = 0;
CHECKSUM_FLAGS_CHECK_FOR_MIRROR = 1;
// If enabled, the data buffer calculates the checksum before and after
// writing. This is done to ensure that the same data is written to all
// mirror disk replicas.
optional bool CheckBufferModificationForMirrorDisk = 1;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -213,8 +215,8 @@ message TServerConfig
// the parent process die.
optional uint32 VhostServerTimeoutAfterParentExit = 118;

//
optional EChecksumFlags ChecksumFlags = 119;
// Flags for managing checksums.
optional TChecksumFlags ChecksumFlags = 119;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ void TBootstrapBase::Init()
auto nbdEndpointListener = CreateNbdEndpointListener(
NbdServer,
Logging,
ServerStats);
ServerStats,
Configs->ServerConfig->GetChecksumFlags());

endpointListeners.emplace(
NProto::IPC_NBD,
Expand Down
15 changes: 12 additions & 3 deletions cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <cloud/blockstore/libs/nbd/server.h>
#include <cloud/blockstore/libs/nbd/server_handler.h>
#include <cloud/blockstore/libs/service/device_handler.h>
#include <cloud/storage/core/libs/common/media.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

namespace NCloud::NBlockStore::NServer {
Expand All @@ -25,15 +26,18 @@ class TNbdEndpointListener final
const NBD::IServerPtr Server;
const ILoggingServicePtr Logging;
const IServerStatsPtr ServerStats;
const NProto::TChecksumFlags ChecksumFlags;

public:
TNbdEndpointListener(
NBD::IServerPtr server,
ILoggingServicePtr logging,
IServerStatsPtr serverStats)
IServerStatsPtr serverStats,
NProto::TChecksumFlags checksumFlags)
: Server(std::move(server))
, Logging(std::move(logging))
, ServerStats(std::move(serverStats))
, ChecksumFlags(std::move(checksumFlags))
{}

TFuture<NProto::TError> StartEndpoint(
Expand All @@ -48,6 +52,9 @@ class TNbdEndpointListener final
options.BlocksCount = volume.GetBlocksCount();
options.UnalignedRequestsDisabled = request.GetUnalignedRequestsDisabled();
options.SendMinBlockSize = request.GetSendNbdMinBlockSize();
options.UnalignedRequestsDisabled =
ChecksumFlags.GetCheckBufferModificationForMirrorDisk() &&
IsReliableDiskRegistryMediaKind(volume.GetStorageMediaKind());

auto requestFactory = CreateServerHandlerFactory(
CreateDefaultDeviceHandlerFactory(),
Expand Down Expand Up @@ -109,12 +116,14 @@ class TNbdEndpointListener final
IEndpointListenerPtr CreateNbdEndpointListener(
NBD::IServerPtr server,
ILoggingServicePtr logging,
IServerStatsPtr serverStats)
IServerStatsPtr serverStats,
NProto::TChecksumFlags checksumFlags)
{
return std::make_shared<TNbdEndpointListener>(
std::move(server),
std::move(logging),
std::move(serverStats));
std::move(serverStats),
std::move(checksumFlags));
}

} // namespace NCloud::NBlockStore::NServer
4 changes: 3 additions & 1 deletion cloud/blockstore/libs/endpoints_nbd/nbd_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "public.h"

#include <cloud/blockstore/config/server.pb.h>
#include <cloud/blockstore/libs/diagnostics/public.h>
#include <cloud/blockstore/libs/endpoints/public.h>
#include <cloud/blockstore/libs/nbd/public.h>
Expand All @@ -13,6 +14,7 @@ namespace NCloud::NBlockStore::NServer {
IEndpointListenerPtr CreateNbdEndpointListener(
NBD::IServerPtr server,
ILoggingServicePtr logging,
IServerStatsPtr serverStats);
IServerStatsPtr serverStats,
NProto::TChecksumFlags checksumFlags);

} // namespace NCloud::NBlockStore::NServer
22 changes: 5 additions & 17 deletions cloud/blockstore/libs/endpoints_vhost/vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,21 @@ using namespace NThreading;

namespace {

constexpr ui32 ProtoFlag(int value)
{
return value ? 1 << (value - 1) : value;
}

constexpr bool HasFlag(ui32 flags, ui32 value)
{
return flags & ProtoFlag(value);
}

////////////////////////////////////////////////////////////////////////////////

class TVhostEndpointListener final
: public IEndpointListener
{
private:
const NVhost::IServerPtr Server;
const NProto::EChecksumFlags ChecksumFlags;
const NProto::TChecksumFlags ChecksumFlags;

public:
TVhostEndpointListener(
NVhost::IServerPtr server,
NProto::EChecksumFlags checksumFlags)
NProto::TChecksumFlags checksumFlags)
: Server(std::move(server))
, ChecksumFlags(checksumFlags)
, ChecksumFlags(std::move(checksumFlags))
{}

TFuture<NProto::TError> StartEndpoint(
Expand All @@ -52,9 +42,7 @@ class TVhostEndpointListener final
options.VhostQueuesCount = request.GetVhostQueuesCount();
options.UnalignedRequestsDisabled = request.GetUnalignedRequestsDisabled();
options.CheckBufferModificationDuringWriting =
HasFlag(
ChecksumFlags,
NProto::CHECKSUM_FLAGS_CHECK_FOR_MIRROR) &&
ChecksumFlags.GetCheckBufferModificationForMirrorDisk() &&
IsReliableDiskRegistryMediaKind(volume.GetStorageMediaKind());

return Server->StartEndpoint(
Expand Down Expand Up @@ -104,7 +92,7 @@ class TVhostEndpointListener final

IEndpointListenerPtr CreateVhostEndpointListener(
NVhost::IServerPtr server,
NProto::EChecksumFlags checksumFlags)
const NProto::TChecksumFlags& checksumFlags)
{
return std::make_shared<TVhostEndpointListener>(
std::move(server),
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/endpoints_vhost/vhost_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ namespace NCloud::NBlockStore::NServer {

IEndpointListenerPtr CreateVhostEndpointListener(
NVhost::IServerPtr server,
NProto::EChecksumFlags checksumFlags);
const NProto::TChecksumFlags& checksumFlags);

} // namespace NCloud::NBlockStore::NServer
52 changes: 7 additions & 45 deletions cloud/blockstore/libs/server/config.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "config.h"

#include <cloud/storage/core/libs/common/proto_helpers.h>

#include <library/cpp/monlib/service/pages/templates.h>

#include <util/generic/size_literals.h>
Expand Down Expand Up @@ -101,9 +103,7 @@ constexpr TDuration Seconds(int s)
xxx(NodeRegistrationToken, TString, "root@builtin" )\
xxx(EndpointStorageNotImplementedErrorIsFatal, bool, false )\
xxx(VhostServerTimeoutAfterParentExit, TDuration, Seconds(60) )\
xxx(ChecksumFlags, \
NProto::EChecksumFlags, \
NProto::CHECKSUM_FLAGS_NONE )
xxx(ChecksumFlags, NProto::TChecksumFlags, {} )
// BLOCKSTORE_SERVER_CONFIG

#define BLOCKSTORE_SERVER_DECLARE_CONFIG(name, type, value) \
Expand Down Expand Up @@ -154,25 +154,6 @@ TAffinity ConvertValue<TAffinity, NProto::TAffinity>(
return TAffinity(std::move(vec));
}

template <typename T>
bool IsEmpty(const T& t)
{
return !t;
}

template <typename T>
bool IsEmpty(
const google::protobuf::RepeatedPtrField<T>& value)
{
return value.empty();
}

template <>
bool IsEmpty(const NProto::TAffinity& value)
{
return value.GetCPU().empty();
}

template <typename T>
void DumpImpl(const T& t, IOutputStream& os)
{
Expand Down Expand Up @@ -242,26 +223,6 @@ void DumpImpl(
}
}

template <>
void DumpImpl(
const NProto::EChecksumFlags& value,
IOutputStream& os)
{
switch (value) {
case NProto::CHECKSUM_FLAGS_NONE:
os << "CHECKSUM_FLAGS_NONE";
break;
case NProto::CHECKSUM_FLAGS_CHECK_FOR_MIRROR:
os << "CHECKSUM_FLAGS_CHECK_FOR_MIRROR";
break;
default:
os << "(Unknown EChecksumFlags value "
<< static_cast<int>(value)
<< ")";
break;
}
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -287,9 +248,10 @@ TServerAppConfig::TServerAppConfig(NProto::TServerAppConfig appConfig)
#define BLOCKSTORE_CONFIG_GETTER(name, type, ...) \
type TServerAppConfig::Get##name() const \
{ \
const auto value = ServerConfig->Get##name(); \
return !IsEmpty(value) ? ConvertValue<type>(value) : Default##name; \
} \
return NCloud::HasField(*ServerConfig, #name) \
? ConvertValue<type>(ServerConfig->Get##name()) \
: Default##name; \
}
// BLOCKSTORE_CONFIG_GETTER

BLOCKSTORE_SERVER_CONFIG(BLOCKSTORE_CONFIG_GETTER)
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/server/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class TServerAppConfig
bool GetEndpointStorageNotImplementedErrorIsFatal() const;
TDuration GetVhostServerTimeoutAfterParentExit() const;
TString GetNodeRegistrationToken() const;
NProto::EChecksumFlags GetChecksumFlags() const;
NProto::TChecksumFlags GetChecksumFlags() const;

void Dump(IOutputStream& out) const override;
void DumpHtml(IOutputStream& out) const override;
Expand Down
19 changes: 12 additions & 7 deletions cloud/blockstore/libs/service/checksum_storage_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ TErrorResponse CreateRequestDestroyedResponse()
return {E_CANCELLED, "request destroyed"};
}

ui32 CalcChecksum(const TGuardedSgList& sgList)
std::optional<ui32> CalcChecksum(const TGuardedSgList& sgList)
{
auto guard = sgList.Acquire();
if (!guard) {
return 0;
return std::nullopt;
}

const TSgList& blockList = guard.Get();
Expand Down Expand Up @@ -115,11 +115,17 @@ TChecksumStorageWrapper::WriteBlocksLocal(
{
auto requestCopy =
std::make_shared<NProto::TWriteBlocksLocalRequest>(*request);
ui32 checksum = CalcChecksum(request->Sglist);
auto checksumBefore = CalcChecksum(request->Sglist);
if (!checksumBefore) {
return MakeFuture<NProto::TWriteBlocksLocalResponse>(
CreateRequestDestroyedResponse());
}

auto result = Storage->WriteBlocksLocal(callContext, std::move(request));

return result.Apply(
[callContext = std::move(callContext),
checksum = checksum,
checksumBefore = *checksumBefore,
requestCopy = std::move(requestCopy),
weakSelf = weak_from_this()](const auto& future) mutable
{
Expand All @@ -128,10 +134,9 @@ TChecksumStorageWrapper::WriteBlocksLocal(
return MakeFuture(std::move(response));
}

ui32 checksumAfter =
checksum ? CalcChecksum(requestCopy->Sglist) : 0;
auto checksumAfter = CalcChecksum(requestCopy->Sglist);

if (checksum != checksumAfter && checksumAfter) {
if (checksumAfter && checksumBefore != *checksumAfter) {
if (auto self = weakSelf.lock()) {
return self->RetryWriteBlocksLocal(
std::move(callContext),
Expand Down
3 changes: 3 additions & 0 deletions example/nbs/nbs-server.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ ServerConfig {
NbdEnabled: true
VhostEnabled: true
VhostServerPath: "../cloud/blockstore/vhost-server/blockstore-vhost-server"
ChecksumFlags {
CheckBufferModificationForMirrorDisk: true
}
}

0 comments on commit a53ab35

Please sign in to comment.