Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add apollo datasource #117

Merged
merged 5 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions examples/datasources/apollo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#![allow(unreachable_code)]
use apollo_client::conf::{requests::WatchRequest, ApolloConfClientBuilder};
use url::Url;
use tokio::{task::JoinHandle, time::{sleep, Duration}};
use sentinel_core::{
flow,
base,
Result,
datasource::{ApolloDatasource, new_flow_rule_handler, rule_json_array_parser},
EntryBuilder
};
use std::sync::Arc;

// An example on apollo config service data source.
// Run this example by following steps:
// 1. Set up apollo
// (Quick start see https://github.com/apolloconfig/apollo-quick-start)
// 2. Run this example
// 3. Publish flow rule below at apollo-portal
// key: flow-apollo-example
// value:
// [
// {
// "id":"1",
// "resource":"task",
// "ref_resource":"",
// "calculate_strategy":"Direct",
// "control_strategy":"Reject",
// "relation_strategy":"Current",
// "threshold":1.0,
// "warm_up_period_sec":0,
// "warm_up_cold_factor":0,
// "max_queueing_time_ms":0,
// "stat_interval_ms":0,
// "low_mem_usage_threshold":0,
// "high_mem_usage_threshold":0,
// "mem_low_water_mark":0,
// "mem_high_water_mark":0
// }
// ]
// You will find that QPS number is restricted to 10 at first. But after publish the new flow rule,
// it will be restricted to 1.
#[tokio::main]
async fn main() -> Result<()> {
let handlers = basic_flow_example().await;
// println!("{:?}", sentinel_core::flow::get_rules_of_resource(&"task".to_string()));

// Create apollo client
let client =
ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
.build()?;

// Request apollo notification api, and fetch configuration when notified.
let watch_request = WatchRequest {
app_id: "SampleApp".to_string(),
namespace_names: vec![
"application.properties".into(),
"application.json".into(),
"application.yml".into(),
],
..Default::default()
};

// Sleep 3 seconds and then read the apollp
sentinel_core::utils::sleep_for_ms(3000);

let property = "flow-apollo-example";
// Create a data source and change the rule.
let h = new_flow_rule_handler(rule_json_array_parser);
let mut ds = ApolloDatasource::new(client, property.into(), watch_request, vec![h]);
ds.initialize().await?;
for h in handlers {
h.await.expect("Couldn't join on the associated thread");
}
Ok(())
}

async fn basic_flow_example() -> Vec<JoinHandle<()>> {
// Init sentienl configurations
sentinel_core::init_default().unwrap_or_else(|err| sentinel_core::logging::error!("{:?}", err));
let resource_name = String::from("task");
// Load sentinel rules
flow::load_rules(vec![Arc::new(flow::Rule {
resource: resource_name.clone(),
threshold: 10.0,
calculate_strategy: flow::CalculateStrategy::Direct,
control_strategy: flow::ControlStrategy::Reject,
..Default::default()
})]);
let mut handlers = Vec::new();
for _ in 0..20 {
let res_name = resource_name.clone();
handlers.push(tokio::spawn(async move {
loop {
let entry_builder = EntryBuilder::new(res_name.clone())
.with_traffic_type(base::TrafficType::Inbound);
if let Ok(entry) = entry_builder.build() {
// Passed, wrap the logic here.
task().await;
// Be sure the entry is exited finally.
entry.exit()
} else {
sleep(Duration::from_millis(100)).await;
}
}
}));
}
handlers
}

// todo: Cannot sentinel-macros now. It will append rules,
// which is conflicts with the dynamic datasource
async fn task() {
println!("{}: passed", sentinel_core::utils::curr_time_millis());
sleep(Duration::from_millis(100)).await;
}

9 changes: 9 additions & 0 deletions sentinel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ metric_log = ["directories", "regex"]
ds_etcdv3 = ["etcd-rs", "futures"]
ds_consul = ["consul", "base64"]
ds_k8s = ["kube", "k8s-openapi", "schemars", "futures"]
ds_apollo = ["apollo-client", "futures-util"]

