Skip to content

Commit

Permalink
Refactor backend logic with an Origin struct
Browse files Browse the repository at this point in the history
I feel like going it's nowhere...
Backend logic is messy and unclear, buffers are needlessly kept alive,
borrowing subsets of the HTTP session is harder and harder, reseting
default answers request is way too hard for what it's worth (it's
probably broken right now), gauges are desynched and we are cloning
again and again String ids...

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Jan 8, 2025
1 parent 6110de3 commit 28ea2c6
Show file tree
Hide file tree
Showing 14 changed files with 360 additions and 377 deletions.
1 change: 0 additions & 1 deletion command/assets/custom_200.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
HTTP/1.1 200 OK
%Content-Length: %CONTENT_LENGTH
Sozu-Id: %REQUEST_ID

<h1>%CLUSTER_ID Custom 200</h1>
Expand Down
1 change: 0 additions & 1 deletion command/assets/custom_404.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
HTTP/1.1 404 Not Found
Cache-Control: no-cache
Connection: close
Sozu-Id: %REQUEST_ID

<h1>My own 404 error page</h1>
Expand Down
1 change: 0 additions & 1 deletion command/assets/custom_503.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
HTTP/1.1 503 Service Unavailable
Cache-Control: no-cache
Connection: close
%Content-Length: %CONTENT_LENGTH
Sozu-Id: %REQUEST_ID

<h1>MyCluster: 503 Service Unavailable</h1>
Expand Down
1 change: 1 addition & 0 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,7 @@ mod tests {
hostname: String::from("test.local"),
path: PathRule::prefix(String::from("/abc")),
address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
required_auth: Some(false),
redirect: Some(RedirectPolicy::Forward.into()),
redirect_scheme: Some(RedirectScheme::UseSame.into()),
..Default::default()
Expand Down
15 changes: 10 additions & 5 deletions e2e/src/http_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ pub fn http_request<S1: Into<String>, S2: Into<String>, S3: Into<String>, S4: In
)
}

