Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rpc with error #481

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ indexmap = "2.2"
spin = "0.9"
httpmock = "0.7"
test-log = "0.2"
anyhow = "1.0"
1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
reqwest = { version = "0.12", features = ["json"]}
sentry = "0.34"
anyhow = { workspace = true }

[features]
default = ["console", "gateway", "media", "connector", "standalone", "cert_utils"]
Expand Down
24 changes: 12 additions & 12 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
#[oai(path = "/:node/log/rooms", method = "get")]
async fn rooms(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>, Path(node): Path<u32>, Query(page): Query<u32>, Query(limit): Query<u32>) -> Json<Response<Vec<RoomInfo>>> {
match ctx.connector.rooms(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 80 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L80

Added line #L80 was not covered by tests
status: true,
data: Some(
res.rooms
Expand All @@ -98,9 +98,9 @@
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 101 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L101

Added line #L101 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 103 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L103

Added line #L103 was not covered by tests
..Default::default()
}),
}
Expand All @@ -118,7 +118,7 @@
Query(limit): Query<u32>,
) -> Json<Response<Vec<PeerInfo>>> {
match ctx.connector.peers(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetPeerParams { room, page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 121 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L121

Added line #L121 was not covered by tests
status: true,
data: Some(
res.peers
Expand Down Expand Up @@ -151,9 +151,9 @@
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 154 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L154

Added line #L154 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 156 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L156

Added line #L156 was not covered by tests
..Default::default()
}),
}
Expand All @@ -170,7 +170,7 @@
Query(limit): Query<u32>,
) -> Json<Response<Vec<SessionInfo>>> {
match ctx.connector.sessions(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 173 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L173

Added line #L173 was not covered by tests
status: true,
data: Some(
res.sessions
Expand Down Expand Up @@ -204,9 +204,9 @@
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 207 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L207

Added line #L207 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 209 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L209

Added line #L209 was not covered by tests
..Default::default()
}),
}
Expand Down Expand Up @@ -240,7 +240,7 @@
)
.await
{
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 243 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L243

Added line #L243 was not covered by tests
status: true,
data: Some(
res.events
Expand All @@ -262,9 +262,9 @@
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 265 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L265

Added line #L265 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 267 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L267

Added line #L267 was not covered by tests
..Default::default()
}),
}
Expand Down
25 changes: 13 additions & 12 deletions bin/src/server/connector/remote_rpc_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use media_server_connector::Querier;
use media_server_protocol::protobuf::cluster_connector::{
get_events::EventInfo, get_peers::PeerInfo, get_rooms::RoomInfo, get_sessions::SessionInfo, GetEventParams, GetEvents, GetParams, GetPeerParams, GetPeers, GetRooms, GetSessions,
Expand All @@ -16,13 +17,13 @@
pub struct ConnectorRemoteRpcHandlerImpl {}

impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
async fn rooms(&self, ctx: &Ctx, req: GetParams) -> Option<GetRooms> {
async fn rooms(&self, ctx: &Ctx, req: GetParams) -> Result<GetRooms> {

Check warning on line 20 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L20

Added line #L20 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get rooms {req:?}");
let response = match ctx.storage.rooms(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get rooms error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 26 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L26

Added line #L26 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} rooms", response.data.len());
Expand All @@ -40,7 +41,7 @@
})
.collect::<Vec<_>>();

Some(GetRooms {
Ok(GetRooms {

Check warning on line 44 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L44

Added line #L44 was not covered by tests
rooms,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -49,13 +50,13 @@
})
}

async fn peers(&self, ctx: &Ctx, req: GetPeerParams) -> Option<GetPeers> {
async fn peers(&self, ctx: &Ctx, req: GetPeerParams) -> Result<GetPeers> {

Check warning on line 53 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L53

Added line #L53 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get peers page {req:?}");
let response = match ctx.storage.peers(req.room, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get peers error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 59 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L59

Added line #L59 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} peers", response.data.len());
Expand Down Expand Up @@ -85,7 +86,7 @@
})
.collect::<Vec<_>>();

Some(GetPeers {
Ok(GetPeers {

Check warning on line 89 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L89

Added line #L89 was not covered by tests
peers,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -94,13 +95,13 @@
})
}

async fn sessions(&self, ctx: &Ctx, req: GetParams) -> Option<GetSessions> {
async fn sessions(&self, ctx: &Ctx, req: GetParams) -> Result<GetSessions> {

Check warning on line 98 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L98

Added line #L98 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get sessions page {req:?}");
let response = match ctx.storage.sessions(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get sessions error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 104 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L104

Added line #L104 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} sessions", response.data.len());
Expand Down Expand Up @@ -130,7 +131,7 @@
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>();
Some(GetSessions {
Ok(GetSessions {

Check warning on line 134 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L134

Added line #L134 was not covered by tests
sessions,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -139,13 +140,13 @@
})
}

async fn events(&self, ctx: &Ctx, req: GetEventParams) -> Option<GetEvents> {
async fn events(&self, ctx: &Ctx, req: GetEventParams) -> Result<GetEvents> {

Check warning on line 143 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L143

Added line #L143 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get events page {req:?}");
let response = match ctx.storage.events(req.session, req.start_ts, req.end_ts, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get events error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 149 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L149

Added line #L149 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} events", response.data.len());
Expand All @@ -163,7 +164,7 @@
meta: e.meta.map(|m| m.to_string()),
})
.collect::<Vec<_>>();
Some(GetEvents {
Ok(GetEvents {

Check warning on line 167 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L167

Added line #L167 was not covered by tests
events,
pagination: Some(Pagination {
total: response.total as u32,
Expand Down
Loading
Loading