Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2973: add different notification levels to publish migration states for mirror disks #2987

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,21 +423,25 @@ void TDiskRegistryState::AllowNotifications(
const TDiskId& diskId,
const TDiskState& disk)
{
// currently we don't want to notify users about mirrored disks since they are not
// supposed to break
if (disk.MasterDiskId) {
return;
}

// We do not want to notify the user about the breakdowns of the shadow disks.
if (disk.CheckpointReplica.GetCheckpointId()) {
return;
}

// currently we don't want to notify users about mirrored disks errors since
// they are not supposed to break
if (disk.MasterDiskId || IsReliableDiskRegistryMediaKind(disk.MediaKind)) {
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved
NotificationSystem.AllowNotifications(
diskId,
NDiskRegistry::ENotificationLevel::InfoNotifications);
return;
}

Y_DEBUG_ABORT_UNLESS(IsDiskRegistryMediaKind(disk.MediaKind));
if (!IsReliableDiskRegistryMediaKind(disk.MediaKind)) {
NotificationSystem.AllowNotifications(diskId);
NotificationSystem.AllowNotifications(
diskId,
NDiskRegistry::ENotificationLevel::AllNotifications);
}
}

Expand Down Expand Up @@ -4838,7 +4842,10 @@ void TDiskRegistryState::AddUserNotification(
TDiskRegistryDatabase& db,
NProto::TUserNotification notification)
{
NotificationSystem.AddUserNotification(db, std::move(notification));
NotificationSystem.AddUserNotification(
db,
std::move(notification),
NDiskRegistry::ENotificationLevel::AllNotifications);
}