[dependencies]
sentinel-macros = { version = "0.1.0", path = "../sentinel-macros", optional = true }
Expand Down Expand Up @@ -66,6 +67,8 @@ k8s-openapi = { version = "0.16.0", default-features = false, features = [
"v1_25",
], optional = true }
schemars = { version = "0.8.8", optional = true }
apollo-client = { version = "0.7.5", optional = true }
futures-util = { version = "0.3.29", optional = true }
dirs = "5.0.1"

[target.'cfg(not(target_arch="wasm32"))'.dependencies]
Expand All @@ -77,6 +80,7 @@ uuid = { version = "1.2", features = ["serde", "v4"] }
mockall = "0.11.0"
rand = "0.8.4"
tokio = { version = "1", features = ["full"] }
url = "2.5.0"

[lib]
doctest = false
Expand Down Expand Up @@ -211,3 +215,8 @@ required-features = ["full", "ds_consul"]
name = "k8s"
path = "../examples/datasources/k8s.rs"
required-features = ["full", "ds_k8s"]

[[example]]
name = "apollo"
path = "../examples/datasources/apollo.rs"
required-features = ["full", "ds_apollo"]
98 changes: 98 additions & 0 deletions sentinel-core/src/datasource/adapters/ds_apollo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use super::*;
use crate::{logging, utils::sleep_for_ms};
use apollo_client::conf::{requests::WatchRequest, ApolloConfClient};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use futures_util::{future, pin_mut, stream::StreamExt};

pub struct ApolloDatasource<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> {
ds: DataSourceBase<P, H>,
property: String,
watch_request: WatchRequest,
client: ApolloConfClient,
closed: AtomicBool,
}

impl<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> ApolloDatasource<P, H> {
pub fn new(client: ApolloConfClient, property: String, watch_request: WatchRequest,
handlers: Vec<Arc<H>>) -> Self {
let mut ds = DataSourceBase::default();
for h in handlers {
ds.add_property_handler(h);
}
ApolloDatasource {
ds,
property,
client,
watch_request,
closed: AtomicBool::new(false),
}
}

pub async fn initialize(&mut self) -> Result<()> {
self.watch().await
}

async fn watch(&mut self) -> Result<()> {
logging::info!(
"[Apollo] Apollo datasource is watching property {}",
self.property
);

let stream = self.client.watch(self.watch_request.clone())
.take_while(|_| future::ready(!self.closed.load(Ordering::SeqCst)));

pin_mut!(stream);

while let Some(response) = stream.next().await {
match response {
Ok(value) => {
// Load rules
let responses = value;
flearc marked this conversation as resolved.
Show resolved Hide resolved
// One namespace for one response
for (_, value) in responses {
let fetch_response = match value {
Ok(r) => r,
Err(e) => {
logging::error!("[Apollo] Fail to fetch response from apollo, {:?}", e);
continue;
}
};
let rule = fetch_response.configurations.get(&self.property);
Forsworns marked this conversation as resolved.
Show resolved Hide resolved
match self.ds.update(rule) {
Ok(()) => {}
Err(e) =>
logging::error!("[Apollo] Failed to update rules, {:?}", e)
}
}
},
// retry
Err(e) => {
logging::error!("[Apollo] Client yield an error, {:?}", e);
sleep_for_ms(1000);
}
}
}

Ok(())
}

pub fn close(&self) -> Result<()> {
self.closed.store(true, Ordering::SeqCst);
logging::info!(
"[Apollo] Apollo data source has been closed. Stop watch the key {:?} from apollo.",
self.property
);
Ok(())
}
}

impl<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> DataSource<P, H>
for ApolloDatasource<P, H>
{
fn get_base(&mut self) -> &mut DataSourceBase<P, H> {
&mut self.ds
}
}
4 changes: 4 additions & 0 deletions sentinel-core/src/datasource/adapters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub use ds_etcdv3::*;
pub mod ds_consul;
#[cfg(feature = "ds_consul")]
pub use ds_consul::*;
#[cfg(feature = "ds_apollo")]
pub mod ds_apollo;
#[cfg(feature = "ds_apollo")]
pub use ds_apollo::*;
cfg_k8s! {
pub mod ds_k8s;
pub use ds_k8s::*;
Expand Down
4 changes: 2 additions & 2 deletions sentinel-core/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ macro_rules! cfg_exporter {
macro_rules! cfg_datasource {
($($item:item)*) => {
$(
#[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))))]
#[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))))]
$item
)*
}
Expand Down
Loading