Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FabricClient] Support 'register service notification filter' api #60

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/libs/core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod tests;
// FabricClient safe wrapper
// The design of FabricClient follows from the csharp client:
// https://github.com/microsoft/service-fabric/blob/master/src/prod/src/managed/Api/src/System/Fabric/FabricClient.cs

#[derive(Debug, Clone)]
pub struct FabricClient {
com_property_client: IFabricPropertyManagementClient2,
com_service_client: IFabricServiceManagementClient6,
Expand Down
96 changes: 88 additions & 8 deletions crates/libs/core/src/client/svc_mgmt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ use mssf_com::{
FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_PARTITION_KIND,
FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY,
FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI,
FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
FABRIC_SERVICE_PARTITION_KIND, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SERVICE_ROLE_INVALID,
FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY,
FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI,
},
};
use windows_core::{HSTRING, PCWSTR};

use crate::{
iter::{FabricIter, FabricListAccessor},
sync::{fabric_begin_end_proxy, FabricReceiver},
types::{RemoveReplicaDescription, RestartReplicaDescription},
types::{
RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
},
};

// Service Management Client
Expand Down Expand Up @@ -89,6 +92,40 @@ impl ServiceManagementClient {
move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
)
}

fn register_service_notification_filter_internal(
&self,
desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
timeout_milliseconds: u32,
) -> FabricReceiver<crate::Result<i64>> {
let com1 = &self.com;
let com2 = self.com.clone();
fabric_begin_end_proxy(
move |callback| unsafe {
com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
},
move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
)
}

fn unregister_service_notification_filter_internal(
&self,
filterid: i64,
timeout_milliseconds: u32,
) -> FabricReceiver<crate::Result<()>> {
let com1 = &self.com;
let com2 = self.com.clone();
fabric_begin_end_proxy(
move |callback| unsafe {
com1.BeginUnregisterServiceNotificationFilter(
filterid,
timeout_milliseconds,
callback,
)
},
move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
)
}
}