void TDiskRegistryState::DeleteUserNotification(
Expand Down Expand Up @@ -5498,9 +5505,11 @@ bool TDiskRegistryState::TryUpdateDiskState(

UpdateAndReallocateDisk(db, diskId, disk);

const auto& diskIdToNotify = disk.MasterDiskId ? disk.MasterDiskId : diskId;

NotificationSystem.OnDiskStateChanged(
db,
diskId,
diskIdToNotify,
oldState,
newState,
timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ NProto::TUserNotification MakeBlankNotification(
return notif;
}

bool MsgSupportedByNotificationLevel(
ENotificationLevel msgLevel,
ENotificationLevel supportedLevel)
{
return msgLevel <= supportedLevel;
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -90,11 +97,23 @@ void TNotificationSystem::PullInUserNotifications(
}
}

bool TNotificationSystem::IsNotificationSupported(const TDiskId& diskId, ENotificationLevel level)
{
auto* supportedLevel = SupportsNotifications.FindPtr(diskId);
if (!supportedLevel) {
return false;
}

return MsgSupportedByNotificationLevel(level, *supportedLevel);
}

////////////////////////////////////////////////////////////////////////////////

void TNotificationSystem::AllowNotifications(const TDiskId& diskId)
void TNotificationSystem::AllowNotifications(
const TDiskId& diskId,
ENotificationLevel notificationLevel)
{
SupportsNotifications.insert(diskId);
SupportsNotifications[diskId] = notificationLevel;
}

void TNotificationSystem::DeleteDisk(
Expand All @@ -118,12 +137,13 @@ void TNotificationSystem::DeleteDisk(

void TNotificationSystem::AddUserNotification(
TDiskRegistryDatabase& db,
NProto::TUserNotification notification)
NProto::TUserNotification notification,
ENotificationLevel level)
{
const auto& id = GetEntityId(notification);

// Note: Only disk events supported at the moment
if (!SupportsNotifications.contains(id)) {
if (IsNotificationSupported(id, level)) {
return;
}

Expand Down Expand Up @@ -290,21 +310,33 @@ void TNotificationSystem::OnDiskStateChanged(
db.UpdateDiskState(diskState, seqNo);
db.WriteLastDiskStateSeqNo(DiskStateSeqNo);

ENotificationLevel diskStateChangeNotificationLevel =
ENotificationLevel::InfoNotifications;
if (newState >= NProto::DISK_STATE_TEMPORARILY_UNAVAILABLE) {
if (oldState < NProto::DISK_STATE_TEMPORARILY_UNAVAILABLE) {
diskStateChangeNotificationLevel =
ENotificationLevel::AllNotifications;
auto notif = MakeBlankNotification(seqNo, timestamp);
notif.MutableDiskError()->SetDiskId(diskId);
AddUserNotification(db, std::move(notif));
}
AddUserNotification(
db,
std::move(notif),
diskStateChangeNotificationLevel);
}
} else {
if (oldState >= NProto::DISK_STATE_TEMPORARILY_UNAVAILABLE) {
diskStateChangeNotificationLevel =
ENotificationLevel::AllNotifications;
auto notif = MakeBlankNotification(seqNo, timestamp);
notif.MutableDiskBackOnline()->SetDiskId(diskId);
AddUserNotification(db, std::move(notif));
AddUserNotification(
db,
std::move(notif),
diskStateChangeNotificationLevel);
}
}

if (SupportsNotifications.contains(diskId)) {
if (IsNotificationSupported(diskId, diskStateChangeNotificationLevel)) {
DiskStateUpdates.emplace_back(std::move(diskState), seqNo);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ namespace NCloud::NBlockStore::NStorage::NDiskRegistry {

////////////////////////////////////////////////////////////////////////////////

enum class ENotificationLevel
{
InfoNotifications = 0,
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved
AllNotifications = 1,
};

class TNotificationSystem
{
using TDiskId = TString;
Expand Down Expand Up @@ -33,7 +39,7 @@ class TNotificationSystem
private:
const TStorageConfigPtr StorageConfig;

THashSet<TDiskId> SupportsNotifications;
THashMap<TDiskId, ENotificationLevel> SupportsNotifications;

// notify users
TUserNotifications UserNotifications;
Expand All @@ -60,12 +66,15 @@ class TNotificationSystem
ui64 diskStateSeqNo,
TVector<TDiskId> outdatedVolumes);

void AllowNotifications(const TDiskId& diskId);
void AllowNotifications(
const TDiskId& diskId,
ENotificationLevel notificationLevel);
void DeleteDisk(TDiskRegistryDatabase& db, const TDiskId& diskId);

void AddUserNotification(
TDiskRegistryDatabase& db,
NProto::TUserNotification notification);
NProto::TUserNotification notification,
ENotificationLevel level);

void DeleteUserNotification(
TDiskRegistryDatabase& db,
Expand Down Expand Up @@ -122,6 +131,10 @@ class TNotificationSystem
void PullInUserNotifications(
TVector<TString> errorNotifications,
TVector<NProto::TUserNotification> userNotifications);

bool IsNotificationSupported(
const TDiskId& diskId,
ENotificationLevel level);
};

} // namespace NCloud::NBlockStore::NStorage::NDiskRegistry
Original file line number Diff line number Diff line change
Expand Up @@ -11167,12 +11167,18 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateTest)
online.MutableDiskBackOnline()->SetDiskId(diskId);

ui64 seqNo = 0;
auto add = [&state, &db, &seqNo] (auto notif) {
auto add = [&state, &db, &seqNo](auto notif)
{
notif.SetSeqNo(++seqNo);
state.AddUserNotification(db, std::move(notif));
state.AddUserNotification(
db,
std::move(notif),
NDiskRegistry::ENotificationLevel::AllNotifications);
};

state.AllowNotifications(diskId);
state.AllowNotifications(
diskId,
NDiskRegistry::ENotificationLevel::AllNotifications);

add(error);
add(error);
Expand Down Expand Up @@ -11219,6 +11225,89 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateTest)
});
}

Y_UNIT_TEST(ShouldSupportDifferentNotificationLevels)
{
NDiskRegistry::TNotificationSystem state(
CreateStorageConfig(CreateDefaultStorageConfigProto()),
{},
{},
{},
{},
0,
{});

TTestExecutor executor;
executor.WriteTx([](TDiskRegistryDatabase db) { db.InitSchema(); });

const TString diskId = "disk";
ui64 seqNo = 0;

auto add = [&state, &seqNo](auto& db, auto notif)
{
notif.SetSeqNo(++seqNo);
state.AddUserNotification(
db,
std::move(notif),
NDiskRegistry::ENotificationLevel::AllNotifications);
};

auto addInfo = [&state, &seqNo](auto& db, auto notif)
{
notif.SetSeqNo(++seqNo);
state.AddUserNotification(
db,
std::move(notif),
NDiskRegistry::ENotificationLevel::InfoNotifications);
};

executor.WriteTx(
[&state, &diskId](TDiskRegistryDatabase db)
{
NProto::TUserNotification error;
error.MutableDiskError()->SetDiskId(diskId);

NProto::TUserNotification online;
online.MutableDiskBackOnline()->SetDiskId(diskId);

state.AllowNotifications(
diskId,
NDiskRegistry::ENotificationLevel::InfoNotifications);

add(db, error);

addInfo(db, online);
});

TVector<NProto::TUserNotification> userNotifications;
state.GetUserNotifications(userNotifications);

UNIT_ASSERT_VALUES_EQUAL(1, userNotifications.size());
UNIT_ASSERT(userNotifications[0].HasDiskBackOnline());

executor.WriteTx(
[&state, &diskId](TDiskRegistryDatabase db)
{
NProto::TUserNotification error;
error.MutableDiskError()->SetDiskId(diskId);

state.AllowNotifications(
diskId,
NDiskRegistry::ENotificationLevel::AllNotifications);

add(db, error);
});

TVector<NProto::TUserNotification> userNotifications;
state.GetUserNotifications(userNotifications);
SortBy(
userNotifications,
[](const auto& notif) { return notif.GetSeqNo(); });

UNIT_ASSERT_VALUES_EQUAL(2, userNotifications.size());
UNIT_ASSERT(userNotifications[0].HasDiskBackOnline());
UNIT_ASSERT(userNotifications[1].HasDiskError());
}

Y_UNIT_TEST(ShouldPullInLegacyDiskErrorUserNotifications)
{
auto state = TDiskRegistryStateBuilder()
Expand Down
Loading