From dd12dbb5483f531ec797160dd1cf290b2750991e Mon Sep 17 00:00:00 2001 From: code-ga Date: Wed, 20 Dec 2023 14:14:55 +0700 Subject: [PATCH] no longer need to handle redis pubsub message Moving some event to private api #41 - untested --- server/cdn-service/src/main.rs | 4 - server/cdn-service/src/route/mod.rs | 3 +- server/cdn-service/src/types.rs | 8 - server/cdn-service/src/util/mod.rs | 18 +- server/cdn-service/src/ws/mod.rs | 303 ---------------------------- server/comic/util/genUploadToken.go | 2 +- 6 files changed, 3 insertions(+), 335 deletions(-) delete mode 100644 server/cdn-service/src/ws/mod.rs diff --git a/server/cdn-service/src/main.rs b/server/cdn-service/src/main.rs index 89cc8324..7d9b7b8d 100644 --- a/server/cdn-service/src/main.rs +++ b/server/cdn-service/src/main.rs @@ -3,13 +3,11 @@ mod types; mod upload_images; mod util; -mod ws; use dotenv::dotenv; use salvo::prelude::TcpListener; use salvo::{Listener, Server}; -use crate::ws::handle_socket_message; mod route; #[tokio::main] @@ -24,8 +22,6 @@ async fn main() { dbg!(&redis_client.get_connection_info().addr.to_string()); - handle_socket_message(redis_client.clone()).await; - println!("Server started on port 3000 🚀"); let acceptor = TcpListener::new("0.0.0.0:3000").bind().await; Server::new(acceptor) diff --git a/server/cdn-service/src/route/mod.rs b/server/cdn-service/src/route/mod.rs index 867d31b0..149e6457 100644 --- a/server/cdn-service/src/route/mod.rs +++ b/server/cdn-service/src/route/mod.rs @@ -18,7 +18,7 @@ pub fn route(redis: RedisClient) -> salvo::Router { redis: redis.clone(), })); - router = router.push(Router::with_path("/private/gen_token").post(GenToken { + router = router.push(Router::with_path("/gen_token").post(GenToken { redis: redis.clone(), })); @@ -99,7 +99,6 @@ impl salvo::Handler for GenToken { } res.render(Json(sender_data.unwrap())); } - } struct GetImageData { diff --git a/server/cdn-service/src/types.rs b/server/cdn-service/src/types.rs index a8b2c8db..870dce9b 100644 --- a/server/cdn-service/src/types.rs +++ b/server/cdn-service/src/types.rs @@ -69,13 +69,5 @@ impl FromRedisValue for TokenStorageTableNode { }) } } -pub enum WsError { - DecodePayloadError, -} -pub struct SendWsErrorMetaInput { - pub from: String, - pub url: String, - pub id: String, -} pub type RedisClient = redis::Client; diff --git a/server/cdn-service/src/util/mod.rs b/server/cdn-service/src/util/mod.rs index b14c0293..bc25386a 100644 --- a/server/cdn-service/src/util/mod.rs +++ b/server/cdn-service/src/util/mod.rs @@ -1,4 +1,4 @@ -use crate::types::{RedisClient, SendWsErrorMetaInput, SenderData, TokenStorageTableNode, WsError}; +use crate::types::{RedisClient, SenderData, TokenStorageTableNode}; use redis::{AsyncCommands, JsonAsyncCommands}; use serde_json::json; use tokio_stream::StreamExt; @@ -61,22 +61,6 @@ pub async fn send_uploaded_message( return false; } -pub async fn send_ws_error(error: WsError, meta: SendWsErrorMetaInput, redis: &RedisClient) { - let sender_data = SenderData { - url: meta.from, - message_type: "rep".to_string(), - from: meta.url, - header: serde_json::Value::Null, - payload: serde_json::Value::Null, - error: serde_json::Value::String(match error { - WsError::DecodePayloadError => "decode payload error".to_string(), - }), - id: meta.id.to_string(), - }; - - let _ = send_service_message(redis, &sender_data, false).await; -} - pub fn gen_token(uuid: String) -> String { uuid } diff --git a/server/cdn-service/src/ws/mod.rs b/server/cdn-service/src/ws/mod.rs deleted file mode 100644 index 3ed90d05..00000000 --- a/server/cdn-service/src/ws/mod.rs +++ /dev/null @@ -1,303 +0,0 @@ -use redis::JsonAsyncCommands; -use serde_json::json; -use tokio_stream::StreamExt; - -use crate::{ - types::{ - self, GetImageMessageType, RedisClient, SendWsErrorMetaInput, SenderData, - TokenStorageTableNode, WsError, WsRequestMessage, - }, - util::{gen_token, get_image_url, get_redis_key, send_service_message, send_ws_error}, -}; -pub async fn handle_socket_message(redis: RedisClient) { - let redis_conn = redis.get_tokio_connection().await.unwrap(); - println!("Connected to Redis"); - let mut pubsub = redis_conn.into_pubsub(); - pubsub - .subscribe("upload_token_registry/genToken") - .await - .unwrap(); - pubsub.subscribe("cdn_service/cdn_get_image").await.unwrap(); - let _ = tokio::spawn(async move { - loop { - // let msg = socket - // .clone() - // .lock() - // .await - // .read() - // .expect("Error reading message"); - // if let Message::Text(text) = msg { - // let json_data = match serde_json::from_str::(&text) { - // Err(e) => { - // dbg!(&e); - // continue; - // } - // Ok(d) => d, - // }; - // if !json_data.message_type.eq("message") { - // continue; - // } - // if json_data.url.eq("upload_token_registry/genToken") { - // handle_gen_token(json_data, redis.clone()).await; - // } else if json_data.url.eq("cdn_service/cdn_get_image") { - // handle_cdn_get_image(json_data, socket.clone()).await; - // } - // } - let msg = pubsub.on_message().next().await.unwrap(); - let json_data = match serde_json::from_str::( - &msg.get_payload::().unwrap().as_str(), - ) { - Err(e) => { - dbg!(&e); - continue; - } - Ok(d) => d, - }; - if !json_data.message_type.eq("message") { - continue; - } - if json_data.url.eq("upload_token_registry/genToken") { - handle_gen_token(json_data, redis.clone()).await; - } else if json_data.url.eq("cdn_service/cdn_get_image") { - handle_cdn_get_image(json_data, redis.clone()).await; - } - } - }) - .await; -} - -async fn handle_gen_token(json_data: SenderData, redis: RedisClient) { - let mut sender_data = None; - if let serde_json::Value::Array(a) = json_data.payload { - let mut tokens = vec![]; - for v in a { - let (id, data, emit_to, event_name) = ( - v.get("id").unwrap().as_str().unwrap(), - v.get("data").unwrap(), - v.get("emit_to").unwrap().as_str().unwrap(), - v.get("event_name").unwrap().as_str().unwrap(), - ); - let token = gen_token(id.to_string()); - - - redis - .get_tokio_connection() - .await - .unwrap() - .json_set::( - get_redis_key(token.to_string()), - "$".to_string(), - &TokenStorageTableNode { - data: data.clone(), - emit_to: emit_to.to_string(), - event_name: event_name.to_string(), - }, - ); - tokens.push(event_name.to_string()); - tokens.push(token.clone()); - } - sender_data = Some(SenderData { - url: json_data.from, - message_type: "rep".to_string(), - from: json_data.url, - header: serde_json::Value::Null, - payload: serde_json::json! {{ - "id":json_data.id.to_string(), - "token":tokens.clone() - }}, - error: (serde_json::Value::Null), - id: json_data.id.to_string(), - }); - } else if let serde_json::Value::Object(o) = json_data.payload { - let (id, data, emit_to, event_name) = ( - o.get("id").unwrap().as_str().unwrap(), - o.get("data").unwrap(), - o.get("emit_to").unwrap().as_str().unwrap(), - o.get("event_name").unwrap().as_str().unwrap(), - ); - let token = gen_token(id.to_string()); - redis - .get_tokio_connection() - .await - .unwrap() - .json_set::( - get_redis_key(token.to_string()), - "$".to_string(), - &TokenStorageTableNode { - data: data.clone(), - emit_to: emit_to.to_string(), - event_name: event_name.to_string(), - }, - ); - sender_data = Some(SenderData { - url: json_data.from, - message_type: "rep".to_string(), - from: json_data.url, - header: serde_json::Value::Null, - payload: serde_json::json! {{ - "id":id.to_string(), - "token":token.clone() - }}, - error: (serde_json::Value::Null), - id: id.to_string(), - }); - } - // match socket - // .clone() - // .lock() - // .await - // .write_message(Message::Text(serde_json::to_string(&sender_data).unwrap())) - // { - // Ok(_) => {} - // Err(e) => { - // dbg!(&e); - // return; - // } - // }; - - if sender_data.is_none() { - return; - } - let sender_data = sender_data.unwrap(); - match send_service_message(&redis, &sender_data, false).await { - Ok(_) => {} - Err(e) => { - dbg!(&e); - return; - } - }; -} - -async fn handle_cdn_get_image(json_data: SenderData, redis: RedisClient) { - let payload = match serde_json::from_str::(&json_data.payload.to_string()) { - Ok(v) => v, - Err(_) => { - send_ws_error( - WsError::DecodePayloadError, - SendWsErrorMetaInput { - from: json_data.url.clone(), - url: json_data.from.clone(), - id: json_data.id.clone(), - }, - &redis, - ) - .await; - return; - } - }; - let message_string = match std::str::from_utf8(payload.message.clone().as_slice()) { - Ok(s) => s, - _ => { - send_ws_error( - WsError::DecodePayloadError, - SendWsErrorMetaInput { - from: json_data.url.clone(), - url: json_data.from.clone(), - id: json_data.id.clone(), - }, - &redis, - ) - .await; - - return; - } - } - .to_string(); - // decode message to the json have one field url - let image_id = match serde_json::from_str::(&message_string) { - Ok(d) => d, - Err(_) => { - send_ws_error( - WsError::DecodePayloadError, - SendWsErrorMetaInput { - from: json_data.url.clone(), - url: json_data.from.clone(), - id: json_data.id.clone(), - }, - &redis, - ) - .await; - - return; - } - } - .id; - let image_url = get_image_url(image_id.clone()); - let message_send_client = &SenderData { - id: json_data.id.clone(), - from: json_data.url.clone(), - url: json_data.from.clone(), - payload: json! {{ - "message":json!({ - "image_url": image_url.clone(), - "image_id" : image_id.clone(), - }).to_string().as_bytes().to_vec() - }}, - error: serde_json::Value::Null, - header: serde_json::Value::Null, - message_type: "rep".to_string(), - }; - // match socket - // .clone() - // .lock() - // .await - // .write_message(Message::Text(message_send_client)) - // { - // Ok(_) => {} - // Err(e) => { - // dbg!(&e); - // return; - // } - // }; - - match send_service_message(&redis, &message_send_client, false).await { - Ok(_) => {} - Err(e) => { - dbg!(&e); - send_ws_error( - WsError::DecodePayloadError, - SendWsErrorMetaInput { - from: json_data.url.clone(), - url: json_data.from.clone(), - id: json_data.id.clone(), - }, - &redis, - ) - .await - } - }; - - // boastcast to all service with url and 2 payload - let message_send_other = &SenderData { - id: json_data.id.clone(), - from: json_data.url.clone(), - url: "all/client_get_cdn_image".to_string(), - payload: json! {{ - "request_id": payload.request_id, - "headers" : payload.headers, - "image_id":image_id.clone(), - "message": payload.message.clone(), - "image_url":image_url.clone() - }}, - error: serde_json::Value::Null, - header: json_data.header.clone(), - message_type: "rep".to_string(), - }; - - match send_service_message(&redis, &message_send_other, false).await { - Ok(_) => {} - Err(e) => { - dbg!(&e); - send_ws_error( - WsError::DecodePayloadError, - SendWsErrorMetaInput { - from: json_data.url.clone(), - url: json_data.from.clone(), - id: json_data.id.clone(), - }, - &redis, - ) - .await; - } - }; -} diff --git a/server/comic/util/genUploadToken.go b/server/comic/util/genUploadToken.go index 6b46bba3..76bd1e1d 100644 --- a/server/comic/util/genUploadToken.go +++ b/server/comic/util/genUploadToken.go @@ -8,7 +8,7 @@ import ( "github.com/Folody-Team/Shartube/LocalTypes" ) -var genTokenUrl = "http://shartube-upload-server:3000/private/gen_token" +var genTokenUrl = "http://shartube-upload-server:3000/gen_token" type GenSingleUploadTokenPayload[T any] struct { ID string `json:"id"`