diff --git a/crates/libs/core/src/client/mod.rs b/crates/libs/core/src/client/mod.rs index 7ccb6b50..471e6234 100644 --- a/crates/libs/core/src/client/mod.rs +++ b/crates/libs/core/src/client/mod.rs @@ -19,7 +19,7 @@ mod tests; // FabricClient safe wrapper // The design of FabricClient follows from the csharp client: // https://github.com/microsoft/service-fabric/blob/master/src/prod/src/managed/Api/src/System/Fabric/FabricClient.cs - +#[derive(Debug, Clone)] pub struct FabricClient { com_property_client: IFabricPropertyManagementClient2, com_service_client: IFabricServiceManagementClient6, diff --git a/crates/libs/core/src/client/svc_mgmt_client.rs b/crates/libs/core/src/client/svc_mgmt_client.rs index f29768ee..c4ae00d2 100644 --- a/crates/libs/core/src/client/svc_mgmt_client.rs +++ b/crates/libs/core/src/client/svc_mgmt_client.rs @@ -12,11 +12,12 @@ use mssf_com::{ FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE, FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION, FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION, - FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_PARTITION_KIND, - FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID, - FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON, - FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, - FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI, + FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, + FABRIC_SERVICE_PARTITION_KIND, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, + FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED, + FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SERVICE_ROLE_INVALID, + FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, + FABRIC_SERVICE_ROLE_STATELESS, FABRIC_URI, }, }; use windows_core::{HSTRING, PCWSTR}; @@ -24,7 +25,9 @@ use windows_core::{HSTRING, PCWSTR}; use crate::{ iter::{FabricIter, FabricListAccessor}, sync::{fabric_begin_end_proxy, FabricReceiver}, - types::{RemoveReplicaDescription, RestartReplicaDescription}, + types::{ + RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription, + }, }; // Service Management Client @@ -89,6 +92,40 @@ impl ServiceManagementClient { move |ctx| unsafe { com2.EndRemoveReplica(ctx) }, ) } + + fn register_service_notification_filter_internal( + &self, + desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, + timeout_milliseconds: u32, + ) -> FabricReceiver> { + let com1 = &self.com; + let com2 = self.com.clone(); + fabric_begin_end_proxy( + move |callback| unsafe { + com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback) + }, + move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) }, + ) + } + + fn unregister_service_notification_filter_internal( + &self, + filterid: i64, + timeout_milliseconds: u32, + ) -> FabricReceiver> { + let com1 = &self.com; + let com2 = self.com.clone(); + fabric_begin_end_proxy( + move |callback| unsafe { + com1.BeginUnregisterServiceNotificationFilter( + filterid, + timeout_milliseconds, + callback, + ) + }, + move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) }, + ) + } } // public implementation block @@ -141,6 +178,8 @@ impl ServiceManagementClient { /// This API gives a running replica the chance to cleanup its state and be gracefully shutdown. /// WARNING: There are no safety checks performed when this API is used. /// Incorrect use of this API can lead to data loss for stateful services. + /// Remarks: + /// For stateless services, Instance Abort is called. pub async fn remove_replica( &self, desc: &RemoveReplicaDescription, @@ -150,6 +189,47 @@ impl ServiceManagementClient { self.remove_replica_internal(&raw, timeout.as_millis() as u32) .await } + + /// Remarks: + /// There is a cache of service endpoints in the client that gets updated by notifications + /// and this same cache is used to satisfy complaint based resolution requests + /// (see resolve_service_partition())). Applications that both register for notifications + /// and use complaint based resolution on the same client instance typically only need to + /// pass null for the ResolvedServicePartition argument during resolution. + /// This will always return the endpoints in the client cache updated by the latest notification. + /// The notification mechanism itself will keep the client cache updated when service endpoints change. + /// TODO: explore the relation to IFabricServiceNotification. + pub async fn register_service_notification_filter( + &self, + desc: &ServiceNotificationFilterDescription, + timeout: Duration, + ) -> crate::Result { + let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into(); + let id = self + .register_service_notification_filter_internal(&raw, timeout.as_millis() as u32) + .await?; + Ok(FilterIdHandle { id }) + } + + /// It's not necessary to unregister individual filters if the client itself + /// will no longer be used since all ServiceNotificationFilterDescription + /// objects registered by the FabricClient will be automatically unregistered when client is disposed. + pub async fn unregister_service_notification_filter( + &self, + filter_id_handle: FilterIdHandle, + timeout: Duration, + ) -> crate::Result<()> { + self.unregister_service_notification_filter_internal( + filter_id_handle.id, + timeout.as_millis() as u32, + ) + .await + } +} + +// Handle to the registered service notification filter +pub struct FilterIdHandle { + id: i64, } // see ComFabricClient.cpp for conversion details in cpp @@ -207,7 +287,7 @@ impl PartitionKeyType { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum ServicePartitionKind { Int64Range, Invalid, @@ -344,7 +424,7 @@ impl FabricListAccessor for ResolvedServiceEnd } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ResolvedServiceEndpoint { pub address: HSTRING, pub role: ServiceEndpointRole, diff --git a/crates/libs/core/src/types/client/mod.rs b/crates/libs/core/src/types/client/mod.rs index e3cb73ac..ab3e0936 100644 --- a/crates/libs/core/src/types/client/mod.rs +++ b/crates/libs/core/src/types/client/mod.rs @@ -5,8 +5,50 @@ // This mod contains fabric client related types mod partition; +use mssf_com::FabricTypes::{ + FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS, + FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX, + FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE, + FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY, FABRIC_URI, +}; pub use partition::*; mod node; pub use node::*; mod replica; pub use replica::*; +use windows_core::HSTRING; + +// FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS +bitflags::bitflags! { + #[derive(Debug, Clone)] + pub struct ServiceNotificationFilterFlags: i32{ + const None = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE.0; + const NamePrefix = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX.0; + const PrimaryOnly = FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY.0; + } +} + +impl From<&ServiceNotificationFilterFlags> for FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS { + fn from(value: &ServiceNotificationFilterFlags) -> Self { + Self(value.bits()) + } +} + +// FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION +#[derive(Debug, Clone)] +pub struct ServiceNotificationFilterDescription { + pub name: HSTRING, + pub flags: ServiceNotificationFilterFlags, +} + +impl From<&ServiceNotificationFilterDescription> + for FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION +{ + fn from(value: &ServiceNotificationFilterDescription) -> Self { + Self { + Name: FABRIC_URI(value.name.as_ptr() as *mut u16), + Flags: (&value.flags).into(), + Reserved: std::ptr::null_mut(), + } + } +} diff --git a/crates/libs/core/src/types/client/partition.rs b/crates/libs/core/src/types/client/partition.rs index 0023fc83..a54189ef 100644 --- a/crates/libs/core/src/types/client/partition.rs +++ b/crates/libs/core/src/types/client/partition.rs @@ -111,7 +111,7 @@ impl From<&FABRIC_SERVICE_PARTITION_QUERY_RESULT_ITEM> for ServicePartition { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum ServicePartitionStatus { Invalid, Ready, @@ -136,6 +136,7 @@ impl From<&FABRIC_QUERY_SERVICE_PARTITION_STATUS> for ServicePartitionStatus { } // FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM +#[derive(Debug, Clone)] pub struct StatefulServicePartition { pub partition_information: ServicePartitionInformation, pub target_replica_set_size: u32, @@ -160,6 +161,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_PARTITION_QUERY_RESULT_ITEM> for StatefulServ } } +#[derive(Debug, Clone)] pub struct StatelessServicePartition { pub partition_information: ServicePartitionInformation, pub instance_count: u32, diff --git a/crates/libs/core/src/types/client/replica.rs b/crates/libs/core/src/types/client/replica.rs index cf1b1e5d..e8a84197 100644 --- a/crates/libs/core/src/types/client/replica.rs +++ b/crates/libs/core/src/types/client/replica.rs @@ -72,6 +72,7 @@ impl FabricListAccessor for ServiceRep } // FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM +#[derive(Debug, Clone)] pub enum ServiceReplicaQueryResult { Invalid, Stateful(StatefulServiceReplicaQueryResult), @@ -103,6 +104,7 @@ impl From<&FABRIC_SERVICE_REPLICA_QUERY_RESULT_ITEM> for ServiceReplicaQueryResu } // FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM +#[derive(Debug, Clone)] pub struct StatefulServiceReplicaQueryResult { pub replica_id: i64, pub replica_role: ReplicaRole, @@ -131,7 +133,7 @@ impl From<&FABRIC_STATEFUL_SERVICE_REPLICA_QUERY_RESULT_ITEM> } // FABRIC_QUERY_SERVICE_REPLICA_STATUS -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum QueryServiceReplicaStatus { Invalid, Inbuild, @@ -156,6 +158,7 @@ impl From<&FABRIC_QUERY_SERVICE_REPLICA_STATUS> for QueryServiceReplicaStatus { } //FABRIC_STATELESS_SERVICE_INSTANCE_QUERY_RESULT_ITEM +#[derive(Debug, Clone)] pub struct StatelessServiceInstanceQueryResult { pub instance_id: i64, pub replica_status: QueryServiceReplicaStatus, diff --git a/crates/libs/core/src/types/common/mod.rs b/crates/libs/core/src/types/common/mod.rs index c8a3b365..6d3dda30 100644 --- a/crates/libs/core/src/types/common/mod.rs +++ b/crates/libs/core/src/types/common/mod.rs @@ -17,7 +17,7 @@ use mssf_com::FabricTypes::{ }; // FABRIC_HEALTH_STATE -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum HealthState { Invalid, Ok, diff --git a/crates/libs/core/src/types/common/partition.rs b/crates/libs/core/src/types/common/partition.rs index d213d114..8b17e513 100644 --- a/crates/libs/core/src/types/common/partition.rs +++ b/crates/libs/core/src/types/common/partition.rs @@ -14,7 +14,7 @@ use windows_core::GUID; use crate::strings::HSTRINGWrap; // FABRIC_SERVICE_PARTITION_INFORMATION -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ServicePartitionInformation { Invalid, Singleton(SingletonPartitionInfomation), @@ -22,19 +22,19 @@ pub enum ServicePartitionInformation { Named(NamedPartitionInfomation), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SingletonPartitionInfomation { pub id: GUID, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Int64PartitionInfomation { pub id: ::windows_core::GUID, pub low_key: i64, pub high_key: i64, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NamedPartitionInfomation { pub id: ::windows_core::GUID, pub name: ::windows_core::HSTRING, diff --git a/crates/samples/echomain-stateful2/src/test.rs b/crates/samples/echomain-stateful2/src/test.rs index 1e346ca1..d362fa48 100644 --- a/crates/samples/echomain-stateful2/src/test.rs +++ b/crates/samples/echomain-stateful2/src/test.rs @@ -6,77 +6,267 @@ use std::time::Duration; use mssf_core::{ - client::FabricClient, + client::{ + svc_mgmt_client::{ + PartitionKeyType, ResolvedServiceEndpoint, ServiceEndpointRole, ServicePartitionKind, + }, + FabricClient, + }, + error::FabricErrorCode, types::{ - QueryServiceReplicaStatus, RestartReplicaDescription, ServicePartition, + QueryServiceReplicaStatus, ReplicaRole, RestartReplicaDescription, + ServiceNotificationFilterDescription, ServiceNotificationFilterFlags, ServicePartition, ServicePartitionInformation, ServicePartitionQueryDescription, ServicePartitionStatus, - ServiceReplicaQueryDescription, ServiceReplicaQueryResult, + ServiceReplicaQueryDescription, ServiceReplicaQueryResult, SingletonPartitionInfomation, + StatefulServicePartition, StatefulServiceReplicaQueryResult, }, GUID, HSTRING, }; +static SVC_URI: &str = "fabric:/StatefulEchoApp/StatefulEchoAppService"; + +/// Test client for the stateful service +pub struct TestClient { + fc: FabricClient, + service_uri: HSTRING, + timeout: Duration, +} + +impl TestClient { + fn new(fc: FabricClient) -> Self { + Self { + fc, + service_uri: HSTRING::from(SVC_URI), + timeout: Duration::from_secs(1), + } + } + + async fn get_partition( + &self, + ) -> mssf_core::Result<(StatefulServicePartition, SingletonPartitionInfomation)> { + let qc = self.fc.get_query_manager(); + let desc = ServicePartitionQueryDescription { + service_name: self.service_uri.clone(), + partition_id_filter: None, + }; + let list = qc.get_partition_list(&desc, self.timeout).await.unwrap(); + // there is only one partition + let p = list.iter().next().unwrap(); + let stateful = match p { + ServicePartition::Stateful(s) => s, + _ => panic!("not stateless"), + }; + let info = stateful.clone().partition_information; + let single = match info { + ServicePartitionInformation::Singleton(s) => s, + _ => panic!("not singleton"), + }; + Ok((stateful, single)) + } + + // primary replica is returned first. + async fn get_replicas( + &self, + partition_id: GUID, + ) -> mssf_core::Result<( + StatefulServiceReplicaQueryResult, + StatefulServiceReplicaQueryResult, + StatefulServiceReplicaQueryResult, + )> { + let qc = self.fc.get_query_manager(); + // test get replica info + let desc = ServiceReplicaQueryDescription { + partition_id, + replica_id_or_instance_id_filter: None, + }; + let replicas = qc + .get_replica_list(&desc, self.timeout) + .await? + .iter() + .collect::>(); + if replicas.len() < 3 { + // replica are not ready. + return Err(FabricErrorCode::OperationFailed.into()); + } + let stateful = replicas + .iter() + .map(|replica| match replica.clone() { + ServiceReplicaQueryResult::Stateful(s) => s, + _ => panic!("not stateful"), + }) + .collect::>(); + + let primary = stateful + .iter() + .find(|x| x.replica_role == ReplicaRole::Primary) + .expect("no primary found") + .clone(); + + let secondary = stateful + .iter() + .filter(|x| x.replica_role != ReplicaRole::Primary) + .collect::>(); + assert_eq!(secondary.len(), 2); + Ok((primary, secondary[0].clone(), secondary[1].clone())) + } + + // Resolve the service. The first return param is the primary. + async fn resolve( + &self, + ) -> mssf_core::Result<( + ResolvedServiceEndpoint, + ResolvedServiceEndpoint, + ResolvedServiceEndpoint, + )> { + let mgmt = self.fc.get_service_manager(); + let resolved_partition = mgmt + .resolve_service_partition( + &self.service_uri, + &PartitionKeyType::None, + None, + self.timeout, + ) + .await?; + let info = resolved_partition.get_info(); + assert_eq!(info.partition_key_type, PartitionKeyType::None); + assert_eq!(info.service_name, self.service_uri); + assert_eq!(info.service_partition_kind, ServicePartitionKind::Singleton); + let endpoints = resolved_partition + .get_endpoint_list() + .iter() + .collect::>(); + if endpoints.len() < 3 { + // not available yet. + return Err(FabricErrorCode::OperationFailed.into()); + } + let primary = endpoints + .iter() + .find(|r| r.role == ServiceEndpointRole::StatefulPrimary); + if primary.is_none() { + // primary not available yet. + return Err(FabricErrorCode::OperationFailed.into()); + } + let secondary = endpoints + .iter() + .filter(|r| r.role != ServiceEndpointRole::StatefulPrimary) + .collect::>(); + assert_eq!(secondary.len(), 2); + Ok(( + primary.unwrap().clone(), + secondary[0].clone(), + secondary[1].clone(), + )) + } +} + // Requires app to be deployed on onebox. // Uses fabric client to perform various actions for this service. #[tokio::test] async fn test_partition_info() { let fc = FabricClient::new(); - let qc = fc.get_query_manager(); - + let tc = TestClient::new(fc.clone()); let timeout = Duration::from_secs(1); - let desc = ServicePartitionQueryDescription { - service_name: HSTRING::from("fabric:/StatefulEchoApp/StatefulEchoAppService"), - partition_id_filter: None, - }; - - let list = qc.get_partition_list(&desc, timeout).await.unwrap(); - // there is only one partition - let p = list.iter().next().unwrap(); - let stateful = match p { - ServicePartition::Stateful(s) => s, - _ => panic!("not stateless"), - }; + let (stateful, single) = tc.get_partition().await.unwrap(); // TODO: not sure why state is unknown. // assert_eq!(stateful.health_state, HealthState::Ok); assert_eq!(stateful.partition_status, ServicePartitionStatus::Ready); assert_eq!(stateful.target_replica_set_size, 3); assert_eq!(stateful.min_replica_set_size, 1); - let info = stateful.partition_information; - let single = match info { - ServicePartitionInformation::Singleton(s) => s, - _ => panic!("not singleton"), - }; assert_ne!(single.id, GUID::zeroed()); // test get replica info - let desc = ServiceReplicaQueryDescription { - partition_id: single.id, - replica_id_or_instance_id_filter: None, - }; - let replicas = qc - .get_replica_list(&desc, timeout) - .await - .unwrap() - .iter() - .collect::>(); - assert_eq!(replicas.len(), 3); - let replica = &replicas[0]; - let stateful_replica = match replica { - ServiceReplicaQueryResult::Stateful(s) => s, - _ => panic!("not stateful"), + let (p, _, _) = tc.get_replicas(single.id).await.unwrap(); + assert_eq!(p.replica_status, QueryServiceReplicaStatus::Ready); + assert_ne!(p.node_name, HSTRING::new()); + + let mgmt = fc.get_service_manager(); + // register service notification filter + let filter_handle = { + let desc = ServiceNotificationFilterDescription { + name: HSTRING::from(SVC_URI), + flags: ServiceNotificationFilterFlags::NamePrefix, + }; + // register takes more than 1 sec. + mgmt.register_service_notification_filter(&desc, Duration::from_secs(10)) + .await + .unwrap() }; - assert_eq!( - stateful_replica.replica_status, - QueryServiceReplicaStatus::Ready - ); - assert_ne!(stateful_replica.node_name, HSTRING::new()); + // resolve the service + let (p_endpoint, _, _) = tc.resolve().await.unwrap(); + + // restart primary let desc = RestartReplicaDescription { - node_name: stateful_replica.node_name.clone(), + node_name: p.node_name.clone(), partition_id: single.id, - replica_or_instance_id: stateful_replica.replica_id, + replica_or_instance_id: p.replica_id, }; - let mgmt = fc.get_service_manager(); mgmt.restart_replica(&desc, timeout).await.unwrap(); + + // get replica info to see primary has changed + let mut count = 0; + loop { + let res = tc.get_replicas(single.id).await; + let p2 = match res { + Ok((p2, _, _)) => p2, + Err(_) => { + // replica not yet ready + count += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + if p2.node_name != p.node_name { + assert_ne!(p.replica_id, p2.replica_id); + println!("replica id updated after {} retries", count); + break; + } else { + // failover is not yet finished. + if count > 5 { + panic!( + "replica id not changed after retry. original {}, new {}", + p.replica_id, p2.replica_id + ); + } + // replica has not changed yet. + tokio::time::sleep(Duration::from_secs(1)).await; + } + count += 1; + } + + // resolve again the primary addr should change + { + let mut count = 0; + loop { + let res = tc.resolve().await; + let p2_endpoint = match res { + Ok((p2, _, _)) => p2, + Err(_) => { + // not yet ready + count += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + if p2_endpoint.address != p_endpoint.address { + println!("addr updated after {} retries", count); + break; + } else { + // addr update might be slow. + // This typically takes 8 seconds which includes service boot time. + if count > 30 { + panic!("addr for primary is not changed {}", p2_endpoint.address); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + count += 1; + } + } + // unregisters the notification + mgmt.unregister_service_notification_filter(filter_handle, timeout) + .await + .unwrap(); } diff --git a/crates/samples/echomain/src/test.rs b/crates/samples/echomain/src/test.rs index caed1eb7..275af7dd 100644 --- a/crates/samples/echomain/src/test.rs +++ b/crates/samples/echomain/src/test.rs @@ -6,59 +6,127 @@ use std::time::Duration; use mssf_core::{ - client::FabricClient, + client::{ + svc_mgmt_client::{ + PartitionKeyType, ResolvedServiceEndpoint, ResolvedServicePartitionInfo, + ServiceEndpointRole, ServicePartitionKind, + }, + FabricClient, + }, + error::FabricErrorCode, types::{ - QueryServiceReplicaStatus, RemoveReplicaDescription, ServicePartition, - ServicePartitionInformation, ServicePartitionQueryDescription, ServicePartitionStatus, - ServiceReplicaQueryDescription, ServiceReplicaQueryResult, + QueryServiceReplicaStatus, RemoveReplicaDescription, ServiceNotificationFilterDescription, + ServiceNotificationFilterFlags, ServicePartition, ServicePartitionInformation, + ServicePartitionQueryDescription, ServicePartitionStatus, ServiceReplicaQueryDescription, + ServiceReplicaQueryResult, SingletonPartitionInfomation, + StatelessServiceInstanceQueryResult, StatelessServicePartition, }, GUID, HSTRING, }; +static ECHO_SVC_URI: &str = "fabric:/EchoApp/EchoAppService"; + +// Test client for echo server. +pub struct EchoTestClient { + fc: FabricClient, + service_uri: HSTRING, + timeout: Duration, +} + +impl EchoTestClient { + pub fn new(fc: FabricClient) -> Self { + Self { + fc, + service_uri: HSTRING::from(ECHO_SVC_URI), + timeout: Duration::from_secs(1), + } + } + + pub async fn get_partition(&self) -> (StatelessServicePartition, SingletonPartitionInfomation) { + let qc = self.fc.get_query_manager(); + let desc = ServicePartitionQueryDescription { + service_name: self.service_uri.clone(), + partition_id_filter: None, + }; + let list = qc.get_partition_list(&desc, self.timeout).await.unwrap(); + // there is only one partition + let p = list.iter().next().unwrap(); + let stateless = match p { + ServicePartition::Stateless(s) => s, + _ => panic!("not stateless"), + }; + let info = stateless.clone().partition_information; + let single = match info { + ServicePartitionInformation::Singleton(s) => s, + _ => panic!("not singleton"), + }; + (stateless, single) + } + + pub async fn get_replica( + &self, + partition_id: GUID, + ) -> mssf_core::Result { + let qc = self.fc.get_query_manager(); + let desc = ServiceReplicaQueryDescription { + partition_id, + replica_id_or_instance_id_filter: None, + }; + let replicas = qc.get_replica_list(&desc, self.timeout).await?; + let replica_op = replicas.iter().next(); // only one replica + match replica_op { + Some(replica) => Ok(match replica { + ServiceReplicaQueryResult::Stateless(s) => s, + _ => panic!("not stateless"), + }), + // replica might be restarting + None => Err(FabricErrorCode::OperationFailed.into()), + } + } + + pub async fn resolve(&self) -> (ResolvedServicePartitionInfo, ResolvedServiceEndpoint) { + let mgmt = self.fc.get_service_manager(); + let resolved_partition = mgmt + .resolve_service_partition( + &self.service_uri, + &PartitionKeyType::None, + None, + self.timeout, + ) + .await + .expect("resolve failed"); + let info = resolved_partition.get_info(); + let endpoints = resolved_partition + .get_endpoint_list() + .iter() + .collect::>(); + // only has 1 instance + assert_eq!(endpoints.len(), 1); + (info, endpoints.first().unwrap().clone()) + } +} + // Requires app to be deployed on onebox. // Uses fabric client to perform various actions to the app. #[tokio::test] async fn test_fabric_client() { let fc = FabricClient::new(); - let qc = fc.get_query_manager(); + let ec = EchoTestClient::new(fc.clone()); - // Get partition info - let desc = ServicePartitionQueryDescription { - service_name: HSTRING::from("fabric:/EchoApp/EchoAppService"), - partition_id_filter: None, - }; let timeout = Duration::from_secs(1); + let service_uri = HSTRING::from(ECHO_SVC_URI); - let list = qc.get_partition_list(&desc, timeout).await.unwrap(); - // there is only one partition - let p = list.iter().next().unwrap(); - let stateless = match p { - ServicePartition::Stateless(s) => s, - _ => panic!("not stateless"), - }; - + // Get partition info + let (stateless, single) = ec.get_partition().await; assert_eq!(stateless.instance_count, 1); assert_eq!(stateless.partition_status, ServicePartitionStatus::Ready); // For some reason the state is unknown // assert_eq!(stateless.health_state, HealthState::Ok); - let info = stateless.partition_information; - let single = match info { - ServicePartitionInformation::Singleton(s) => s, - _ => panic!("not singleton"), - }; assert_ne!(single.id, GUID::zeroed()); // Get replica info - let desc = ServiceReplicaQueryDescription { - partition_id: single.id, - replica_id_or_instance_id_filter: None, - }; - let replicas = qc.get_replica_list(&desc, timeout).await.unwrap(); - let replica = replicas.iter().next().unwrap(); // only one replica - let stateless_replica = match replica { - ServiceReplicaQueryResult::Stateless(s) => s, - _ => panic!("not stateless"), - }; + let stateless_replica = ec.get_replica(single.id).await.unwrap(); + // TODO: health is unknown // assert_eq!(stateless.aggregated_health_state, HealthState::Ok); assert_eq!( @@ -67,14 +135,63 @@ async fn test_fabric_client() { ); assert_ne!(stateless_replica.node_name, HSTRING::new()); - // Restart the stateless instance by removing it. let mgmt = fc.get_service_manager(); - let desc = RemoveReplicaDescription { - node_name: stateless_replica.node_name, - partition_id: single.id, - replica_or_instance_id: stateless_replica.instance_id, + // register service notification filter + let filter_handle = { + let desc = ServiceNotificationFilterDescription { + name: service_uri.clone(), + flags: ServiceNotificationFilterFlags::NamePrefix, + }; + // register takes more than 1 sec. + mgmt.register_service_notification_filter(&desc, Duration::from_secs(10)) + .await + .unwrap() }; - mgmt.remove_replica(&desc, timeout) + + // try resolve the app + let (info, endpoint) = ec.resolve().await; + assert_eq!(info.partition_key_type, PartitionKeyType::None); + assert_eq!(info.service_name, service_uri); + assert_eq!(info.service_partition_kind, ServicePartitionKind::Singleton); + assert_eq!(endpoint.role, ServiceEndpointRole::Stateless); + + // Restart the stateless instance by removing it. + { + let desc = RemoveReplicaDescription { + node_name: stateless_replica.node_name, + partition_id: single.id, + replica_or_instance_id: stateless_replica.instance_id, + }; + mgmt.remove_replica(&desc, timeout) + .await + .expect("Failed to remove replica"); + } + + // replica id should be changed eventually. + let mut count = 0; + loop { + let res = ec.get_replica(single.id).await; + if res.is_err() { + continue; // replica might be down. + } + let replica2 = res.unwrap(); + if replica2.instance_id != stateless_replica.instance_id { + break; + } else { + if count > 5 { + panic!( + "replica id not changed after retry. original {}, new {}", + stateless_replica.instance_id, replica2.instance_id + ); + } + // replica has not changed yet. + tokio::time::sleep(Duration::from_secs(1)).await; + } + count += 1; + } + + // unregisters the notification + mgmt.unregister_service_notification_filter(filter_handle, timeout) .await - .expect("Failed to remove replica"); + .unwrap(); }