diff --git a/bin/src/command/requests.rs b/bin/src/command/requests.rs index 10ec7bfd9..4a204c57b 100644 --- a/bin/src/command/requests.rs +++ b/bin/src/command/requests.rs @@ -1,7 +1,7 @@ use std::{ collections::{BTreeMap, HashSet}, fs::File, - io::{ErrorKind, Read, Write}, + io::{ErrorKind, Read}, os::unix::io::{FromRawFd, IntoRawFd}, os::unix::net::UnixStream, time::{Duration, Instant}, @@ -127,7 +127,8 @@ impl CommandServer { .with_context(|| format!("could not open file at path: {}", &path))?; let counter = self - .save_state_to_file(&mut file) + .state + .write_requests_to_file(&mut file) .with_context(|| "failed writing state to file")?; info!("wrote {} commands to {}", counter, path); @@ -135,44 +136,6 @@ impl CommandServer { Ok(Some(Success::SaveState(counter, path.into()))) } - pub fn save_state_to_file(&mut self, file: &mut File) -> anyhow::Result { - let mut counter = 0usize; - let requests = self.state.generate_requests(); - - let result: anyhow::Result = (move || { - for request in requests { - let message = WorkerRequest::new(format!("SAVE-{counter}"), request); - - file.write_all( - &serde_json::to_string(&message) - .map(|s| s.into_bytes()) - .unwrap_or_default(), - ) - .with_context(|| { - format!( - "Could not add this instruction line to the saved state file: {message:?}" - ) - })?; - - file.write_all(&b"\n\0"[..]) - .with_context(|| "Could not add new line to the saved state file")?; - - if counter % 1000 == 0 { - info!("writing command {}", counter); - file.sync_all() - .with_context(|| "Failed to sync the saved state file")?; - } - counter += 1; - } - file.sync_all() - .with_context(|| "Failed to sync the saved state file")?; - - Ok(counter) - })(); - - result.with_context(|| "Could not write the state onto the state file") - } - pub async fn load_state( &mut self, client_id: Option, @@ -1237,10 +1200,12 @@ impl CommandServer { "Saving state to file", ) .await; + let mut file = File::create(&path) .with_context(|| "Could not create file to automatically save the state")?; - self.save_state_to_file(&mut file) + self.state + .write_requests_to_file(&mut file) .with_context(|| format!("could not save state automatically to {path}"))?; } } diff --git a/bin/src/worker.rs b/bin/src/worker.rs index 55a5e6496..81d361f2d 100644 --- a/bin/src/worker.rs +++ b/bin/src/worker.rs @@ -32,7 +32,7 @@ use sozu_command_lib::{ logging::target_to_backend, proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo}, ready::Ready, - request::WorkerRequest, + request::{read_requests_from_file, WorkerRequest}, response::WorkerResponse, scm_socket::{Listeners, ScmSocket}, state::ConfigState, @@ -216,8 +216,9 @@ pub fn begin_worker_process( error!("Could not block the worker-to-main channel: {}", e); } - let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) }; - let config_state: ConfigState = serde_json::from_reader(configuration_state_file) + let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) }; + + let initial_state = read_requests_from_file(&mut configuration_state_file) .with_context(|| "could not parse configuration state data")?; let worker_config = worker_to_main_channel @@ -275,7 +276,7 @@ pub fn begin_worker_process( worker_to_main_channel, worker_to_main_scm_socket, worker_config, - config_state, + initial_state, true, ) .with_context(|| "Could not create server from config")?; @@ -305,8 +306,9 @@ pub fn fork_main_into_worker( tempfile().with_context(|| "could not create temporary file for configuration state")?; util::disable_close_on_exec(state_file.as_raw_fd())?; - serde_json::to_writer(&mut state_file, state) - .with_context(|| "could not write upgrade data to temporary file")?; + state + .write_requests_to_file(&mut state_file) + .with_context(|| "Could not write state to file")?; state_file .rewind() diff --git a/command/src/request.rs b/command/src/request.rs index d278713f1..c8350b23b 100644 --- a/command/src/request.rs +++ b/command/src/request.rs @@ -1,11 +1,17 @@ use std::{ error, fmt::{self, Display}, + fs::File, + io::Read, net::SocketAddr, str::FromStr, }; +use nom::{HexDisplay, Offset}; + use crate::{ + buffer::fixed::Buffer, + parser::parse_several_requests, proto::command::{ request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend, RulePosition, @@ -19,6 +25,10 @@ pub enum RequestError { InvalidSocketAddress { address: String, error: String }, #[error("invalid value {value} for field '{name}'")] InvalidValue { name: String, value: i32 }, + #[error("Could not read requests from file: {0}")] + FileError(std::io::Error), + #[error("Could not parse requests: {0}")] + ParseError(String), } impl Request { @@ -123,6 +133,55 @@ impl fmt::Display for WorkerRequest { } } +pub fn read_requests_from_file(file: &mut File) -> Result, RequestError> { + let mut acc = Vec::new(); + let mut buffer = Buffer::with_capacity(200000); + loop { + let previous = buffer.available_data(); + + let bytes_read = file + .read(buffer.space()) + .map_err(|e| RequestError::FileError(e))?; + + buffer.fill(bytes_read); + + if buffer.available_data() == 0 { + debug!("Empty buffer"); + break; + } + + let mut offset = 0usize; + match parse_several_requests::(buffer.data()) { + Ok((i, requests)) => { + if !i.is_empty() { + debug!("could not parse {} bytes", i.len()); + if previous == buffer.available_data() { + break; + } + } + offset = buffer.data().offset(i); + + acc.push(requests); + } + Err(nom::Err::Incomplete(_)) => { + if buffer.available_data() == buffer.capacity() { + error!( + "message too big, stopping parsing:\n{}", + buffer.data().to_hex(16) + ); + break; + } + } + Err(parse_error) => { + return Err(RequestError::ParseError(parse_error.to_string())); + } + } + buffer.consume(offset); + } + let requests = acc.into_iter().flatten().collect(); + Ok(requests) +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ProxyDestinations { pub to_http_proxy: bool, diff --git a/command/src/state.rs b/command/src/state.rs index 5cb45bf6d..04af19958 100644 --- a/command/src/state.rs +++ b/command/src/state.rs @@ -3,7 +3,9 @@ use std::{ btree_map::Entry as BTreeMapEntry, hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap, HashSet, }, + fs::File, hash::{Hash, Hasher}, + io::Write, iter::{repeat, FromIterator}, net::SocketAddr, }; @@ -23,6 +25,7 @@ use crate::{ }, display::format_request_type, }, + request::WorkerRequest, response::{Backend, HttpFrontend, TcpFrontend}, ObjectKind, }; @@ -56,6 +59,8 @@ pub enum StateError { "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}" )] FrontendConversion { frontend: String, error: String }, + #[error("Could not write state to file: {0}")] + FileError(std::io::Error), } impl From for StateError { @@ -1367,6 +1372,37 @@ impl ConfigState { tcp_listeners: self.tcp_listeners.clone(), } } + + /// generate requests necessary to recreate the state, + /// write them in a JSON form in a file, separated by \n\0, + /// returns the number of written requests + pub fn write_requests_to_file(&self, file: &mut File) -> Result { + let mut counter = 0usize; + let requests = self.generate_requests(); + + for request in requests { + let message = WorkerRequest::new(format!("SAVE-{counter}"), request); + + file.write_all( + &serde_json::to_string(&message) + .map(|s| s.into_bytes()) + .unwrap_or_default(), + ) + .map_err(StateError::FileError)?; + + file.write_all(&b"\n\0"[..]) + .map_err(StateError::FileError)?; + + if counter % 1000 == 0 { + info!("writing command {}", counter); + file.sync_all().map_err(StateError::FileError)?; + } + counter += 1; + } + file.sync_all().map_err(StateError::FileError)?; + + Ok(counter) + } } fn parse_socket_address(address: &str) -> Result { diff --git a/e2e/src/sozu/worker.rs b/e2e/src/sozu/worker.rs index a25b1948c..a8bade68c 100644 --- a/e2e/src/sozu/worker.rs +++ b/e2e/src/sozu/worker.rs @@ -102,11 +102,19 @@ impl Worker { .send_listeners(&listeners) .expect("could not send listeners"); + let initial_state = state + .generate_requests() + .into_iter() + .map(|request| WorkerRequest { + id: "initial_state".to_string(), + content: request, + }) + .collect(); let server = Server::try_new_from_config( cmd_worker_to_main, scm_worker_to_main, config, - state, + initial_state, false, ) .expect("could not create sozu worker"); @@ -139,7 +147,14 @@ impl Worker { .expect("could not send listeners"); let thread_config = config.to_owned(); - let thread_state = state.to_owned(); + let initial_state = state + .generate_requests() + .into_iter() + .map(|request| WorkerRequest { + id: "initial_state".to_string(), + content: request, + }) + .collect(); let thread_name = name.to_owned(); let thread_scm_worker_to_main = scm_worker_to_main.to_owned(); @@ -157,7 +172,7 @@ impl Worker { cmd_worker_to_main, thread_scm_worker_to_main, thread_config, - thread_state, + initial_state, false, ) .expect("could not create sozu worker"); diff --git a/lib/src/server.rs b/lib/src/server.rs index 12811decc..0461dbe41 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -269,7 +269,7 @@ impl Server { worker_to_main_channel: ProxyChannel, worker_to_main_scm: ScmSocket, config: Config, - config_state: ConfigState, + initial_state: Vec, expects_initial_status: bool, ) -> anyhow::Result { let event_loop = Poll::new().with_context(|| "could not create event loop")?; @@ -331,7 +331,7 @@ impl Server { Some(https), None, server_config, - Some(config_state), + Some(initial_state), expects_initial_status, ) } @@ -348,7 +348,7 @@ impl Server { https: Option, tcp: Option, server_config: ServerConfig, - config_state: Option, + initial_state: Option>, expects_initial_status: bool, ) -> anyhow::Result { FEATURES.with(|_features| { @@ -438,16 +438,10 @@ impl Server { }; // initialize the worker with the state we got from a file - if let Some(state) = config_state { - for (counter, request) in state.generate_requests().iter().enumerate() { - let id = format!("INIT-{counter}"); - let worker_request = WorkerRequest { - id, - content: request.to_owned(), - }; - - trace!("generating initial config request: {:#?}", worker_request); - server.notify_proxys(worker_request); + if let Some(requests) = initial_state { + for request in requests { + trace!("generating initial config request: {:#?}", request); + server.notify_proxys(request); } // do not send back answers to the initialization messages