diff --git a/.gitignore b/.gitignore index 30cca12..48d2393 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ - +.idea # These are backup files generated by rustfmt **/*.rs.bk diff --git a/Cargo.lock b/Cargo.lock index f20b020..8d86739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -717,6 +717,7 @@ dependencies = [ "simple_logger", "tokio", "tokio-tungstenite", + "url", ] [[package]] @@ -1700,6 +1701,7 @@ dependencies = [ "idna", "matches", "percent-encoding", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 92172dd..c9ff2d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } bitcrypto = { git = "https://github.com/KomodoPlatform/atomicDEX-API", branch = "dev" } ethkey = { git = "https://github.com/artemii235/parity-ethereum.git" } serialization = { git = "https://github.com/KomodoPlatform/atomicDEX-API", branch = "dev" } +url = { version = "2.2.2", features = ["serde"] } [target.x86_64-unknown-linux-gnu.dependencies] jemallocator = "0.5.0" diff --git a/README.md b/README.md index c1403f4..f30fa1d 100644 --- a/README.md +++ b/README.md @@ -12,29 +12,31 @@ Create the configuration file for app runtime. ```json { - "port": 6150, - "pubkey_path": "/path_to_publick_key.pem", - "privkey_path": "/path_to_private_key.pem", - "redis_connection_string": "redis://localhost", - "token_expiration_time": 300, - "proxy_routes": [ - { - "inbound_route": "/dev", - "outbound_route": "http://localhost:8000", - "authorized": false, - "allowed_methods": [ - "eth_blockNumber", - "eth_gasPrice" - ] - } - ], - "rate_limiter": { - "rp_1_min": 30, - "rp_5_min": 100, - "rp_15_min": 200, - "rp_30_min": 350, - "rp_60_min": 575 - } + "port": 6150, + "pubkey_path": "/path_to_publick_key.pem", + "privkey_path": "/path_to_private_key.pem", + "redis_connection_string": "redis://localhost", + "token_expiration_time": 300, + "proxy_routes": [ + { + "inbound_route": "/dev", + "outbound_route": "http://localhost:8000", + "proxy_type": "quicknode", + "authorized": false, + "allowed_rpc_methods": [ + "eth_blockNumber", + "eth_gasPrice" + ], + "rate_limiter": null + } + ], + "rate_limiter": { + "rp_1_min": 30, + "rp_5_min": 100, + "rp_15_min": 200, + "rp_30_min": 350, + "rp_60_min": 575 + } } ``` @@ -54,17 +56,34 @@ Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_ 3) If the incoming request comes from the same network, step 4 will be by-passed. -4) Request will be handled in the middleware with: - - Status Checker: Checks if the wallet address status is blocked, allowed, or trusted and does the following: - - Blocked: Return `403 Forbidden` immediately - - Allowed: process continues with the rate limiter - - Trusted: bypass rate limiter and proof of funding - - Rate Limiter: First, verify the signed message, and if not valid, return 401 Unauthorized immediately. If valid, then calculate the request count with time interval specified in the application configuration. If the wallet address sent too many request than the expected amount, process continues with the proof of funding. If not, by-passes the proof of funding. - - Proof of Funding: Return `406 Not Acceptable` if wallet has 0 balance. Otherwise, we assume that request is valid and process continues as usual. +4) Request Handling in the Middleware: -5) Find target route by requested endpoint + **For Quicknode:** + - **Status Checker**: + - **Blocked**: Return `403 Forbidden` immediately. + - **Allowed**: Process continues with the rate limiter. + - **Trusted**: Bypass rate limiter and proof of funding. -6) Check if requested rpc call is allowed in application configuration + - **Rate Limiter**: + - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. + - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests than the expected amount, process continues with the proof of funding. If not, bypass the proof of funding. + + - **Proof of Funding**: + - Return `406 Not Acceptable` if the wallet has a 0 balance. Otherwise, assume the request is valid and process it as usual. + + **For Moralis:** + - **Status Checker**: + - **Blocked**: Return `403 Forbidden` immediately. + - **Allowed**: Process continues with the rate limiter. + - **Trusted**: Bypass the rate limiter. + + - **Rate Limiter**: + - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. + - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests, return an error `406 Not Acceptable` indicating that the wallet address must wait for some time before making more requests. + +5) Find target route by requested endpoint. + +6) Check if requested rpc call is allowed in application configuration. 7) Generate JWT token with RSA algorithm using pub-priv keys specified in the application configuration, and insert the token to the request header. @@ -97,3 +116,27 @@ curl -v --url "'$mm2_address'" -s --data '{ "id": 0 }' ``` + +### How to run KomodoDefi-Proxy Service with Docker Compose + +If you want to test features locally, you can run Docker containers using Docker Compose commands. + +1. **Update Configuration**: + In the `.config_test` file, update the `proxy_routes` field by adding `ProxyRoutes` with the necessary parameters. + +2. **Run Containers in Detached Mode**: + To start the containers, run the following command. This will build the images if they are not already built or if changes are detected in the Dockerfile or the build context. + ```sh + docker compose up -d + ``` + +3. **Follow the Logs**: + Open a new terminal window or tab and execute this command to follow the logs of all services defined in the Docker Compose file. The `-f` (or `--follow`) option ensures that new log entries are continuously displayed as they are produced, while the `-t` (or `--timestamps`) option adds timestamps to each log entry. + ```sh + docker compose logs -f -t + ``` + +4. **Stop the Containers**: + ```sh + docker compose down + ``` \ No newline at end of file diff --git a/assets/.config_test b/assets/.config_test index 95e9892..c8b140f 100644 --- a/assets/.config_test +++ b/assets/.config_test @@ -1,28 +1,74 @@ { - "port": 5000, - "redis_connection_string": "dummy-value", - "pubkey_path": "dummy-value", - "privkey_path": "dummy-value", - "token_expiration_time": 300, - "proxy_routes": [ - { - "inbound_route": "/test", - "outbound_route": "https://komodoplatform.com", - "authorized": false, - "allowed_methods": [] - }, - { - "inbound_route": "/test-2", - "outbound_route": "https://atomicdex.io", - "authorized": false, - "allowed_methods": [] - } - ], - "rate_limiter": { - "rp_1_min": 555, - "rp_5_min": 555, - "rp_15_min": 555, - "rp_30_min": 555, - "rp_60_min": 555 - } + "port": 6150, + "redis_connection_string": "redis://redis:6379", + "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", + "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", + "token_expiration_time": 300, + "proxy_routes": [ + { + "inbound_route": "/test", + "outbound_route": "https://komodoplatform.com", + "proxy_type": "quicknode", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": null + }, + { + "inbound_route": "/test-2", + "outbound_route": "https://atomicdex.io", + "proxy_type": "quicknode", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": null + }, + { + "inbound_route": "/nft-test", + "outbound_route": "https://nft.proxy", + "proxy_type": "moralis", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } + }, + { + "inbound_route": "/nft-test/special", + "outbound_route": "https://nft.special", + "proxy_type": "moralis", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } + }, + { + "inbound_route": "/", + "outbound_route": "https://adex.io", + "proxy_type": "moralis", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } + } + ], + "rate_limiter": { + "rp_1_min": 555, + "rp_5_min": 555, + "rp_15_min": 555, + "rp_30_min": 555, + "rp_60_min": 555 + } } \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..588b481 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3.8' +services: + redis: + image: redis:7.2.4-alpine3.19 + restart: always + ports: + - "6379:6379" + + proxy: + build: + context: ./ + dockerfile: Containerfile + ports: + - "6150:6150" + depends_on: + - redis + environment: + AUTH_APP_CONFIG_PATH: /usr/src/komodo-defi-proxy/assets/.config_test + volumes: + - ./assets:/usr/src/komodo-defi-proxy/assets diff --git a/src/ctx.rs b/src/ctx.rs index 316666a..7bece5f 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -1,11 +1,13 @@ -use std::env; - +use hyper::Uri; use once_cell::sync::OnceCell; +use proxy::ProxyType; use serde::{Deserialize, Serialize}; +use std::env; -use super::*; +pub(crate) use super::*; const DEFAULT_TOKEN_EXPIRATION_TIME: i64 = 3600; +pub(crate) const DEFAULT_PORT: u16 = 5000; static CONFIG: OnceCell = OnceCell::new(); pub(crate) fn get_app_config() -> &'static AppConfig { @@ -14,27 +16,49 @@ pub(crate) fn get_app_config() -> &'static AppConfig { }) } +/// Configuration settings for the application, loaded typically from a JSON configuration file. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct AppConfig { + /// Optional server port to listen on. If None in config file, then [DEFAULT_PORT] will be used. pub(crate) port: Option, + /// Redis database connection string. pub(crate) redis_connection_string: String, + /// File path to the public key used for user verification and authentication. pub(crate) pubkey_path: String, + /// File path to the private key used for user verification and authentication. pub(crate) privkey_path: String, + /// Optional token expiration time in seconds. + /// If None then the [DEFAULT_TOKEN_EXPIRATION_TIME] will be used. pub(crate) token_expiration_time: Option, + /// List of proxy routes. pub(crate) proxy_routes: Vec, + /// The default rate limiting rules for maintaining the frequency of incoming traffic for per client. pub(crate) rate_limiter: RateLimiter, } +/// Defines a routing rule for proxying requests from an inbound route to an outbound URL +/// based on a specified proxy type and additional authorization and method filtering criteria. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct ProxyRoute { + /// The incoming route pattern. pub(crate) inbound_route: String, + /// The target URL to which requests are forwarded. pub(crate) outbound_route: String, + /// The type of proxying to perform, directing requests to the appropriate service or API. + pub(crate) proxy_type: ProxyType, + /// Whether authorization is required for this route. #[serde(default)] pub(crate) authorized: bool, + /// Specific RPC methods allowed for this route. #[serde(default)] - pub(crate) allowed_methods: Vec, + pub(crate) allowed_rpc_methods: Vec, + /// Optional custom rate limiter configuration for this route. If provided, + /// this configuration will be used instead of the default rate limiting settings. + pub(crate) rate_limiter: Option, } +/// Configuration for rate limiting to manage the number of requests allowed over specified time intervals. +/// This prevents abuse and ensures fair usage of resources among all clients. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct RateLimiter { pub(crate) rp_1_min: u16, @@ -57,42 +81,96 @@ impl AppConfig { .unwrap_or(DEFAULT_TOKEN_EXPIRATION_TIME) } - pub(crate) fn get_proxy_route_by_inbound(&self, inbound: String) -> Option<&ProxyRoute> { + pub(crate) fn get_proxy_route_by_inbound(&self, inbound: &str) -> Option<&ProxyRoute> { let route_index = self.proxy_routes.iter().position(|r| { - r.inbound_route == inbound || r.inbound_route.to_owned() + "/" == inbound + r.inbound_route == inbound + || r.inbound_route == "/".to_owned() + inbound + || r.inbound_route.to_owned() + "/" == inbound + || "/".to_owned() + &*r.inbound_route == "/".to_owned() + inbound }); - if let Some(index) = route_index { - return Some(&self.proxy_routes[index]); - } + route_index.map(|index| &self.proxy_routes[index]) + } - None + #[inline(always)] + /// Finds the best matching proxy route based on the provided URI's. + pub(crate) fn get_proxy_route_by_uri(&self, uri: &mut Uri) -> Option<&ProxyRoute> { + self.proxy_routes + .iter() + .filter(|proxy_route| uri.path().starts_with(&proxy_route.inbound_route)) + .max_by_key(|proxy_route| proxy_route.inbound_route.len()) } } #[cfg(test)] pub(crate) fn get_app_config_test_instance() -> AppConfig { AppConfig { - port: Some(5000), - redis_connection_string: String::from("dummy-value"), - pubkey_path: String::from("dummy-value"), - privkey_path: String::from("dummy-value"), + port: Some(6150), + redis_connection_string: String::from("redis://redis:6379"), + pubkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.pubkey_test"), + privkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.privkey_test"), token_expiration_time: Some(300), proxy_routes: Vec::from([ ProxyRoute { inbound_route: String::from("/test"), outbound_route: String::from("https://komodoplatform.com"), + proxy_type: ProxyType::Quicknode, authorized: false, - allowed_methods: Vec::default(), + allowed_rpc_methods: Vec::default(), + rate_limiter: None, }, ProxyRoute { inbound_route: String::from("/test-2"), outbound_route: String::from("https://atomicdex.io"), + proxy_type: ProxyType::Quicknode, + authorized: false, + allowed_rpc_methods: Vec::default(), + rate_limiter: None, + }, + ProxyRoute { + inbound_route: String::from("/nft-test"), + outbound_route: String::from("https://nft.proxy"), + proxy_type: ProxyType::Moralis, + authorized: false, + allowed_rpc_methods: Vec::default(), + rate_limiter: Some(RateLimiter { + rp_1_min: 60, + rp_5_min: 200, + rp_15_min: 700, + rp_30_min: 1000, + rp_60_min: 2000, + }), + }, + ProxyRoute { + inbound_route: String::from("/nft-test/special"), + outbound_route: String::from("https://nft.special"), + proxy_type: ProxyType::Moralis, authorized: false, - allowed_methods: Vec::default(), + allowed_rpc_methods: Vec::default(), + rate_limiter: Some(RateLimiter { + rp_1_min: 60, + rp_5_min: 200, + rp_15_min: 700, + rp_30_min: 1000, + rp_60_min: 2000, + }), + }, + ProxyRoute { + inbound_route: String::from("/"), + outbound_route: String::from("https://adex.io"), + proxy_type: ProxyType::Moralis, + authorized: false, + allowed_rpc_methods: Vec::default(), + rate_limiter: Some(RateLimiter { + rp_1_min: 60, + rp_5_min: 200, + rp_15_min: 700, + rp_30_min: 1000, + rp_60_min: 2000, + }), }, ]), - rate_limiter: ctx::RateLimiter { + rate_limiter: RateLimiter { rp_1_min: 555, rp_5_min: 555, rp_15_min: 555, @@ -105,23 +183,69 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { #[test] fn test_app_config_serialzation_and_deserialization() { let json_config = serde_json::json!({ - "port": 5000, - "redis_connection_string": "dummy-value", - "pubkey_path": "dummy-value", - "privkey_path": "dummy-value", + "port": 6150, + "redis_connection_string": "redis://redis:6379", + "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", + "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, "proxy_routes": [ { "inbound_route": "/test", "outbound_route": "https://komodoplatform.com", + "proxy_type":"quicknode", "authorized": false, - "allowed_methods": [] + "allowed_rpc_methods": [], + "rate_limiter": null }, { "inbound_route": "/test-2", "outbound_route": "https://atomicdex.io", + "proxy_type":"quicknode", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": null + }, + { + "inbound_route": "/nft-test", + "outbound_route": "https://nft.proxy", + "proxy_type":"moralis", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } + }, + { + "inbound_route": "/nft-test/special", + "outbound_route": "https://nft.special", + "proxy_type":"moralis", + "authorized": false, + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } + }, + { + "inbound_route": "/", + "outbound_route": "https://adex.io", + "proxy_type":"moralis", "authorized": false, - "allowed_methods": [] + "allowed_rpc_methods": [], + "rate_limiter": { + "rp_1_min": 60, + "rp_5_min": 200, + "rp_15_min": 700, + "rp_30_min": 1000, + "rp_60_min": 2000 + } } ], "rate_limiter": { diff --git a/src/db.rs b/src/db.rs index 6660ee8..1146b6e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -36,7 +36,6 @@ impl Db { } } - #[allow(dead_code)] pub(crate) async fn key_exists(&mut self, key: &str) -> GenericResult { Ok(redis::cmd("EXISTS") .arg(key) @@ -54,7 +53,7 @@ impl Db { .arg(key) .arg(seconds) .arg(value) - .query_async(&mut self.connection) + .query_async::<_, ()>(&mut self.connection) .await?; Ok(()) diff --git a/src/main.rs b/src/main.rs index d4528eb..2db1909 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,7 @@ mod db; mod http; #[path = "security/jwt.rs"] mod jwt; -#[path = "security/proof_of_funding.rs"] -mod proof_of_funding; +mod proxy; #[path = "security/rate_limiter.rs"] mod rate_limiter; #[path = "net/rpc.rs"] @@ -27,7 +26,12 @@ mod websocket; #[global_allocator] static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; +/// A type alias for a generic error, encompassing any error that implements the `Error` trait, +/// along with traits for thread-safe error handling (`Send` and `Sync`). +/// This type is typically used across the application to handle errors uniformly. type GenericError = Box; +/// A type alias for a generic result, used throughout the application to encapsulate the +/// outcome of operations that might fail with a `GenericError`. type GenericResult = std::result::Result; #[tokio::main] diff --git a/src/net/http.rs b/src/net/http.rs index c71e898..41a5c31 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -1,21 +1,20 @@ -use std::net::SocketAddr; - +use super::*; +use crate::proxy::{generate_payload_from_req, proxy, validation_middleware}; +use crate::server::is_private_ip; use address_status::{get_address_status_list, post_address_status}; -use ctx::{AppConfig, ProxyRoute}; -use hyper::header::HeaderName; +use ctx::AppConfig; use hyper::{ header::{self, HeaderValue}, Body, HeaderMap, Method, Request, Response, StatusCode, }; -use hyper_tls::HttpsConnector; use jwt::{get_cached_token_or_generate_one, JwtClaims}; -use serde::{Deserialize, Serialize}; use serde_json::json; -use sign::SignedMessage; - -use super::*; -use crate::server::{is_private_ip, validation_middleware}; +use std::net::SocketAddr; +/// Header value for `hyper::header::CONTENT_TYPE` +pub(crate) const APPLICATION_JSON: &str = "application/json"; +/// Represents `X-Forwarded-For` Header key +pub(crate) const X_FORWARDED_FOR: &str = "x-forwarded-for"; async fn get_healthcheck() -> GenericResult> { let json = json!({ "health": "ok", @@ -26,7 +25,7 @@ async fn get_healthcheck() -> GenericResult> { .header("Access-Control-Allow-Origin", "*") .header("Access-Control-Allow-Headers", "*") .header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") - .header(header::CONTENT_TYPE, "application/json") + .header(header::CONTENT_TYPE, APPLICATION_JSON) .body(Body::from(json.to_string()))?) } @@ -62,132 +61,9 @@ pub(crate) async fn insert_jwt_to_http_header( Ok(()) } -async fn parse_payload(req: Request) -> GenericResult<(Request, RpcPayload)> { - let (parts, body) = req.into_parts(); - let body_bytes = hyper::body::to_bytes(body).await?; - - let payload: RpcPayload = serde_json::from_slice(&body_bytes)?; - - Ok((Request::from_parts(parts, Body::from(body_bytes)), payload)) -} - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub(crate) struct RpcPayload { - pub(crate) method: String, - pub(crate) params: serde_json::value::Value, - pub(crate) id: usize, - pub(crate) jsonrpc: String, - pub(crate) signed_message: SignedMessage, -} - -async fn proxy( - cfg: &AppConfig, - mut req: Request, - remote_addr: &SocketAddr, - payload: RpcPayload, - x_forwarded_for: HeaderValue, - proxy_route: &ProxyRoute, -) -> GenericResult> { - // check if requested method allowed - if !proxy_route.allowed_methods.contains(&payload.method) { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req.uri(), - "Method {} not allowed for, returning 403.", - payload.method - ) - ); - return response_by_status(StatusCode::FORBIDDEN); - } - - if proxy_route.authorized { - // modify outgoing request - if insert_jwt_to_http_header(cfg, req.headers_mut()) - .await - .is_err() - { - log::error!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req.uri(), - "Error inserting JWT into http header, returning 500." - ) - ); - return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); - } - } - - let original_req_uri = req.uri().clone(); - *req.uri_mut() = match proxy_route.outbound_route.parse() { - Ok(uri) => uri, - Err(_) => { - log::error!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - original_req_uri, - "Error type casting value of {} into Uri, returning 500.", - proxy_route.outbound_route - ) - ); - return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); - } - }; - - // drop hop headers - for key in &[ - header::ACCEPT_ENCODING, - header::CONNECTION, - header::HOST, - header::PROXY_AUTHENTICATE, - header::PROXY_AUTHORIZATION, - header::TE, - header::TRANSFER_ENCODING, - header::TRAILER, - header::UPGRADE, - header::HeaderName::from_static("keep-alive"), - ] { - req.headers_mut().remove(key); - } - - req.headers_mut() - .insert(HeaderName::from_static("x-forwarded-for"), x_forwarded_for); - req.headers_mut() - .insert(header::CONTENT_TYPE, "application/json".parse()?); - - let https = HttpsConnector::new(); - let client = hyper::Client::builder().build(https); - - let target_uri = req.uri().clone(); - let res = match client.request(req).await { - Ok(t) => t, - Err(_) => { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - original_req_uri, - "Couldn't reach {}, returning 503.", - target_uri - ) - ); - return response_by_status(StatusCode::SERVICE_UNAVAILABLE); - } - }; - - Ok(res) -} - pub(crate) async fn http_handler( cfg: &AppConfig, - req: Request, + mut req: Request, remote_addr: SocketAddr, ) -> GenericResult> { let req_uri = req.uri().clone(); @@ -217,16 +93,50 @@ pub(crate) async fn http_handler( return handle_preflight(); } - let (req, payload) = match parse_payload(req).await { + let proxy_route = match req.method() { + &Method::GET => match cfg.get_proxy_route_by_uri(req.uri_mut()) { + Some(proxy_route) => proxy_route, + None => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req_uri, + "Proxy route not found for GET request, returning 404." + ) + ); + return response_by_status(StatusCode::NOT_FOUND); + } + }, + _ => match cfg.get_proxy_route_by_inbound(req.uri().path()) { + Some(proxy_route) => proxy_route, + None => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + String::from("-"), + req_uri, + "Proxy route not found for non-GET request, returning 404." + ) + ); + return response_by_status(StatusCode::NOT_FOUND); + } + }, + }; + + let (req, payload) = match generate_payload_from_req(req, &proxy_route.proxy_type).await { Ok(t) => t, - Err(_) => { + Err(e) => { log::warn!( "{}", log_format!( remote_addr.ip(), String::from("-"), req_uri, - "Recieved invalid http payload, returning 401." + "Received invalid http payload: {}, returning 401.", + e ) ); return response_by_status(StatusCode::UNAUTHORIZED); @@ -237,9 +147,9 @@ pub(crate) async fn http_handler( "{}", log_format!( remote_addr.ip(), - payload.signed_message.address, + payload.signed_message().address, req_uri, - "Request received." + "Request and payload data received." ) ); @@ -250,7 +160,7 @@ pub(crate) async fn http_handler( "{}", log_format!( remote_addr.ip(), - payload.signed_message.address, + payload.signed_message().address, req_uri, "Error type casting of IpAddr into HeaderValue, returning 500." ) @@ -259,22 +169,6 @@ pub(crate) async fn http_handler( } }; - let proxy_route = match cfg.get_proxy_route_by_inbound(req.uri().to_string()) { - Some(proxy_route) => proxy_route, - None => { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req.uri(), - "Proxy route not found, returning 404." - ) - ); - return response_by_status(StatusCode::NOT_FOUND); - } - }; - if is_private_ip { return proxy( cfg, @@ -305,58 +199,47 @@ pub(crate) async fn http_handler( } #[test] -fn test_rpc_payload_serialzation_and_deserialization() { - let json_payload = json!({ - "method": "dummy-value", - "params": [], - "id": 1, - "jsonrpc": "2.0", - "signed_message": { - "coin_ticker": "ETH", - "address": "dummy-value", - "timestamp_message": 1655319963, - "signature": "dummy-value", - } - }); +fn test_get_proxy_route_by_inbound() { + use hyper::Uri; + use std::str::FromStr; - let actual_payload: RpcPayload = serde_json::from_str(&json_payload.to_string()).unwrap(); - - let expected_payload = RpcPayload { - method: String::from("dummy-value"), - params: json!([]), - id: 1, - jsonrpc: String::from("2.0"), - signed_message: SignedMessage { - coin_ticker: String::from("ETH"), - address: String::from("dummy-value"), - timestamp_message: 1655319963, - signature: String::from("dummy-value"), - }, - }; + let cfg = ctx::get_app_config_test_instance(); + + let proxy_route = cfg.get_proxy_route_by_inbound("/test").unwrap(); + + assert_eq!(proxy_route.outbound_route, "https://komodoplatform.com"); + + let proxy_route = cfg.get_proxy_route_by_inbound("/test-2").unwrap(); - assert_eq!(actual_payload, expected_payload); + assert_eq!(proxy_route.outbound_route, "https://atomicdex.io"); - // Backwards - let json = serde_json::to_value(expected_payload).unwrap(); - assert_eq!(json_payload, json); - assert_eq!(json_payload.to_string(), json.to_string()); + let url = Uri::from_str("https://komodo.proxy:5535/nft-test").unwrap(); + let path = url.path().to_string(); + let proxy_route = cfg.get_proxy_route_by_inbound(&path).unwrap(); + assert_eq!(proxy_route.outbound_route, "https://nft.proxy"); } #[test] -fn test_get_proxy_route_by_inbound() { - let cfg = ctx::get_app_config_test_instance(); +fn test_get_proxy_route_by_uri_inbound() { + use hyper::Uri; + use std::str::FromStr; - let proxy_route = cfg - .get_proxy_route_by_inbound(String::from("/test")) - .unwrap(); + let cfg = ctx::get_app_config_test_instance(); - assert_eq!(proxy_route.outbound_route, "https://komodoplatform.com"); + // test "/nft-test" inbound case + let mut url = Uri::from_str("https://komodo.proxy:5535/nft-test/nft/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC").unwrap(); + let proxy_route = cfg.get_proxy_route_by_uri(&mut url).unwrap(); + assert_eq!(proxy_route.outbound_route, "https://nft.proxy"); - let proxy_route = cfg - .get_proxy_route_by_inbound(String::from("/test-2")) - .unwrap(); + // test "/nft-test/special" inbound case + let mut url = Uri::from_str("https://komodo.proxy:3333/nft-test/special/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC").unwrap(); + let proxy_route = cfg.get_proxy_route_by_uri(&mut url).unwrap(); + assert_eq!(proxy_route.outbound_route, "https://nft.special"); - assert_eq!(proxy_route.outbound_route, "https://atomicdex.io"); + // test "/" inbound case + let mut url = Uri::from_str("https://komodo.proxy:0333/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC").unwrap(); + let proxy_route = cfg.get_proxy_route_by_uri(&mut url).unwrap(); + assert_eq!(proxy_route.outbound_route, "https://adex.io"); } #[test] @@ -374,45 +257,3 @@ fn test_respond_by_status() { assert_eq!(res.status(), status_type); } } - -#[tokio::test] -async fn test_parse_payload() { - let serialized_payload = json!({ - "method": "dummy-value", - "params": [], - "id": 1, - "jsonrpc": "2.0", - "signed_message": { - "coin_ticker": "ETH", - "address": "dummy-value", - "timestamp_message": 1655319963, - "signature": "dummy-value", - } - }) - .to_string(); - - let mut req = Request::new(Body::from(serialized_payload)); - req.headers_mut().insert( - HeaderName::from_static("dummy-header"), - "dummy-value".parse().unwrap(), - ); - - let (req, payload) = parse_payload(req).await.unwrap(); - let header_value = req.headers().get("dummy-header").unwrap(); - - let expected_payload = RpcPayload { - method: String::from("dummy-value"), - params: json!([]), - id: 1, - jsonrpc: String::from("2.0"), - signed_message: SignedMessage { - coin_ticker: String::from("ETH"), - address: String::from("dummy-value"), - timestamp_message: 1655319963, - signature: String::from("dummy-value"), - }, - }; - - assert_eq!(payload, expected_payload); - assert_eq!(header_value, "dummy-value"); -} diff --git a/src/net/rpc.rs b/src/net/rpc.rs index 932098b..3256311 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -1,6 +1,6 @@ use bytes::Buf; use ctx::AppConfig; -use http::insert_jwt_to_http_header; +use http::{insert_jwt_to_http_header, APPLICATION_JSON}; use hyper::{body::aggregate, header, Body, Request}; use hyper_tls::HttpsConnector; use serde_json::from_reader; @@ -27,7 +27,7 @@ impl RpcClient { ) -> GenericResult { let mut req = Request::post(&self.url).body(Body::from(payload.to_string()))?; req.headers_mut() - .append(header::CONTENT_TYPE, "application/json".parse()?); + .append(header::CONTENT_TYPE, APPLICATION_JSON.parse()?); if is_authorized { insert_jwt_to_http_header(cfg, req.headers_mut()).await?; diff --git a/src/net/server.rs b/src/net/server.rs index 863a802..790a3ff 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,22 +3,18 @@ use std::str::FromStr; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server, StatusCode, Uri}; +use hyper::{Body, Request, Response, Server, StatusCode}; -use crate::address_status::AddressStatusOperations; -use crate::ctx::ProxyRoute; -use crate::db::Db; -use crate::http::{http_handler, response_by_status, RpcPayload}; +use super::{GenericError, GenericResult}; +use crate::ctx::{AppConfig, DEFAULT_PORT}; +use crate::http::{http_handler, response_by_status, X_FORWARDED_FOR}; use crate::log_format; -use crate::proof_of_funding::{verify_message_and_balance, ProofOfFundingError}; -use crate::rate_limiter::RateLimitOperations; use crate::websocket::{should_upgrade_to_socket_conn, socket_handler}; -use crate::{ctx::AppConfig, GenericError, GenericResult}; #[macro_export] macro_rules! log_format { ($ip: expr, $address: expr, $path: expr, $format: expr, $($args: tt)+) => {format!(concat!("[Ip: {} | Address: {} | Path: {}] ", $format), $ip, $address, $path, $($args)+)}; - ($ip: expr, $address: expr, $path: expr, $format: expr) => {format!(concat!("[Ip: {} | Pubkey: {} | Address: {}] ", $format), $ip, $address, $path)} + ($ip: expr, $address: expr, $path: expr, $format: expr) => {format!(concat!("[Ip: {} | Address: {} | Path: {}] ", $format), $ip, $address, $path)} } pub(crate) fn is_private_ip(ip: &IpAddr) -> bool { @@ -30,7 +26,7 @@ pub(crate) fn is_private_ip(ip: &IpAddr) -> bool { } fn get_real_address(req: &Request, remote_addr: &SocketAddr) -> GenericResult { - if let Some(ip) = req.headers().get("x-forwarded-for") { + if let Some(ip) = req.headers().get(X_FORWARDED_FOR) { let addr = IpAddr::from_str(ip.to_str()?)?; return Ok(SocketAddr::new(addr, remote_addr.port())); @@ -39,6 +35,12 @@ fn get_real_address(req: &Request, remote_addr: &SocketAddr) -> GenericRes Ok(*remote_addr) } +/// Handles incoming HTTP requests based on their content and whether they need to be upgraded +/// to a socket connection. +/// +/// This function first resolves the real client address from the request, considering forwarded headers. +/// It then decides whether to handle the request as a regular HTTP request or upgrade it to a +/// socket-based connection based on its headers and content. async fn connection_handler( cfg: &AppConfig, req: Request, @@ -67,112 +69,10 @@ async fn connection_handler( } } -pub(crate) async fn validation_middleware( - cfg: &AppConfig, - payload: &RpcPayload, - proxy_route: &ProxyRoute, - req_uri: &Uri, - remote_addr: &SocketAddr, -) -> Result<(), StatusCode> { - let mut db = Db::create_instance(cfg).await; - - match db - .read_address_status(&payload.signed_message.address) - .await - { - crate::address_status::AddressStatus::Trusted => Ok(()), - crate::address_status::AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), - crate::address_status::AddressStatus::None => { - let signed_message_status = verify_message_and_balance(cfg, payload, proxy_route).await; - - if let Err(ProofOfFundingError::InvalidSignedMessage) = signed_message_status { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req_uri, - "Request has invalid signed message, returning 401" - ) - ); - - return Err(StatusCode::UNAUTHORIZED); - }; - - let rate_limiter_key = format!( - "{}:{}", - payload.signed_message.coin_ticker, payload.signed_message.address - ); - - match db.rate_exceeded(&rate_limiter_key, &cfg.rate_limiter).await { - Ok(false) => {} - _ => { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req_uri, - "Rate exceed for {}, checking balance for {} address.", - rate_limiter_key, - payload.signed_message.address - ) - ); - - match verify_message_and_balance(cfg, payload, proxy_route).await { - Ok(_) => {} - Err(ProofOfFundingError::InsufficientBalance) => { - log::warn!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req_uri, - "Wallet {} has insufficient balance for coin {}, returning 406.", - payload.signed_message.coin_ticker, - payload.signed_message.address - ) - ); - - return Err(StatusCode::NOT_ACCEPTABLE); - } - e => { - log::error!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req_uri, - "verify_message_and_balance failed in coin {}: {:?}", - payload.signed_message.coin_ticker, - e - ) - ); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - } - } - }; - - if db.rate_address(rate_limiter_key).await.is_err() { - log::error!( - "{}", - log_format!( - remote_addr.ip(), - payload.signed_message.address, - req_uri, - "Rate incrementing failed." - ) - ); - }; - - Ok(()) - } - } -} - +/// Starts serving the proxy API on the configured port. This function sets up the HTTP server, +/// binds it to the specified address, and listens for incoming requests. pub(crate) async fn serve(cfg: &'static AppConfig) -> GenericResult<()> { - let addr = format!("0.0.0.0:{}", cfg.port.unwrap_or(5000)).parse()?; + let addr = format!("0.0.0.0:{}", cfg.port.unwrap_or(DEFAULT_PORT)).parse()?; let handler = make_service_fn(move |c_stream: &AddrStream| { let remote_addr = c_stream.remote_addr(); @@ -185,7 +85,7 @@ pub(crate) async fn serve(cfg: &'static AppConfig) -> GenericResult<()> { let server = Server::bind(&addr).serve(handler); - log::info!("AtomicDEX Auth API serving on http://{}", addr); + log::info!("Komodo-DeFi-Proxy API serving on http://{}", addr); Ok(server.await?) } @@ -201,7 +101,7 @@ fn test_get_real_address() { assert_eq!("127.0.0.1", remote_addr.ip().to_string()); req.headers_mut().insert( - hyper::header::HeaderName::from_static("x-forwarded-for"), + hyper::header::HeaderName::from_static(X_FORWARDED_FOR), "0.0.0.0".parse().unwrap(), ); diff --git a/src/net/websocket.rs b/src/net/websocket.rs index b5ec7d9..2b22150 100644 --- a/src/net/websocket.rs +++ b/src/net/websocket.rs @@ -10,9 +10,9 @@ use tokio_tungstenite::{ use crate::{ ctx::AppConfig, - http::{response_by_status, RpcPayload}, + http::response_by_status, log_format, - server::validation_middleware, + proxy::{validation_middleware_quicknode, QuicknodeSocketPayload}, GenericResult, }; @@ -26,8 +26,8 @@ pub(crate) async fn socket_handler( mut req: Request, remote_addr: SocketAddr, ) -> GenericResult> { - let inbound_route = req.uri().to_string(); - let proxy_route = match cfg.get_proxy_route_by_inbound(inbound_route) { + let inbound_route = req.uri().path().to_string(); + let proxy_route = match cfg.get_proxy_route_by_inbound(&inbound_route) { Some(proxy_route) => proxy_route.clone(), None => { log::warn!( @@ -137,7 +137,7 @@ pub(crate) async fn socket_handler( match msg { Some(Ok(msg)) => { if let Message::Text(msg) = msg { - let payload: RpcPayload = match serde_json::from_str(&msg) { + let socket_payload: QuicknodeSocketPayload = match serde_json::from_str(&msg) { Ok(t) => t, Err(e) => { if let Err(e) = inbound_socket.send(format!("Invalid payload. {e}").into()).await { @@ -155,9 +155,9 @@ pub(crate) async fn socket_handler( continue; }, }; + let (payload, signed_message) = socket_payload.into_parts(); - - if !proxy_route.allowed_methods.contains(&payload.method) { + if !proxy_route.allowed_rpc_methods.contains(&payload.method) { if let Err(e) = inbound_socket.send("Method not allowed.".into()).await { log::error!( "{}", @@ -173,9 +173,10 @@ pub(crate) async fn socket_handler( continue; } - match validation_middleware( + // TODO add general validation_middleware support (if have new features which support websocket) + match validation_middleware_quicknode( &cfg, - &payload, + &signed_message, &proxy_route, req.uri(), &remote_addr, @@ -236,7 +237,7 @@ pub(crate) async fn socket_handler( _ => break }; } - }; + } } } e => { diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs new file mode 100644 index 0000000..8471b13 --- /dev/null +++ b/src/proxy/mod.rs @@ -0,0 +1,201 @@ +use crate::ctx::{AppConfig, GenericResult, ProxyRoute}; +use crate::sign::SignedMessage; +use hyper::header; +use hyper::header::{HeaderName, HeaderValue}; +use hyper::{Body, Request, Response, StatusCode, Uri}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +mod moralis; +use moralis::{proxy_moralis, validation_middleware_moralis}; +mod quicknode; +pub(crate) use quicknode::{ + proxy_quicknode, validation_middleware_quicknode, QuicknodePayload, QuicknodeSocketPayload, +}; + +const X_AUTH_PAYLOAD: &str = "X-Auth-Payload"; +const KEEP_ALIVE: &str = "keep-alive"; + +/// Enumerates different proxy types supported by the application, focusing on separating feature logic. +/// This allows for differentiated handling based on what the proxy should do with the request, +/// directing each to the appropriate service or API based on its designated proxy type. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum ProxyType { + Quicknode, + Moralis, +} + +/// Represents the types of payloads that can be processed by the proxy, with each variant tailored to a specific proxy type. +/// This helps in managing the logic for routing and processing requests appropriately within the proxy layer. +#[derive(Clone, Debug, PartialEq)] +pub(crate) enum PayloadData { + /// Quicknode feature requires body payload and Signed Message in X-Auth-Payload header + Quicknode { + payload: QuicknodePayload, + signed_message: SignedMessage, + }, + /// Moralis feature requires only Signed Message in X-Auth-Payload header and doesn't have body + Moralis(SignedMessage), +} + +impl PayloadData { + /// Returns a reference to the `SignedMessage` contained within the payload. + pub(crate) fn signed_message(&self) -> &SignedMessage { + match self { + PayloadData::Quicknode { signed_message, .. } => signed_message, + PayloadData::Moralis(payload) => payload, + } + } +} + +/// Asynchronously generates and organizes payload data from an HTTP request based on the specified proxy type. +/// This function ensures that requests are properly formatted to the correct service, +/// returning a tuple with the request and the structured payload. +pub(crate) async fn generate_payload_from_req( + req: Request, + proxy_type: &ProxyType, +) -> GenericResult<(Request, PayloadData)> { + match proxy_type { + ProxyType::Quicknode => { + let (req, payload, signed_message) = + parse_body_and_auth_header::(req).await?; + let payload_data = PayloadData::Quicknode { + payload, + signed_message, + }; + Ok((req, payload_data)) + } + ProxyType::Moralis => { + let (req, signed_message) = parse_auth_header(req).await?; + Ok((req, PayloadData::Moralis(signed_message))) + } + } +} + +pub(crate) async fn proxy( + cfg: &AppConfig, + req: Request, + remote_addr: &SocketAddr, + payload: PayloadData, + x_forwarded_for: HeaderValue, + proxy_route: &ProxyRoute, +) -> GenericResult> { + match payload { + PayloadData::Quicknode { + payload, + signed_message, + } => { + proxy_quicknode( + cfg, + req, + remote_addr, + payload, + signed_message, + x_forwarded_for, + proxy_route, + ) + .await + } + PayloadData::Moralis(signed_message) => { + proxy_moralis( + cfg, + req, + remote_addr, + signed_message, + x_forwarded_for, + proxy_route, + ) + .await + } + } +} + +pub(crate) async fn validation_middleware( + cfg: &AppConfig, + payload: &PayloadData, + proxy_route: &ProxyRoute, + req_uri: &Uri, + remote_addr: &SocketAddr, +) -> Result<(), StatusCode> { + match payload { + PayloadData::Quicknode { signed_message, .. } => { + validation_middleware_quicknode(cfg, signed_message, proxy_route, req_uri, remote_addr) + .await + } + PayloadData::Moralis(signed_message) => { + validation_middleware_moralis(cfg, signed_message, proxy_route, req_uri, remote_addr) + .await + } + } +} + +/// Parses the request body and the `X-Auth-Payload` header into a payload and signed message. +/// +/// This function extracts the `X-Auth-Payload` header from the request, parses it into a `SignedMessage`, +/// and then reads and deserializes the request body into a specified type `T`. +/// If the body is empty or the header is missing, an error is returned. +async fn parse_body_and_auth_header( + req: Request, +) -> GenericResult<(Request, T, SignedMessage)> +where + T: DeserializeOwned, +{ + let (parts, body) = req.into_parts(); + let header_value = parts + .headers + .get(X_AUTH_PAYLOAD) + .ok_or("Missing X-Auth-Payload header")? + .to_str()?; + let signed_message: SignedMessage = serde_json::from_str(header_value)?; + let body_bytes = hyper::body::to_bytes(body).await?; + if body_bytes.is_empty() { + return Err("Empty body cannot be deserialized into non-optional type T".into()); + } + let payload: T = serde_json::from_slice(&body_bytes)?; + let new_req = Request::from_parts(parts, Body::from(body_bytes)); + Ok((new_req, payload, signed_message)) +} + +/// Parses [SignedMessage] value from X-Auth-Payload header +async fn parse_auth_header(req: Request) -> GenericResult<(Request, SignedMessage)> { + let (parts, body) = req.into_parts(); + let header_value = parts + .headers + .get(X_AUTH_PAYLOAD) + .ok_or("Missing X-Auth-Payload header")? + .to_str()?; + let payload: SignedMessage = serde_json::from_str(header_value)?; + let new_req = Request::from_parts(parts, body); + Ok((new_req, payload)) +} + +fn remove_hop_by_hop_headers( + req: &mut Request, + additional_headers_to_remove: &[HeaderName], +) -> GenericResult<()> { + // List of common hop headers to be removed + let mut headers_to_remove = vec![ + header::ACCEPT_ENCODING, + header::CONNECTION, + header::HOST, + header::PROXY_AUTHENTICATE, + header::PROXY_AUTHORIZATION, + header::TE, + header::TRANSFER_ENCODING, + header::TRAILER, + header::UPGRADE, + HeaderName::from_static(KEEP_ALIVE), + HeaderName::from_bytes(X_AUTH_PAYLOAD.as_bytes())?, + ]; + + // Extend with additional headers to remove + headers_to_remove.extend_from_slice(additional_headers_to_remove); + + // Remove headers + for key in &headers_to_remove { + req.headers_mut().remove(key); + } + + Ok(()) +} diff --git a/src/proxy/moralis.rs b/src/proxy/moralis.rs new file mode 100644 index 0000000..a997f05 --- /dev/null +++ b/src/proxy/moralis.rs @@ -0,0 +1,321 @@ +use crate::address_status::{AddressStatus, AddressStatusOperations}; +use crate::ctx::{AppConfig, ProxyRoute}; +use crate::db::Db; +use crate::http::{ + insert_jwt_to_http_header, response_by_status, APPLICATION_JSON, X_FORWARDED_FOR, +}; +use crate::proxy::remove_hop_by_hop_headers; +use crate::rate_limiter::RateLimitOperations; +use crate::sign::{SignOps, SignedMessage}; +use crate::{log_format, GenericResult}; +use hyper::header::{HeaderName, HeaderValue}; +use hyper::{header, Body, Request, Response, StatusCode, Uri}; +use hyper_tls::HttpsConnector; +use std::net::SocketAddr; + +pub(crate) async fn proxy_moralis( + cfg: &AppConfig, + mut req: Request, + remote_addr: &SocketAddr, + signed_message: SignedMessage, + x_forwarded_for: HeaderValue, + proxy_route: &ProxyRoute, +) -> GenericResult> { + if proxy_route.authorized { + if let Err(e) = insert_jwt_to_http_header(cfg, req.headers_mut()).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req.uri(), + "Error inserting JWT into HTTP header: {}, returning 500.", + e + ) + ); + return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); + } + } + + let original_req_uri = req.uri().clone(); + + if let Err(e) = modify_request_uri(&mut req, proxy_route) { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + original_req_uri, + "Error modifying request base Uri: {}, returning 500.", + e + ) + ); + return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); + } + + remove_hop_by_hop_headers(&mut req, &[header::CONTENT_LENGTH])?; + + req.headers_mut() + .insert(HeaderName::from_static(X_FORWARDED_FOR), x_forwarded_for); + req.headers_mut() + .insert(header::ACCEPT, APPLICATION_JSON.parse()?); + + let https = HttpsConnector::new(); + let client = hyper::Client::builder().build(https); + + let target_uri = req.uri().clone(); + let res = match client.request(req).await { + Ok(t) => t, + Err(e) => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + original_req_uri, + "Couldn't reach {}: {}. Returning 503.", + target_uri, + e + ) + ); + return response_by_status(StatusCode::SERVICE_UNAVAILABLE); + } + }; + + Ok(res) +} + +/// This function removes the matched inbound route from the request URI and +/// replaces request base URI with the outbound route specified in the proxy route. +fn modify_request_uri(req: &mut Request, proxy_route: &ProxyRoute) -> GenericResult<()> { + let proxy_base_uri = proxy_route.outbound_route.parse::()?; + let original_uri = req.uri(); + + let original_path_and_query = original_uri + .path_and_query() + .map(|pq| pq.as_str()) + .unwrap_or(""); + // Remove the "inbound_route" part from the original path and query + let remaining_path_and_query = if proxy_route.inbound_route == "/" { + original_path_and_query + } else { + original_path_and_query + .strip_prefix(&proxy_route.inbound_route) + .ok_or("Route doesn't match with the given inbound URL.")? + }; + + let mut base_uri_parts = proxy_base_uri.into_parts(); + base_uri_parts.path_and_query = Some(remaining_path_and_query.parse()?); + let new_uri = Uri::from_parts(base_uri_parts)?; + *req.uri_mut() = new_uri; + Ok(()) +} + +pub(crate) async fn validation_middleware_moralis( + cfg: &AppConfig, + signed_message: &SignedMessage, + proxy_route: &ProxyRoute, + req_uri: &Uri, + remote_addr: &SocketAddr, +) -> Result<(), StatusCode> { + let mut db = Db::create_instance(cfg).await; + + match db.read_address_status(&signed_message.address).await { + AddressStatus::Trusted => Ok(()), + AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), + AddressStatus::None => { + match signed_message.verify_message() { + Ok(true) => {} + Ok(false) => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Request has invalid signed message, returning 401" + ) + ); + + return Err(StatusCode::UNAUTHORIZED); + } + Err(e) => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "verify_message failed in coin {}: {}, returning 500.", + signed_message.coin_ticker, + e + ) + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + + let rate_limiter_key = + format!("{}:{}", signed_message.coin_ticker, signed_message.address); + + let rate_limiter = proxy_route + .rate_limiter + .as_ref() + .unwrap_or(&cfg.rate_limiter); + match db.rate_exceeded(&rate_limiter_key, rate_limiter).await { + Ok(false) => {} + Ok(true) => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Rate exceed for {}, returning 406.", + rate_limiter_key, + ) + ); + return Err(StatusCode::NOT_ACCEPTABLE); + } + Err(e) => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Rate exceeded check failed in coin {}: {}, returning 500.", + signed_message.coin_ticker, + e + ) + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + + if let Err(e) = db.rate_address(rate_limiter_key).await { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Rate incrementing failed in coin {}: {}, returning 500.", + signed_message.coin_ticker, + e + ) + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + }; + + Ok(()) + } + } +} + +#[tokio::test] +async fn test_parse_moralis_payload() { + use super::{parse_auth_header, X_AUTH_PAYLOAD}; + use hyper::header::HeaderName; + use hyper::Method; + + let serialized_payload = serde_json::json!({ + "coin_ticker": "BTC", + "address": "dummy-value", + "timestamp_message": 1655320000, + "signature": "dummy-value", + }) + .to_string(); + + let req = Request::builder() + .method(Method::GET) + .header(header::ACCEPT, HeaderValue::from_static(APPLICATION_JSON)) + .header( + X_AUTH_PAYLOAD, + HeaderValue::from_str(&serialized_payload).unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let (mut req, payload) = parse_auth_header(req).await.unwrap(); + + let body_bytes = hyper::body::to_bytes(req.body_mut()).await.unwrap(); + assert!( + body_bytes.is_empty(), + "Body should be empty for GET methods" + ); + + let header_value = req.headers().get(header::ACCEPT).unwrap(); + + let expected_payload = SignedMessage { + coin_ticker: String::from("BTC"), + address: String::from("dummy-value"), + timestamp_message: 1655320000, + signature: String::from("dummy-value"), + }; + + assert_eq!(payload, expected_payload); + assert_eq!(header_value, APPLICATION_JSON); + + let additional_headers = &[ + header::CONTENT_LENGTH, + HeaderName::from_bytes(X_AUTH_PAYLOAD.as_bytes()).unwrap(), + ]; + remove_hop_by_hop_headers(&mut req, additional_headers).unwrap(); +} + +#[tokio::test] +async fn test_modify_request_uri() { + use super::ProxyType; + use std::str::FromStr; + + const EXPECTED_URI: &str = "http://localhost:8000/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC"; + + let mut req = Request::builder() + .uri("https://komodo.proxy:5535/nft-test/nft/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC") + .body(Body::empty()) + .unwrap(); + let proxy_route = ProxyRoute { + inbound_route: String::from_str("/nft-test").unwrap(), + outbound_route: "http://localhost:8000".to_string(), + proxy_type: ProxyType::Moralis, + authorized: false, + allowed_rpc_methods: vec![], + rate_limiter: None, + }; + modify_request_uri(&mut req, &proxy_route).unwrap(); + assert_eq!( + req.uri(), + "http://localhost:8000/nft/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC" + ); + + let mut req = Request::builder() + .uri("https://komodo.proxy:5535/nft-test/special/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC") + .body(Body::empty()) + .unwrap(); + let proxy_route = ProxyRoute { + inbound_route: String::from_str("/nft-test/special").unwrap(), + outbound_route: "http://localhost:8000".to_string(), + proxy_type: ProxyType::Moralis, + authorized: false, + allowed_rpc_methods: vec![], + rate_limiter: None, + }; + modify_request_uri(&mut req, &proxy_route).unwrap(); + assert_eq!(req.uri(), EXPECTED_URI); + + let mut req = Request::builder() + .uri("https://komodo.proxy:5535/api/v2.2/0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326/nft/transfers?chain=eth&format=decimal&order=DESC") + .body(Body::empty()) + .unwrap(); + let proxy_route = ProxyRoute { + inbound_route: String::from_str("/").unwrap(), + outbound_route: "http://localhost:8000".to_string(), + proxy_type: ProxyType::Moralis, + authorized: false, + allowed_rpc_methods: vec![], + rate_limiter: None, + }; + modify_request_uri(&mut req, &proxy_route).unwrap(); + assert_eq!(req.uri(), EXPECTED_URI); +} diff --git a/src/proxy/quicknode.rs b/src/proxy/quicknode.rs new file mode 100644 index 0000000..8945118 --- /dev/null +++ b/src/proxy/quicknode.rs @@ -0,0 +1,395 @@ +use crate::address_status::{AddressStatus, AddressStatusOperations}; +use crate::ctx::{AppConfig, ProxyRoute}; +use crate::db::Db; +use crate::http::{ + insert_jwt_to_http_header, response_by_status, APPLICATION_JSON, X_FORWARDED_FOR, +}; +use crate::proxy::remove_hop_by_hop_headers; +use crate::rate_limiter::RateLimitOperations; +use crate::rpc::Json; +use crate::sign::{SignOps, SignedMessage}; +use crate::{log_format, rpc, GenericResult}; +use hyper::header::{HeaderName, HeaderValue}; +use hyper::{header, Body, Request, Response, StatusCode, Uri}; +use hyper_tls::HttpsConnector; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::net::SocketAddr; + +/// Represents a payload for JSON-RPC calls, tailored for the Quicknode API within the proxy. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub(crate) struct QuicknodePayload { + pub(crate) method: String, + pub(crate) params: serde_json::value::Value, + pub(crate) id: usize, + pub(crate) jsonrpc: String, +} + +/// Used for websocket connection. +/// It combines standard JSON RPC method call fields (method, params, id, jsonrpc) with a `SignedMessage` +/// for authentication and validation, facilitating secure and validated interactions with the Quicknode service. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub(crate) struct QuicknodeSocketPayload { + pub(crate) method: String, + pub(crate) params: serde_json::value::Value, + pub(crate) id: usize, + pub(crate) jsonrpc: String, + pub(crate) signed_message: SignedMessage, +} + +impl QuicknodeSocketPayload { + pub(crate) fn into_parts(self) -> (QuicknodePayload, SignedMessage) { + let payload = QuicknodePayload { + method: self.method, + params: self.params, + id: self.id, + jsonrpc: self.jsonrpc, + }; + let signed_message = self.signed_message; + (payload, signed_message) + } +} + +#[derive(Debug)] +enum ProofOfFundingError { + InvalidSignedMessage, + InsufficientBalance, + ErrorFromRpcCall, + #[allow(dead_code)] + RpcCallFailed(String), +} + +pub(crate) async fn proxy_quicknode( + cfg: &AppConfig, + mut req: Request, + remote_addr: &SocketAddr, + payload: QuicknodePayload, + signed_message: SignedMessage, + x_forwarded_for: HeaderValue, + proxy_route: &ProxyRoute, +) -> GenericResult> { + // check if requested method allowed + if !proxy_route.allowed_rpc_methods.contains(&payload.method) { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req.uri(), + "Method {} not allowed for, returning 403.", + payload.method + ) + ); + return response_by_status(StatusCode::FORBIDDEN); + } + + if proxy_route.authorized { + // modify outgoing request + if insert_jwt_to_http_header(cfg, req.headers_mut()) + .await + .is_err() + { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req.uri(), + "Error inserting JWT into http header, returning 500." + ) + ); + return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); + } + } + + let original_req_uri = req.uri().clone(); + *req.uri_mut() = match proxy_route.outbound_route.parse() { + Ok(uri) => uri, + Err(e) => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + original_req_uri, + "Error type casting value of {} into Uri: {}, returning 500.", + proxy_route.outbound_route, + e + ) + ); + return response_by_status(StatusCode::INTERNAL_SERVER_ERROR); + } + }; + + remove_hop_by_hop_headers(&mut req, &[])?; + + req.headers_mut() + .insert(HeaderName::from_static(X_FORWARDED_FOR), x_forwarded_for); + req.headers_mut() + .insert(header::CONTENT_TYPE, APPLICATION_JSON.parse()?); + + let https = HttpsConnector::new(); + let client = hyper::Client::builder().build(https); + + let target_uri = req.uri().clone(); + let res = match client.request(req).await { + Ok(t) => t, + Err(e) => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + original_req_uri, + "Couldn't reach {}: {}. Returning 503.", + target_uri, + e + ) + ); + return response_by_status(StatusCode::SERVICE_UNAVAILABLE); + } + }; + + Ok(res) +} + +pub(crate) async fn validation_middleware_quicknode( + cfg: &AppConfig, + signed_message: &SignedMessage, + proxy_route: &ProxyRoute, + req_uri: &Uri, + remote_addr: &SocketAddr, +) -> Result<(), StatusCode> { + let mut db = Db::create_instance(cfg).await; + + match db.read_address_status(&signed_message.address).await { + AddressStatus::Trusted => Ok(()), + AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), + AddressStatus::None => { + let signed_message_status = + verify_message_and_balance(cfg, signed_message, proxy_route).await; + + if let Err(ProofOfFundingError::InvalidSignedMessage) = signed_message_status { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Request has invalid signed message, returning 401" + ) + ); + + return Err(StatusCode::UNAUTHORIZED); + }; + + let rate_limiter_key = + format!("{}:{}", signed_message.coin_ticker, signed_message.address); + + let rate_limiter = proxy_route + .rate_limiter + .as_ref() + .unwrap_or(&cfg.rate_limiter); + match db.rate_exceeded(&rate_limiter_key, rate_limiter).await { + Ok(false) => {} + _ => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Rate exceed for {}, checking balance for {} address.", + rate_limiter_key, + signed_message.address + ) + ); + + match verify_message_and_balance(cfg, signed_message, proxy_route).await { + Ok(_) => {} + Err(ProofOfFundingError::InsufficientBalance) => { + log::warn!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Wallet {} has insufficient balance for coin {}, returning 406.", + signed_message.address, + signed_message.coin_ticker, + ) + ); + + return Err(StatusCode::NOT_ACCEPTABLE); + } + e => { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "verify_message_and_balance failed in coin {}: {:?}", + signed_message.coin_ticker, + e + ) + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + } + }; + + if db.rate_address(rate_limiter_key).await.is_err() { + log::error!( + "{}", + log_format!( + remote_addr.ip(), + signed_message.address, + req_uri, + "Rate incrementing failed." + ) + ); + }; + + Ok(()) + } + } +} + +async fn verify_message_and_balance( + cfg: &AppConfig, + signed_message: &SignedMessage, + proxy_route: &ProxyRoute, +) -> Result<(), ProofOfFundingError> { + if let Ok(true) = signed_message.verify_message() { + let mut db = Db::create_instance(cfg).await; + + // We don't want to send balance requests everytime when user sends requests. + if let Ok(true) = db.key_exists(&signed_message.address).await { + return Ok(()); + } + + let rpc_payload = json!({ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_getBalance", + "params": [signed_message.address, "latest"] + }); + + let rpc_client = + // TODO: Use the current transport instead of forcing to use http (even if it's rare, this might not work on certain nodes) + rpc::RpcClient::new(proxy_route.outbound_route.replace("ws", "http").clone()); + + match rpc_client + .send(cfg, rpc_payload, proxy_route.authorized) + .await + { + Ok(res) if res["result"] != Json::Null && res["result"] != "0x0" => { + // cache this address for 60 seconds + let _ = db.insert_cache(&signed_message.address, "", 60).await; + + return Ok(()); + } + Ok(res) if res["error"] != Json::Null => { + return Err(ProofOfFundingError::ErrorFromRpcCall); + } + Ok(_) => return Err(ProofOfFundingError::InsufficientBalance), + Err(e) => return Err(ProofOfFundingError::RpcCallFailed(e.to_string())), + }; + } + + Err(ProofOfFundingError::InvalidSignedMessage) +} + +#[test] +fn test_quicknode_payload_serialzation_and_deserialization() { + let json_payload = json!({ + "method": "dummy-value", + "params": [], + "id": 1, + "jsonrpc": "2.0" + }); + + let actual_payload: QuicknodePayload = serde_json::from_str(&json_payload.to_string()).unwrap(); + + let expected_payload = QuicknodePayload { + method: String::from("dummy-value"), + params: json!([]), + id: 1, + jsonrpc: String::from("2.0"), + }; + + assert_eq!(actual_payload, expected_payload); + + // Backwards + let json = serde_json::to_value(expected_payload).unwrap(); + assert_eq!(json_payload, json); + assert_eq!(json_payload.to_string(), json.to_string()); +} + +#[tokio::test] +async fn test_parse_quicknode_payload() { + use super::{parse_body_and_auth_header, X_AUTH_PAYLOAD}; + use hyper::Method; + + let serialized_payload = json!({ + "method": "dummy-value", + "params": [], + "id": 1, + "jsonrpc": "2.0" + }) + .to_string(); + + let serialized_auth_value = json!({ + "coin_ticker": "ETH", + "address": "dummy-value", + "timestamp_message": 1655319963, + "signature": "dummy-value", + }) + .to_string(); + + let mut req = Request::builder() + .method(Method::POST) + .header( + X_AUTH_PAYLOAD, + HeaderValue::from_str(&serialized_auth_value).unwrap(), + ) + .body(Body::from(serialized_payload)) + .unwrap(); + req.headers_mut().insert( + HeaderName::from_static("dummy-header"), + "dummy-value".parse().unwrap(), + ); + + let (mut req, payload, signed_message) = parse_body_and_auth_header::(req) + .await + .unwrap(); + + let body_bytes = hyper::body::to_bytes(req.body_mut()).await.unwrap(); + assert!( + !body_bytes.is_empty(), + "Body should not be empty for non-GET methods" + ); + + let dummy_header_value = req.headers().get("dummy-header").unwrap(); + assert_eq!(dummy_header_value, "dummy-value"); + + let expected_payload = QuicknodePayload { + method: String::from("dummy-value"), + params: json!([]), + id: 1, + jsonrpc: String::from("2.0"), + }; + assert_eq!(payload, expected_payload); + + let expected_auth_value = SignedMessage { + coin_ticker: String::from("ETH"), + address: String::from("dummy-value"), + timestamp_message: 1655319963, + signature: String::from("dummy-value"), + }; + assert_eq!(signed_message, expected_auth_value); + + remove_hop_by_hop_headers(&mut req, &[]).unwrap(); +} diff --git a/src/security/address_status.rs b/src/security/address_status.rs index be29afd..b9dd8c7 100644 --- a/src/security/address_status.rs +++ b/src/security/address_status.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use bytes::Buf; use ctx::AppConfig; use db::Db; +use http::APPLICATION_JSON; use hyper::{header, Body, Request, Response, StatusCode}; use redis::FromRedisValue; use serde::{Deserialize, Serialize}; @@ -28,7 +29,7 @@ pub(crate) async fn post_address_status( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .header(header::CONTENT_TYPE, "application/json") + .header(header::CONTENT_TYPE, APPLICATION_JSON) .body(Body::from(Vec::new()))?) } @@ -47,7 +48,7 @@ pub(crate) async fn get_address_status_list(cfg: &AppConfig) -> GenericResult(&mut self.connection) .await?) } @@ -118,7 +121,7 @@ impl AddressStatusOperations for Db { .map(|v| (v.address.clone(), v.status)) .collect(); pipe.hset_multiple(DB_STATUS_LIST, &formatted); - pipe.query_async(&mut self.connection).await?; + pipe.query_async::<_, ()>(&mut self.connection).await?; Ok(()) } diff --git a/src/security/jwt.rs b/src/security/jwt.rs index 6978267..3d67909 100644 --- a/src/security/jwt.rs +++ b/src/security/jwt.rs @@ -38,6 +38,7 @@ impl JwtClaims { } static AUTH_DECODING_KEY: OnceCell = OnceCell::new(); + #[allow(dead_code)] pub(crate) fn get_decoding_key(cfg: &AppConfig) -> &'static DecodingKey { let buffer_closure = || -> Vec { read_file_buffer(&cfg.pubkey_path) }; @@ -102,7 +103,7 @@ pub(crate) async fn generate_jwt_and_cache_it( .arg("EX") .arg(cfg.token_expiration_time() - 60) // expire 60 seconds before token's expiration .arg("NX") - .query_async(&mut conn) + .query_async::<_, ()>(&mut conn) .await?; Ok(token) diff --git a/src/security/proof_of_funding.rs b/src/security/proof_of_funding.rs deleted file mode 100644 index fde8c25..0000000 --- a/src/security/proof_of_funding.rs +++ /dev/null @@ -1,64 +0,0 @@ -use ctx::{AppConfig, ProxyRoute}; -use db::Db; -use http::RpcPayload; -use rpc::Json; -use serde_json::json; -use sign::SignOps; - -use super::*; - -#[derive(Debug)] -pub(crate) enum ProofOfFundingError { - InvalidSignedMessage, - InsufficientBalance, - ErrorFromRpcCall, - #[allow(dead_code)] - RpcCallFailed(String), -} - -pub(crate) async fn verify_message_and_balance( - cfg: &AppConfig, - payload: &RpcPayload, - proxy_route: &ProxyRoute, -) -> Result<(), ProofOfFundingError> { - if let Ok(true) = payload.signed_message.verify_message() { - let mut db = Db::create_instance(cfg).await; - - // We don't want to send balance requests everytime when user sends requests. - if let Ok(true) = db.key_exists(&payload.signed_message.address).await { - return Ok(()); - } - - let rpc_payload = json!({ - "id": 1, - "jsonrpc": "2.0", - "method": "eth_getBalance", - "params": [payload.signed_message.address, "latest"] - }); - - let rpc_client = - // TODO: Use the current transport instead of forcing to use http (even if it's rare, this might not work on certain nodes) - rpc::RpcClient::new(proxy_route.outbound_route.replace("ws", "http").clone()); - - match rpc_client - .send(cfg, rpc_payload, proxy_route.authorized) - .await - { - Ok(res) if res["result"] != Json::Null && res["result"] != "0x0" => { - // cache this address for 60 seconds - let _ = db - .insert_cache(&payload.signed_message.address, "", 60) - .await; - - return Ok(()); - } - Ok(res) if res["error"] != Json::Null => { - return Err(ProofOfFundingError::ErrorFromRpcCall); - } - Ok(_) => return Err(ProofOfFundingError::InsufficientBalance), - Err(e) => return Err(ProofOfFundingError::RpcCallFailed(e.to_string())), - }; - } - - Err(ProofOfFundingError::InvalidSignedMessage) -} diff --git a/src/security/rate_limiter.rs b/src/security/rate_limiter.rs index 653304d..ba96f4c 100644 --- a/src/security/rate_limiter.rs +++ b/src/security/rate_limiter.rs @@ -13,13 +13,13 @@ pub(crate) const DB_RP_60_MIN: &str = "rp:60_min"; #[async_trait] pub(crate) trait RateLimitOperations { - async fn upsert_address_rate_in_pipe( + fn upsert_address_rate_in_pipe( &mut self, pipe: &mut Pipeline, db: &str, address: &str, expire_time: usize, - ) -> GenericResult<()>; + ); async fn rate_address(&mut self, address: String) -> GenericResult<()>; async fn did_exceed_in_single_time_frame( &mut self, @@ -36,52 +36,30 @@ pub(crate) trait RateLimitOperations { #[async_trait] impl RateLimitOperations for Db { - async fn upsert_address_rate_in_pipe( + fn upsert_address_rate_in_pipe( &mut self, pipe: &mut Pipeline, db: &str, address: &str, expire_time: usize, - ) -> GenericResult<()> { - if !self.key_exists(db).await? { - pipe.hset(db, address, "1") - .cmd("EXPIRE") - .arg(db) - .arg(expire_time) - .arg("XX") - .query_async(&mut self.connection) - .await?; - } else { - pipe.cmd("HINCRBY") - .arg(db) - .arg(&[address, "1"]) - .cmd("EXPIRE") - .arg(db) - .arg(expire_time) - .arg("XX") - .query_async(&mut self.connection) - .await?; - } - - Ok(()) + ) { + // Atomic operation, which means it increments the value safely even when multiple clients are modifying the counter simultaneously. + pipe.atomic() + .hincr(db, address, 1) // Increment the hash value or create it if it doesn't exist + .expire(db, expire_time); // Set or reset the expiration time } /// semi-lazy IP rate implementation for 5 different time frames. async fn rate_address(&mut self, address: String) -> GenericResult<()> { let mut pipe = redis::pipe(); - self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_1_MIN, &address, 60) - .await?; - self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_5_MIN, &address, 300) - .await?; - self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_15_MIN, &address, 900) - .await?; - self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_30_MIN, &address, 1800) - .await?; - self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_60_MIN, &address, 3600) - .await?; - - pipe.query_async(&mut self.connection).await?; + self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_1_MIN, &address, 60); + self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_5_MIN, &address, 300); + self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_15_MIN, &address, 900); + self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_30_MIN, &address, 1800); + self.upsert_address_rate_in_pipe(&mut pipe, DB_RP_60_MIN, &address, 3600); + // Execute the pipeline once after setting all commands + pipe.query_async::<_, ()>(&mut self.connection).await?; Ok(()) } diff --git a/src/security/sign.rs b/src/security/sign.rs index 76d79bb..33a0945 100644 --- a/src/security/sign.rs +++ b/src/security/sign.rs @@ -16,11 +16,17 @@ pub(crate) trait SignOps { fn is_valid_checksum_addr(&self) -> bool; fn valid_addr_from_str(&self) -> Result; fn addr_from_str(&self) -> Result; + #[allow(dead_code)] fn sign_message(&mut self, secret: &Secret) -> GenericResult<()>; fn verify_message(&self) -> GenericResult; } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +/// Represents a signed message used for authenticating and validating requests processed by the proxy. +/// +/// This structure contains cryptographic elements (`signature`) and metadata (`coin_ticker`, `address`, `timestamp_message`) +/// that are used to verify the authenticity and integrity of a request to the proxy. This is essential for securing +/// the proxy operations and preventing unauthorized access. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct SignedMessage { pub(crate) coin_ticker: String, pub(crate) address: String, @@ -99,6 +105,7 @@ impl SignOps for SignedMessage { Address::from_str(&self.address[2..]).map_err(|e| e.to_string()) } + #[allow(dead_code)] fn sign_message(&mut self, secret: &Secret) -> GenericResult<()> { let signature = sign(secret, &H256::from(self.sign_message_hash()))?; self.signature = format!("0x{}", signature);