Skip to content

Commit

Permalink
Small changes mainly relative to logging
Browse files Browse the repository at this point in the history
- Handle Logging command at worker level (not proxy)
- parse_logging_spec returns a vec of parsing errors
- Logging command takes a string for logging_filter instead of LogLevel
- Logging command fails uppon any errors of parse_logging_spec
- Logger::init ignores parse_logging_spec errors
- Reorganise some code handling commands
- Generate access log on abrupt client deconnection

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum authored and Keksoj committed Feb 9, 2024
1 parent 2b5ed4c commit c542067
Show file tree
Hide file tree
Showing 16 changed files with 144 additions and 161 deletions.
26 changes: 2 additions & 24 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ pub enum SubCmd {
},
#[clap(name = "logging", about = "change logging level")]
Logging {
#[clap(subcommand)]
level: LoggingLevel,
#[clap(name = "filter")]
filter: String,
},
#[clap(name = "state", about = "state management")]
State {
Expand Down Expand Up @@ -231,28 +231,6 @@ pub enum MetricsCmd {
},
}

#[derive(Subcommand, PartialEq, Eq, Clone, Debug)]
pub enum LoggingLevel {
#[clap(name = "trace", about = "Displays a LOT of logs")]
Trace,
#[clap(
name = "debug",
about = "Displays more logs about the inner workings of Sōzu"
)]
Debug,
#[clap(name = "error", about = "Displays occurring errors")]
Error,
#[clap(name = "warn", about = "Displays warnings about non-critical errors")]
Warn,
#[clap(name = "info", about = "Displays logs about normal behaviour of Sōzu")]
Info,
}
impl std::fmt::Display for LoggingLevel {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{self:?}")
}
}