// public implementation block
Expand Down Expand Up @@ -141,6 +178,8 @@ impl ServiceManagementClient {
/// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
/// WARNING: There are no safety checks performed when this API is used.
/// Incorrect use of this API can lead to data loss for stateful services.
/// Remarks:
/// For stateless services, Instance Abort is called.
pub async fn remove_replica(
&self,
desc: &RemoveReplicaDescription,
Expand All @@ -150,6 +189,47 @@ impl ServiceManagementClient {
self.remove_replica_internal(&raw, timeout.as_millis() as u32)
.await
}

/// Remarks:
/// There is a cache of service endpoints in the client that gets updated by notifications
/// and this same cache is used to satisfy complaint based resolution requests
/// (see resolve_service_partition())). Applications that both register for notifications
/// and use complaint based resolution on the same client instance typically only need to
/// pass null for the ResolvedServicePartition argument during resolution.
/// This will always return the endpoints in the client cache updated by the latest notification.
/// The notification mechanism itself will keep the client cache updated when service endpoints change.
/// TODO: explore the relation to IFabricServiceNotification.
pub async fn register_service_notification_filter(
&self,
desc: &ServiceNotificationFilterDescription,
timeout: Duration,
) -> crate::Result<FilterIdHandle> {
let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
let id = self
.register_service_notification_filter_internal(&raw, timeout.as_millis() as u32)
.await?;
Ok(FilterIdHandle { id })
}

/// It's not necessary to unregister individual filters if the client itself
/// will no longer be used since all ServiceNotificationFilterDescription
/// objects registered by the FabricClient will be automatically unregistered when client is disposed.
pub async fn unregister_service_notification_filter(
&self,
filter_id_handle: FilterIdHandle,
timeout: Duration,
) -> crate::Result<()> {
self.unregister_service_notification_filter_internal(
filter_id_handle.id,
timeout.as_millis() as u32,
)
.await
}
}

// Handle to the registered service notification filter
pub struct FilterIdHandle {
id: i64,
}

// see ComFabricClient.cpp for conversion details in cpp
Expand Down Expand Up @@ -207,7 +287,7 @@ impl PartitionKeyType {
}
}

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ServicePartitionKind {
Int64Range,
Invalid,
Expand Down Expand Up @@ -344,7 +424,7 @@ impl FabricListAccessor<FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEnd
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ResolvedServiceEndpoint {
pub address: HSTRING,
pub role: ServiceEndpointRole,
Expand Down
42 changes: 42 additions & 0 deletions crates/libs/core/src/types/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,50 @@

// This mod contains fabric client related types
mod partition;
use mssf_com::FabricTypes::{
FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE,
FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY, FABRIC_URI,
};
pub use partition::*;
mod node;
pub use node::*;
mod replica;
pub use replica::*;
use windows_core::HSTRING;

// FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS
bitflags::bitflags! {
#[derive(Debug, Clone)]
pub struct ServiceNotificationFilterFlags: i32{
const None = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE.0;
const NamePrefix = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX.0;
const PrimaryOnly = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY.0;
}
}

impl From<&ServiceNotificationFilterFlags> for FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS {
fn from(value: &ServiceNotificationFilterFlags) -> Self {
Self(value.bits())
}
}

// FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION
#[derive(Debug, Clone)]
pub struct ServiceNotificationFilterDescription {
pub name: HSTRING,
pub flags: ServiceNotificationFilterFlags,
}

impl From<&ServiceNotificationFilterDescription>
for FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION
{
fn from(value: &ServiceNotificationFilterDescription) -> Self {
Self {
Name: FABRIC_URI(value.name.as_ptr() as *mut u16),
Flags: (&value.flags).into(),
Reserved: std::ptr::null_mut(),
}
}
}
4 changes: 3 additions & 1 deletion crates/libs/core/src/types/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl From<&FABRIC_SERVICE_PARTITION_QUERY_RESULT_ITEM> for ServicePartition {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum ServicePartitionStatus {
Invalid,
Ready,
Expand All @@ -136,6 +136,7 @@ impl From<&FABRIC_QUERY_SERVICE_PARTITION_STATUS> for ServicePartitionStatus {
}

// FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
pub partition_information: ServicePartitionInformation,
pub target_replica_set_size: u32,
Expand All @@ -160,6 +161,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM> for StatefulServ
}
}

#[derive(Debug, Clone)]
pub struct StatelessServicePartition {
pub partition_information: ServicePartitionInformation,
pub instance_count: u32,
Expand Down
5 changes: 4 additions & 1 deletion crates/libs/core/src/types/client/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl FabricListAccessor<FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM> for ServiceRep
}

// FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub enum ServiceReplicaQueryResult {
Invalid,
Stateful(StatefulServiceReplicaQueryResult),
Expand Down Expand Up @@ -103,6 +104,7 @@ impl From<&FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM> for ServiceReplicaQueryResu
}

// FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatefulServiceReplicaQueryResult {
pub replica_id: i64,
pub replica_role: ReplicaRole,
Expand Down Expand Up @@ -131,7 +133,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM>
}

// FABRIC_QUERY_SERVICE_REPLICA_STATUS
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum QueryServiceReplicaStatus {
Invalid,
Inbuild,
Expand All @@ -156,6 +158,7 @@ impl From<&FABRIC_QUERY_SERVICE_REPLICA_STATUS> for QueryServiceReplicaStatus {
}

//FABRIC_STATELESS_SERVICE_INSTANCE_QUERY_RESULT_ITEM
#[derive(Debug, Clone)]
pub struct StatelessServiceInstanceQueryResult {
pub instance_id: i64,
pub replica_status: QueryServiceReplicaStatus,
Expand Down
2 changes: 1 addition & 1 deletion crates/libs/core/src/types/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use mssf_com::FabricTypes::{
};

// FABRIC_HEALTH_STATE
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum HealthState {
Invalid,
Ok,
Expand Down
8 changes: 4 additions & 4 deletions crates/libs/core/src/types/common/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ use windows_core::GUID;
use crate::strings::HSTRINGWrap;

// FABRIC_SERVICE_PARTITION_INFORMATION
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ServicePartitionInformation {
Invalid,
Singleton(SingletonPartitionInfomation),
Int64Range(Int64PartitionInfomation),
Named(NamedPartitionInfomation),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SingletonPartitionInfomation {
pub id: GUID,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Int64PartitionInfomation {
pub id: ::windows_core::GUID,
pub low_key: i64,
pub high_key: i64,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NamedPartitionInfomation {
pub id: ::windows_core::GUID,
pub name: ::windows_core::HSTRING,
Expand Down
Loading
Loading