Skip to content

Commit

Permalink
Merge pull request #275 from kinode-dao/dr/http-add-unbind
Browse files Browse the repository at this point in the history
HTTP server: add `Unbind` actions
  • Loading branch information
dr-frmr authored Mar 6, 2024
2 parents 274ae26 + 0b3fc47 commit 3adfebf
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
70 changes: 58 additions & 12 deletions kinode/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type PathBindings = Arc<RwLock<Router<BoundPath>>>;
type WsPathBindings = Arc<RwLock<Router<BoundWsPath>>>;

struct BoundPath {
pub app: ProcessId,
pub app: Option<ProcessId>, // if None, path has been unbound
pub path: String,
pub secure_subdomain: Option<String>,
pub authenticated: bool,
Expand All @@ -49,7 +49,7 @@ struct BoundPath {
}

struct BoundWsPath {
pub app: ProcessId,
pub app: Option<ProcessId>, // if None, path has been unbound
pub secure_subdomain: Option<String>,
pub authenticated: bool,
pub encrypted: bool, // TODO use
Expand Down Expand Up @@ -191,7 +191,7 @@ pub async fn http_server(
// add RPC path
let mut bindings_map: Router<BoundPath> = Router::new();
let rpc_bound_path = BoundPath {
app: ProcessId::new(Some("rpc"), "distro", "sys"),
app: Some(ProcessId::new(Some("rpc"), "distro", "sys")),
path: path.clone(),
secure_subdomain: None, // TODO maybe RPC should have subdomain?
authenticated: false,
Expand Down Expand Up @@ -397,6 +397,10 @@ async fn ws_handler(
};

let bound_path = route.handler();
let Some(app) = bound_path.app.clone() else {
return Err(warp::reject::not_found());
};

if let Some(ref subdomain) = bound_path.secure_subdomain {
let _ = print_tx
.send(Printout {
Expand Down Expand Up @@ -435,7 +439,6 @@ async fn ws_handler(
return Err(warp::reject::reject());
}

let app = bound_path.app.clone();
let extension = bound_path.extension;

drop(ws_path_bindings);
Expand Down Expand Up @@ -500,6 +503,10 @@ async fn http_handler(
};
let bound_path = route.handler();

let Some(app) = &bound_path.app else {
return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response());
};

if bound_path.authenticated
&& !auth_cookie_valid(
&our,
Expand Down Expand Up @@ -578,7 +585,7 @@ async fn http_handler(
// RPC functionality: if path is /rpc:distro:sys/message,
// we extract message from base64 encoded bytes in data
// and send it to the correct app.
let (message, is_fire_and_forget) = if bound_path.app == "rpc:distro:sys" {
let (message, is_fire_and_forget) = if app == &"rpc:distro:sys" {
match handle_rpc_message(our, id, body, print_tx).await {
Ok((message, is_fire_and_forget)) => (message, is_fire_and_forget),
Err(e) => {
Expand All @@ -601,7 +608,7 @@ async fn http_handler(
},
target: Address {
node: our.to_string(),
process: bound_path.app.clone(),
process: app.clone(),
},
rsvp: None,
message: Message::Request(Request {
Expand Down Expand Up @@ -1147,7 +1154,7 @@ async fn handle_app_message(
path_bindings.add(
&normalize_path(&path),
BoundPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
path: path.clone(),
secure_subdomain: None,
authenticated,
Expand All @@ -1170,7 +1177,7 @@ async fn handle_app_message(
path_bindings.add(
&normalize_path(&path),
BoundPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
path: path.clone(),
secure_subdomain: None,
authenticated,
Expand All @@ -1194,7 +1201,7 @@ async fn handle_app_message(
path_bindings.add(
&normalize_path(&path),
BoundPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
path: path.clone(),
secure_subdomain: Some(subdomain),
authenticated: true,
Expand All @@ -1217,7 +1224,7 @@ async fn handle_app_message(
path_bindings.add(
&normalize_path(&path),
BoundPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
path: path.clone(),
secure_subdomain: Some(subdomain),
authenticated: true,
Expand All @@ -1227,6 +1234,27 @@ async fn handle_app_message(
);
}
}
HttpServerAction::Unbind { mut path } => {
let mut path_bindings = path_bindings.write().await;
if km.source.process != "homepage:homepage:sys" {
path = if path.starts_with('/') {
format!("/{}{}", km.source.process, path)
} else {
format!("/{}/{}", km.source.process, path)
};
}
path_bindings.add(
&normalize_path(&path),
BoundPath {
app: None,
path: path.clone(),
secure_subdomain: None,
authenticated: false,
local_only: false,
static_content: None,
},
);
}
HttpServerAction::WebSocketBind {
mut path,
authenticated,
Expand All @@ -1242,7 +1270,7 @@ async fn handle_app_message(
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
secure_subdomain: None,
authenticated,
encrypted,
Expand All @@ -1267,14 +1295,32 @@ async fn handle_app_message(
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
app: Some(km.source.process.clone()),
secure_subdomain: Some(subdomain),
authenticated: true,
encrypted,
extension,
},
);
}
HttpServerAction::WebSocketUnbind { mut path } => {
let mut ws_path_bindings = ws_path_bindings.write().await;
path = if path.starts_with('/') {
format!("/{}{}", km.source.process, path)
} else {
format!("/{}/{}", km.source.process, path)
};
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: None,
secure_subdomain: None,
authenticated: false,
encrypted: false,
extension: false,
},
);
}
HttpServerAction::WebSocketOpen { .. } => {
// we cannot receive these, only send them to processes
send_action_response(
Expand Down
4 changes: 4 additions & 0 deletions lib/src/http/server_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub enum HttpServerAction {
/// lazy_load_blob bytes and serve them as the response to any request to this path.
cache: bool,
},
/// Unbind a previously-bound HTTP path
Unbind { path: String },
/// Bind a path to receive incoming WebSocket connections.
/// Doesn't need a cache since does not serve assets.
WebSocketBind {
Expand All @@ -107,6 +109,8 @@ pub enum HttpServerAction {
encrypted: bool,
extension: bool,
},
/// Unbind a previously-bound WebSocket path
WebSocketUnbind { path: String },
/// Processes will RECEIVE this kind of request when a client connects to them.
/// If a process does not want this websocket open, they should issue a *request*
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
Expand Down

0 comments on commit 3adfebf

Please sign in to comment.