#[derive(Subcommand, PartialEq, Eq, Clone, Debug)]
pub enum StateCmd {
#[clap(name = "save", about = "Save state to that file")]
Expand Down
22 changes: 17 additions & 5 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
env,
fs::File,
io::{ErrorKind, Read},
};
Expand Down Expand Up @@ -216,17 +217,28 @@ fn save_state(server: &mut Server, client: &mut ClientSession, path: &str) {
/// change logging level on the main process, and on all workers
fn set_logging_level(server: &mut Server, client: &mut ClientSession, logging_filter: String) {
debug!("Changing main process log level to {}", logging_filter);
logging::LOGGER.with(|l| {
let directives = logging::parse_logging_spec(&logging_filter);
l.borrow_mut().set_directives(directives);
let (directives, errors) = logging::parse_logging_spec(&logging_filter);
if !errors.is_empty() {
client.finish_failure(format!(
"Error parsing logging filter:\n {}",
errors
.iter()
.map(logging::LogSpecParseError::to_string)
.collect::<Vec<String>>()
.join("\n ")
));
return;
}
logging::LOGGER.with(|logger| {
logger.borrow_mut().set_directives(directives);
});

// also change / set the content of RUST_LOG so future workers / main thread
// will have the new logging filter value
::std::env::set_var("RUST_LOG", &logging_filter);
env::set_var("RUST_LOG", &logging_filter);
debug!(
"Logging level now: {}",
::std::env::var("RUST_LOG").unwrap_or("could get RUST_LOG from env".to_string())
env::var("RUST_LOG").unwrap_or("could get RUST_LOG from env".to_string())
);

worker_request(server, client, RequestType::Logging(logging_filter));
Expand Down
7 changes: 4 additions & 3 deletions bin/src/command/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ impl CommandHub {
let channel = Channel::new(stream, 4096, u64::MAX);
let id = self.next_client_id();
let session = ClientSession::new(channel, id, token);
info!("register new client: {}", id);
debug!("new client: {:?}", session);
info!("Register new client: {}", id);
debug!("{:#?}", session);
self.clients.insert(token, session);
}

Expand Down Expand Up @@ -400,7 +400,8 @@ impl CommandHub {
server.handle_client_request(client, request);
}
ClientResult::CloseSession => {
info!("Closing client {:#?}", client);
info!("Closing client {}", client.id);
debug!("{:#?}", client);
self.event_subscribers.remove(&token);
self.clients.remove(&token);
}
Expand Down
2 changes: 1 addition & 1 deletion bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl CommandManager {
} => self.get_metrics(list, refresh, names, clusters, backends, no_clusters),
_ => self.configure_metrics(cmd),
},
SubCmd::Logging { level } => self.logging_filter(&level),
SubCmd::Logging { filter } => self.logging_filter(filter),
SubCmd::State { cmd } => match cmd {
StateCmd::Save { file } => self.save_state(file),
StateCmd::Load { file } => self.load_state(file),
Expand Down
8 changes: 4 additions & 4 deletions bin/src/ctl/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use sozu_command_lib::{

use crate::{
cli::{
BackendCmd, ClusterCmd, HttpFrontendCmd, HttpListenerCmd, HttpsListenerCmd, LoggingLevel,
MetricsCmd, TcpFrontendCmd, TcpListenerCmd,
BackendCmd, ClusterCmd, HttpFrontendCmd, HttpListenerCmd, HttpsListenerCmd, MetricsCmd,
TcpFrontendCmd, TcpListenerCmd,
},
ctl::CommandManager,
};
Expand Down Expand Up @@ -484,8 +484,8 @@ impl CommandManager {
)
}

pub fn logging_filter(&mut self, filter: &LoggingLevel) -> Result<(), CtlError> {
self.send_request(RequestType::Logging(filter.to_string().to_lowercase()).into())
pub fn logging_filter(&mut self, filter: String) -> Result<(), CtlError> {
self.send_request(RequestType::Logging(filter).into())
}

pub fn add_certificate(
Expand Down
19 changes: 15 additions & 4 deletions command/src/logging/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
EndpointRecord, FullTags, LogContext, LogDuration, LogLevel, LogMessage, LoggerBackend,
Rfc3339Time,
},
AsStr, AsString,
AsStr,
};

impl LogLevel {
Expand Down Expand Up @@ -122,11 +122,11 @@ impl fmt::Display for EndpointRecord<'_> {
..
} => write!(
f,
"{} {} {} -> {}",
"{} {} {} {}",
authority.as_str_or("-"),
method.as_str_or("-"),
path.as_str_or("-"),
status.as_string_or("-"),
display_status(*status, f.alternate()),
),
Self::Tcp { context } => {
write!(f, "{}", context.as_str_or("-"))
Expand All @@ -135,10 +135,21 @@ impl fmt::Display for EndpointRecord<'_> {
}
}

fn display_status(status: Option<u16>, pretty: bool) -> String {
match (status, pretty) {
(Some(s @ 200..=299), true) => format!("\x1b[32m{s}"),
(Some(s @ 300..=399), true) => format!("\x1b[34m{s}"),
(Some(s @ 400..=499), true) => format!("\x1b[33m{s}"),
(Some(s @ 500..=599), true) => format!("\x1b[31m{s}"),
(Some(s), _) => s.to_string(),
(None, _) => "-".to_string(),
}
}

impl<'a> fmt::Display for FullTags<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match (self.concatenated, self.user_agent) {
(None, None) => write!(f, "-"),
(None, None) => Ok(()),
(Some(tags), None) => write!(f, "{tags}"),
(Some(tags), Some(ua)) if !tags.is_empty() => {
write!(f, "{tags}, user-agent={}", prepare_user_agent(ua))
Expand Down
38 changes: 21 additions & 17 deletions command/src/logging/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Logger {
access_format: Option<AccessLogFormat>,
access_colored: Option<bool>,
) {
let directives = parse_logging_spec(spec);
let (directives, _errors) = parse_logging_spec(spec);
LOGGER.with(|logger| {
let mut logger = logger.borrow_mut();
if !logger.initialized {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl InnerLogger {
]
},
colored: {
formats: ["\x1b[;1m{}\x1b[m {}->{} {}/{}/{}/{} {}->{} \x1b[2m[{}] \x1b[;1m{} {}\x1b[m{}\n"],
formats: ["\x1b[;1m{}\x1b[m {}->{} {}/{}/{}/{} {}->{} \x1b[2m[{}] \x1b[;1m{} {:#}\x1b[m{}\n"],
args: @,
}
},
Expand Down Expand Up @@ -519,18 +519,25 @@ pub struct LogDirective {
level: LogLevelFilter,
}

pub fn parse_logging_spec(spec: &str) -> Vec<LogDirective> {
#[derive(thiserror::Error, Debug)]
pub enum LogSpecParseError {
#[error("Too many '/'s: {0}")]
TooManySlashes(String),
#[error("Too many '='s: {0}")]
TooManyEquals(String),
#[error("Invalid log level: {0}")]
InvalidLogLevel(String),
}

pub fn parse_logging_spec(spec: &str) -> (Vec<LogDirective>, Vec<LogSpecParseError>) {
let mut dirs = Vec::new();
let mut errors = Vec::new();

let mut parts = spec.split('/');
let mods = parts.next();
let _ = parts.next();
if parts.next().is_some() {
println!(
"warning: invalid logging spec '{spec}', \
ignoring it (too many '/'s)"
);
return dirs;
errors.push(LogSpecParseError::TooManySlashes(spec.to_string()));
}
if let Some(m) = mods {
for s in m.split(',') {
Expand All @@ -552,18 +559,12 @@ pub fn parse_logging_spec(spec: &str) -> Vec<LogDirective> {
(Some(part0), Some(part1), None) => match part1.parse() {
Ok(num) => (num, Some(part0)),
_ => {
println!(
"warning: invalid logging spec '{part1}', \
ignoring it"
);
errors.push(LogSpecParseError::InvalidLogLevel(s.to_string()));
continue;
}
},
_ => {
println!(
"warning: invalid logging spec '{s}', \
ignoring it"
);
errors.push(LogSpecParseError::TooManyEquals(s.to_string()));
continue;
}
};
Expand All @@ -574,7 +575,10 @@ pub fn parse_logging_spec(spec: &str) -> Vec<LogDirective> {
}
}

dirs
for error in &errors {
println!("{error:?}");
}
(dirs, errors)
}

/// start the logger with all logs and access logs on stdout
Expand Down
16 changes: 9 additions & 7 deletions command/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,20 @@ impl Request {
| RequestType::RemoveBackend(_)
| RequestType::SoftStop(_)
| RequestType::HardStop(_)
| RequestType::Status(_)
| RequestType::QueryClusterById(_)
| RequestType::QueryClustersByDomain(_)
| RequestType::QueryClustersHashes(_)
| RequestType::QueryMetrics(_)
| RequestType::Logging(_) => {
| RequestType::Status(_) => {
proxy_destination.to_http_proxy = true;
proxy_destination.to_https_proxy = true;
proxy_destination.to_tcp_proxy = true;
}

// handled at worker level prior to this call
RequestType::ConfigureMetrics(_)
| RequestType::QueryMetrics(_)
| RequestType::Logging(_)
| RequestType::QueryClustersHashes(_)
| RequestType::QueryClusterById(_)
| RequestType::QueryClustersByDomain(_) => {}

// the Add***Listener and other Listener orders will be handled separately
// by the notify_proxys function, so we don't give them destinations
RequestType::AddHttpsListener(_)
Expand All @@ -85,7 +88,6 @@ impl Request {
| RequestType::RemoveListener(_)
| RequestType::ActivateListener(_)
| RequestType::DeactivateListener(_)
| RequestType::ConfigureMetrics(_)
| RequestType::ReturnListenSockets(_) => {}

// These won't ever reach a worker anyway
Expand Down
4 changes: 2 additions & 2 deletions command/src/scm_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub enum ScmSocketError {
blocking: bool,
error: std::io::Error,
},
#[error("could not send message per SCM socket: {0}")]
#[error("could not send message for SCM socket: {0}")]
Send(String),
#[error("could not send message per SCM socket: {0}")]
#[error("could not receive message for SCM socket: {0}")]
Receive(String),
#[error("invalid char set: {0}")]
InvalidCharSet(String),
Expand Down
24 changes: 12 additions & 12 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,18 @@ impl ConfigState {
RequestType::RemoveBackend(backend) => self.remove_backend(backend),

// This is to avoid the error message
&RequestType::Logging(_)
| &RequestType::CountRequests(_)
| &RequestType::Status(_)
| &RequestType::SoftStop(_)
| &RequestType::QueryCertificatesFromWorkers(_)
| &RequestType::QueryClusterById(_)
| &RequestType::QueryClustersByDomain(_)
| &RequestType::QueryMetrics(_)
| &RequestType::QueryClustersHashes(_)
| &RequestType::ConfigureMetrics(_)
| &RequestType::ReturnListenSockets(_)
| &RequestType::HardStop(_) => Ok(()),
RequestType::Logging(_)
| RequestType::CountRequests(_)
| RequestType::Status(_)
| RequestType::SoftStop(_)
| RequestType::QueryCertificatesFromWorkers(_)
| RequestType::QueryClusterById(_)
| RequestType::QueryClustersByDomain(_)
| RequestType::QueryMetrics(_)
| RequestType::QueryClustersHashes(_)
| RequestType::ConfigureMetrics(_)
| RequestType::ReturnListenSockets(_)
| RequestType::HardStop(_) => Ok(()),

_other_request => Err(StateError::UndispatchableRequest),
}
Expand Down
19 changes: 2 additions & 17 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use rusty_ulid::Ulid;
use time::{Duration, Instant};

use sozu_command::{
logging::{self, CachedTags},
logging::CachedTags,
proto::command::{
request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener,
RequestHttpFrontend, WorkerRequest, WorkerResponse,
Expand Down Expand Up @@ -522,7 +522,7 @@ impl HttpProxy {

pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
let len = self.listeners.len();
let remove_address = remove.address.clone().into();
let remove_address = remove.address.into();
self.listeners
.retain(|_, l| l.borrow().address != remove_address);

Expand Down Expand Up @@ -702,14 +702,6 @@ impl HttpProxy {

Ok(())
}

pub fn logging(&mut self, logging_filter: String) -> Result<(), ProxyError> {
logging::LOGGER.with(|l| {
let directives = logging::parse_logging_spec(&logging_filter);
l.borrow_mut().set_directives(directives);
});
Ok(())
}
}

impl HttpListener {
Expand Down Expand Up @@ -838,13 +830,6 @@ impl ProxyConfiguration for HttpProxy {
debug!("{} status", request_id);
Ok(())
}
Some(RequestType::Logging(logging_filter)) => {
debug!(
"{} changing logging filter to {}",
request_id, logging_filter
);
self.logging(logging_filter)
}
other_command => {
debug!(
"{} unsupported message for HTTP proxy, ignoring: {:?}",
Expand Down
Loading

0 comments on commit c542067

Please sign in to comment.