Skip to content

Commit

Permalink
[FabricClient] Support 'register service notification filter' api (#60)
Browse files Browse the repository at this point in the history
Partial impl for #57 
FabricClient can register the service notification filter and unregister
it.
See dotnet
[documentation](https://learn.microsoft.com/en-us/dotnet/api/system.fabric.fabricclient.servicemanagementclient.registerservicenotificationfilterasync?view=azure-dotnet)
The registration mechanism is used inside the dotnet implementation of
[servicepartitionresolver](https://learn.microsoft.com/en-us/dotnet/api/microsoft.servicefabric.services.client.servicepartitionresolver?view=azure-dotnet)

The registered service to get notification will no longer need to pass
the previous endpoint resolve result to force a refresh. This is tested
in the stateful example app. But the notification seems to be not very
fast (lag of a couple of seconds). But this notification utilizes
FabricClient cache, and reduces the cluster api load.
I will compare with direct resolution next.
youyuanwu authored Aug 13, 2024
1 parent d6657c5 commit 4b303e2
Showing 9 changed files with 535 additions and 101 deletions.
2 changes: 1 addition & 1 deletion crates/libs/core/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ mod tests;
// FabricClient safe wrapper
// The design of FabricClient follows from the csharp client:
// https://github.com/microsoft/service-fabric/blob/master/src/prod/src/managed/Api/src/System/Fabric/FabricClient.cs

#[derive(Debug, Clone)]
pub struct FabricClient {
com_property_client: IFabricPropertyManagementClient2,
com_service_client: IFabricServiceManagementClient6,
96 changes: 88 additions & 8 deletions crates/libs/core/src/client/svc_mgmt_client.rs
Original file line number Diff line number Diff line change
@@ -12,19 +12,22 @@ use mssf_com::{
FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_PARTITION_KIND,
FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY,
FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI,
FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
FABRIC_SERVICE_PARTITION_KIND, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SERVICE_ROLE_INVALID,
FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY,
FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI,
},
};
use windows_core::{HSTRING, PCWSTR};

use crate::{
iter::{FabricIter, FabricListAccessor},
sync::{fabric_begin_end_proxy, FabricReceiver},
types::{RemoveReplicaDescription, RestartReplicaDescription},
types::{
RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
},
};

// Service Management Client
@@ -89,6 +92,40 @@ impl ServiceManagementClient {
move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
)
}

fn register_service_notification_filter_internal(
&self,
desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
timeout_milliseconds: u32,
) -> FabricReceiver<crate::Result<i64>> {
let com1 = &self.com;
let com2 = self.com.clone();
fabric_begin_end_proxy(
move |callback| unsafe {
com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
},
move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
)
}

fn unregister_service_notification_filter_internal(
&self,
filterid: i64,
timeout_milliseconds: u32,
) -> FabricReceiver<crate::Result<()>> {
let com1 = &self.com;
let com2 = self.com.clone();
fabric_begin_end_proxy(
move |callback| unsafe {
com1.BeginUnregisterServiceNotificationFilter(
filterid,
timeout_milliseconds,
callback,
)
},
move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
)
}
}

// public implementation block
@@ -141,6 +178,8 @@ impl ServiceManagementClient {
/// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
/// WARNING: There are no safety checks performed when this API is used.
/// Incorrect use of this API can lead to data loss for stateful services.
/// Remarks:
/// For stateless services, Instance Abort is called.
pub async fn remove_replica(
&self,
desc: &RemoveReplicaDescription,
@@ -150,6 +189,47 @@ impl ServiceManagementClient {
self.remove_replica_internal(&raw, timeout.as_millis() as u32)
.await
}

/// Remarks:
/// There is a cache of service endpoints in the client that gets updated by notifications
/// and this same cache is used to satisfy complaint based resolution requests
/// (see resolve_service_partition())). Applications that both register for notifications
/// and use complaint based resolution on the same client instance typically only need to
/// pass null for the ResolvedServicePartition argument during resolution.
/// This will always return the endpoints in the client cache updated by the latest notification.
/// The notification mechanism itself will keep the client cache updated when service endpoints change.
/// TODO: explore the relation to IFabricServiceNotification.
pub async fn register_service_notification_filter(
&self,
desc: &ServiceNotificationFilterDescription,
timeout: Duration,
) -> crate::Result<FilterIdHandle> {
let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
let id = self
.register_service_notification_filter_internal(&raw, timeout.as_millis() as u32)
.await?;
Ok(FilterIdHandle { id })
}

/// It's not necessary to unregister individual filters if the client itself
/// will no longer be used since all ServiceNotificationFilterDescription
/// objects registered by the FabricClient will be automatically unregistered when client is disposed.
pub async fn unregister_service_notification_filter(
&self,
filter_id_handle: FilterIdHandle,
timeout: Duration,
) -> crate::Result<()> {
self.unregister_service_notification_filter_internal(
filter_id_handle.id,
timeout.as_millis() as u32,
)
.await
}
}

// Handle to the registered service notification filter
pub struct FilterIdHandle {
id: i64,
}

// see ComFabricClient.cpp for conversion details in cpp
@@ -207,7 +287,7 @@ impl PartitionKeyType {
}
}

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ServicePartitionKind {
Int64Range,
Invalid,
@@ -344,7 +424,7 @@ impl FabricListAccessor<FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEnd
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ResolvedServiceEndpoint {
pub address: HSTRING,
pub role: ServiceEndpointRole,
42 changes: 42 additions & 0 deletions crates/libs/core/src/types/client/mod.rs
Original file line number Diff line number Diff line change
@@ -5,8 +5,50 @@

// This mod contains fabric client related types
mod partition;
use mssf_com::FabricTypes::{
FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY, FABRIC_URI,
};
pub use partition::*;
mod node;
pub use node::*;
mod replica;
pub use replica::*;
use windows_core::HSTRING;

// FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS
bitflags::bitflags! {
#[derive(Debug, Clone)]
pub struct ServiceNotificationFilterFlags: i32{
const None = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE.0;
const NamePrefix = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX.0;
const PrimaryOnly = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY.0;
}
}

impl From<&ServiceNotificationFilterFlags> for FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS {
fn from(value: &ServiceNotificationFilterFlags) -> Self {
Self(value.bits())
}
}

// FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION
#[derive(Debug, Clone)]
pub struct ServiceNotificationFilterDescription {
pub name: HSTRING,
pub flags: ServiceNotificationFilterFlags,
}

impl From<&ServiceNotificationFilterDescription>
for FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION
{
fn from(value: &ServiceNotificationFilterDescription) -> Self {
Self {
Name: FABRIC_URI(value.name.as_ptr() as *mut u16),
Flags: (&value.flags).into(),
Reserved: std::ptr::null_mut(),
}
}
}
4 changes: 3 additions & 1 deletion crates/libs/core/src/types/client/partition.rs
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ impl From<&FABRIC_SERVICE_PARTITION_QUERY_RESULT_ITEM> for ServicePartition {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum ServicePartitionStatus {
Invalid,
Ready,
@@ -136,6 +136,7 @@ impl From<&FABRIC_QUERY_SERVICE_PARTITION_STATUS> for ServicePartitionStatus {
}

// FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
pub partition_information: ServicePartitionInformation,
pub target_replica_set_size: u32,
@@ -160,6 +161,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM> for StatefulServ
}
}

#[derive(Debug, Clone)]
pub struct StatelessServicePartition {
pub partition_information: ServicePartitionInformation,
pub instance_count: u32,
5 changes: 4 additions & 1 deletion crates/libs/core/src/types/client/replica.rs
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ impl FabricListAccessor<FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM> for ServiceRep
}

// FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub enum ServiceReplicaQueryResult {
Invalid,
Stateful(StatefulServiceReplicaQueryResult),
@@ -103,6 +104,7 @@ impl From<&FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM> for ServiceReplicaQueryResu
}

// FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatefulServiceReplicaQueryResult {
pub replica_id: i64,
pub replica_role: ReplicaRole,
@@ -131,7 +133,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM>
}

// FABRIC_QUERY_SERVICE_REPLICA_STATUS
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum QueryServiceReplicaStatus {
Invalid,
Inbuild,
@@ -156,6 +158,7 @@ impl From<&FABRIC_QUERY_SERVICE_REPLICA_STATUS> for QueryServiceReplicaStatus {
}

//FABRIC_STATELESS_SERVICE_INSTANCE_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatelessServiceInstanceQueryResult {
pub instance_id: i64,
pub replica_status: QueryServiceReplicaStatus,
2 changes: 1 addition & 1 deletion crates/libs/core/src/types/common/mod.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ use mssf_com::FabricTypes::{
};

// FABRIC_HEALTH_STATE
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum HealthState {
Invalid,
Ok,
8 changes: 4 additions & 4 deletions crates/libs/core/src/types/common/partition.rs
Original file line number Diff line number Diff line change
@@ -14,27 +14,27 @@ use windows_core::GUID;
use crate::strings::HSTRINGWrap;

// FABRIC_SERVICE_PARTITION_INFORMATION
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ServicePartitionInformation {
Invalid,
Singleton(SingletonPartitionInfomation),
Int64Range(Int64PartitionInfomation),
Named(NamedPartitionInfomation),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SingletonPartitionInfomation {
pub id: GUID,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Int64PartitionInfomation {
pub id: ::windows_core::GUID,
pub low_key: i64,
pub high_key: i64,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NamedPartitionInfomation {
pub id: ::windows_core::GUID,
pub name: ::windows_core::HSTRING,
Loading

0 comments on commit 4b303e2

Please sign in to comment.