pub fn immutable_answer(status: u16) -> String {
pub fn immutable_answer(status: u16, content_length: bool) -> String {
let content_length = if content_length {
"\r\nContent-Length: 0"
} else {
""
};
match status {
400 => String::from("HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"),
404 => String::from("HTTP/1.1 404 Not Found\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"),
502 => String::from("HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"),
503 => String::from("HTTP/1.1 503 Service Unavailable\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n"),
400 => format!("HTTP/1.1 400 Bad Request\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"),
404 => format!("HTTP/1.1 404 Not Found\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"),
502 => format!("HTTP/1.1 502 Bad Gateway\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"),
503 => format!("HTTP/1.1 503 Service Unavailable\r\nCache-Control: no-cache\r\nConnection: close{content_length}\r\n\r\n"),
_ => unimplemented!()
}
}
20 changes: 10 additions & 10 deletions e2e/src/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ fn try_http_behaviors() -> State {
.to_http(None)
.unwrap();
http_config.answers = BTreeMap::from([
("400".to_string(), immutable_answer(400)),
("404".to_string(), immutable_answer(404)),
("502".to_string(), immutable_answer(502)),
("503".to_string(), immutable_answer(503)),
("400".to_string(), immutable_answer(400, false)),
("404".to_string(), immutable_answer(404, false)),
("502".to_string(), immutable_answer(502, false)),
("503".to_string(), immutable_answer(503, false)),
]);

worker.send_proxy_request_type(RequestType::AddHttpListener(http_config));
Expand All @@ -671,7 +671,7 @@ fn try_http_behaviors() -> State {

let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(immutable_answer(404)));
assert_eq!(response, Some(immutable_answer(404, true)));
assert_eq!(client.receive(), None);

worker.send_proxy_request_type(RequestType::AddHttpFrontend(RequestHttpFrontend {
Expand All @@ -686,7 +686,7 @@ fn try_http_behaviors() -> State {

let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(immutable_answer(503)));
assert_eq!(response, Some(immutable_answer(503, true)));
assert_eq!(client.receive(), None);

let back_address = create_local_address();
Expand All @@ -706,7 +706,7 @@ fn try_http_behaviors() -> State {

let response = client.receive();
println!("response: {response:?}");
assert_eq!(response, Some(immutable_answer(400)));
assert_eq!(response, Some(immutable_answer(400, true)));
assert_eq!(client.receive(), None);

let mut backend = SyncBackend::new("backend", back_address, "TEST\r\n\r\n");
Expand All @@ -723,7 +723,7 @@ fn try_http_behaviors() -> State {
let response = client.receive();
println!("request: {request:?}");
println!("response: {response:?}");
assert_eq!(response, Some(immutable_answer(502)));
assert_eq!(response, Some(immutable_answer(502, true)));
assert_eq!(client.receive(), None);

info!("expecting 200");
Expand Down Expand Up @@ -786,7 +786,7 @@ fn try_http_behaviors() -> State {
let response = client.receive();
println!("request: {request:?}");
println!("response: {response:?}");
assert_eq!(response, Some(immutable_answer(503)));
assert_eq!(response, Some(immutable_answer(503, true)));
assert_eq!(client.receive(), None);

worker.send_proxy_request_type(RequestType::RemoveBackend(RemoveBackend {
Expand Down Expand Up @@ -984,7 +984,7 @@ fn try_https_redirect() -> State {
client.connect();
client.send();
let answer = client.receive();
let expected_answer = format!("{answer_301_prefix}https://example.com/redirected?true\r\n\r\n");
let expected_answer = format!("{answer_301_prefix}https://example.com/redirected?true\r\nContent-Length: 0\r\n\r\n");
assert_eq!(answer, Some(expected_answer));

State::Success
Expand Down
25 changes: 14 additions & 11 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,14 @@ impl HttpSession {
}
}

fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
fn upgrade_http(
&mut self,
mut http: Http<TcpStream, HttpListener>,
) -> Option<HttpStateMachine> {
debug!("http switching to ws");
let front_token = self.frontend_token;
let back_token = match http.backend_token {
Some(back_token) => back_token,
let frontend_token = self.frontend_token;
let origin = match http.origin.take() {
Some(origin) => origin,
None => {
warn!(
"Could not upgrade http request on cluster '{:?}' ({:?}) using backend '{:?}' into websocket for request '{}'",
Expand All @@ -223,7 +226,7 @@ impl HttpSession {
}
};

let ws_context = http.websocket_context();
let websocket_context = http.websocket_context();
let mut container_frontend_timeout = http.container_frontend_timeout;
let mut container_backend_timeout = http.container_backend_timeout;
container_frontend_timeout.reset();
Expand All @@ -237,25 +240,25 @@ impl HttpSession {

let mut pipe = Pipe::new(
backend_buffer,
http.context.backend_id,
http.backend_socket,
http.backend,
Some(origin.backend_id),
Some(origin.socket),
Some(origin.backend),
Some(container_backend_timeout),
Some(container_frontend_timeout),
http.context.cluster_id,
http.request_stream.storage.buffer,
front_token,
frontend_token,
http.frontend_socket,
self.listener.clone(),
Protocol::HTTP,
http.context.id,
http.context.session_address,
ws_context,
websocket_context,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
pipe.backend_readiness.event = http.backend_readiness.event;
pipe.set_back_token(back_token);
pipe.set_back_token(origin.token);

gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
Expand Down
23 changes: 13 additions & 10 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,14 @@ impl HttpsSession {
}
}

fn upgrade_http(&self, http: Http<FrontRustls, HttpsListener>) -> Option<HttpsStateMachine> {
fn upgrade_http(
&self,
mut http: Http<FrontRustls, HttpsListener>,
) -> Option<HttpsStateMachine> {
debug!("https switching to wss");
let front_token = self.frontend_token;
let back_token = match http.backend_token {
Some(back_token) => back_token,
let origin = match http.origin.take() {
Some(origin) => origin,
None => {
warn!(
"Could not upgrade https request on cluster '{:?}' ({:?}) using backend '{:?}' into secure websocket for request '{}'",
Expand All @@ -343,7 +346,7 @@ impl HttpsSession {
}
};

let ws_context = http.websocket_context();
let websocket_context = http.websocket_context();
let mut container_frontend_timeout = http.container_frontend_timeout;
let mut container_backend_timeout = http.container_backend_timeout;
container_frontend_timeout.reset();
Expand All @@ -357,25 +360,25 @@ impl HttpsSession {

let mut pipe = Pipe::new(
backend_buffer,
http.context.backend_id,
http.backend_socket,
http.backend,
Some(origin.backend_id),
Some(origin.socket),
Some(origin.backend),
Some(container_backend_timeout),
Some(container_frontend_timeout),
http.context.cluster_id,
http.request_stream.storage.buffer,
front_token,
http.frontend_socket,
self.listener.clone(),
Protocol::HTTP,
Protocol::HTTPS,
http.context.id,
http.context.session_address,
ws_context,
websocket_context,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
pipe.backend_readiness.event = http.backend_readiness.event;
pipe.set_back_token(back_token);
pipe.set_back_token(origin.token);

gauge_add!("protocol.https", -1);
gauge_add!("protocol.wss", 1);
Expand Down
4 changes: 1 addition & 3 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,6 @@ pub struct SessionMetrics {
pub service_start: Option<Instant>,
pub wait_start: Instant,

pub backend_id: Option<String>,
pub backend_start: Option<Instant>,
pub backend_connected: Option<Instant>,
pub backend_stop: Option<Instant>,
Expand All @@ -971,7 +970,6 @@ impl SessionMetrics {
bout: 0,
service_start: None,
wait_start: Instant::now(),
backend_id: None,
backend_start: None,
backend_connected: None,
backend_stop: None,
Expand Down Expand Up @@ -1072,7 +1070,7 @@ impl SessionMetrics {
time!("request_time", request_time.as_millis());
time!("service_time", service_time.as_millis());

if let Some(backend_id) = self.backend_id.as_ref() {
if let Some(backend_id) = context.backend_id {
if let Some(backend_response_time) = self.backend_response_time() {
record_backend_metrics!(
context.cluster_id.as_str_or("-"),
Expand Down
Loading

0 comments on commit 28ea2c6

Please sign in to comment.