Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add project rate limit #290

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/indexer-proxy/doc/error-code-dictionary.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@
- `1047` - Invalid request: payg query missing project and query field.
- `1048` - Invalid project price: price signer is not controller.
- `1050` - Payg conflict: local state > remote state + price * max_conflict_number.
- `1051` - Daily limit: reach the max limit in 24h.
- `1052` - Rate limit: reach the max limit in 1min.
- `1051` - Daily limit: reach the max limit of agreement in 24h.
- `1052` - Rate limit: reach the max limit of agreement in 1s.
- `1053` - Expired: consumer host service signers is empty.
- `1054` - Expired: payg channel is missing in redis. Maybe not set or expired.
- `1055` - Invalid signature: payg channel signer not include in redis cache.
- `1056` - Overflow: payg channel query is overflow total limit.
- `1057` - Rate limit: reach the max limit of project in 1s.
- `1100` - Serialize: hex convert failure.
- `1101` - Serialize: rustc_hex convert failure.
- `1102` - Serialize: uint convert failure.
Expand Down
3 changes: 2 additions & 1 deletion apps/indexer-proxy/proxy/src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub const ACCOUNT_QUERY: &str = "query { accountMetadata { indexer encryptedKey

pub const VERSION_QUERY: &str = "query { getServicesVersion { coordinator } }";

pub const PROJECT_QUERY: &str = "query { getAliveProjects { id queryEndpoint nodeEndpoint } }";
pub const PROJECT_QUERY: &str =
"query { getAliveProjects { id rateLimit queryEndpoint nodeEndpoint } }";

pub const PAYG_QUERY: &str = "query { getAlivePaygs { id price token expiration overflow } }";

Expand Down
1 change: 1 addition & 0 deletions apps/indexer-proxy/proxy/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ async fn handle_close_agreement_query(
query,
MetricsQuery::CloseAgreement,
MetricsNetwork::P2P,
false,
)
.await
}
2 changes: 1 addition & 1 deletion apps/indexer-proxy/proxy/src/payg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ pub async fn query_state(

// query the data.
let (data, signature) =
project_query_raw(project_id, query, MetricsQuery::PAYG, network_type).await?;
project_query_raw(project_id, query, MetricsQuery::PAYG, network_type, true).await?;

state_cache.spent = local_prev + remote_next - remote_prev;
state_cache.remote = remote_next;
Expand Down
51 changes: 51 additions & 0 deletions apps/indexer-proxy/proxy/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use ethers::{
utils::keccak256,
};
use once_cell::sync::Lazy;
use redis::{AsyncCommands, RedisResult};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
Expand All @@ -39,6 +40,7 @@ use tdn::types::group::hash_to_group_id;
use tokio::sync::Mutex;

use crate::account::ACCOUNT;
use crate::cli::redis;
use crate::graphql::{poi_with_block, METADATA_QUERY};
use crate::metrics::{add_metrics_query, update_metrics_projects, MetricsNetwork, MetricsQuery};
use crate::p2p::send;
Expand All @@ -53,6 +55,7 @@ pub struct Project {
pub id: String,
pub query_endpoint: String,
pub node_endpoint: String,
pub rate_limit: Option<i64>,
pub payg_price: U256,
pub payg_token: Address,
pub payg_expiration: u64,
Expand Down Expand Up @@ -152,6 +155,8 @@ pub struct ProjectItem {
pub query_endpoint: String,
#[serde(rename = "nodeEndpoint")]
pub node_endpoint: String,
#[serde(rename = "rateLimit")]
pub project_rate_limit: Option<i64>,
#[serde(rename = "price")]
pub payg_price: String,
#[serde(rename = "token")]
Expand All @@ -166,14 +171,26 @@ pub async fn handle_projects(projects: Vec<ProjectItem>) -> Result<()> {
let mut project_ids = vec![];
let mut new_projects = vec![];
for item in projects {
let rate_limit = if let Some(n) = item.project_rate_limit {
if n > 0 {
Some(n)
} else {
None
}
} else {
None
};

let payg_price = U256::from_dec_str(&item.payg_price).unwrap_or(U256::from(0));
let payg_token: Address = item.payg_token.parse().unwrap_or(Address::zero());
let payg_overflow = item.payg_overflow.into();

project_ids.push(item.id.clone());
let project = Project {
id: item.id,
query_endpoint: item.query_endpoint,
node_endpoint: item.node_endpoint,
rate_limit,
payg_price,
payg_token,
payg_expiration: item.payg_expiration,
Expand All @@ -194,6 +211,7 @@ pub async fn project_metadata(id: &str, network: MetricsNetwork) -> Result<Value
&GraphQLQuery::query(METADATA_QUERY),
MetricsQuery::Free,
network,
true,
)
.await
}
Expand Down Expand Up @@ -274,6 +292,7 @@ pub async fn project_poi(id: &str, block: Option<u64>, network: MetricsNetwork)
&GraphQLQuery::query(&poi_with_block(last_block)),
MetricsQuery::Free,
network,
true,
)
.await
}
Expand All @@ -283,9 +302,25 @@ pub async fn project_query(
query: &GraphQLQuery,
payment: MetricsQuery,
network: MetricsNetwork,
is_limit: bool,
) -> Result<Value> {
let project = get_project(id).await?;

if let Some(limit) = project.rate_limit {
// project rate limit
let conn = redis();
let mut conn_lock = conn.lock().await;
let second = Utc::now().timestamp();
let used_key = format!("{}-rate-{}", project.id, second);

let used: i64 = conn_lock.get(&used_key).await.unwrap_or(0);
if is_limit && used + 1 > limit {
return Err(Error::RateLimit(1057));
}
let _: RedisResult<()> = conn_lock.set_ex(&used_key, used + 1, 1).await;
drop(conn_lock);
}

let now = Instant::now();
let res = graphql_request(&project.query_endpoint, query).await;
let time = now.elapsed().as_millis() as u64;
Expand All @@ -299,9 +334,25 @@ pub async fn project_query_raw(
query: &GraphQLQuery,
payment: MetricsQuery,
network: MetricsNetwork,
is_limit: bool,
) -> Result<(Vec<u8>, String)> {
let project = get_project(id).await?;

if let Some(limit) = project.rate_limit {
// project rate limit
let conn = redis();
let mut conn_lock = conn.lock().await;
let second = Utc::now().timestamp();
let used_key = format!("{}-rate-{}", project.id, second);

let used: i64 = conn_lock.get(&used_key).await.unwrap_or(0);
if is_limit && used + 1 > limit {
return Err(Error::RateLimit(1057));
}
let _: RedisResult<()> = conn_lock.set_ex(&used_key, used + 1, 1).await;
drop(conn_lock);
}

let now = Instant::now();
let res = graphql_request_raw(&project.query_endpoint, query).await;
let time = now.elapsed().as_millis() as u64;
Expand Down
1 change: 1 addition & 0 deletions apps/indexer-proxy/proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ async fn query_handler(
&query,
MetricsQuery::CloseAgreement,
MetricsNetwork::HTTP,
false,
)
.await?;

Expand Down
Loading