Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more efficient serialization of initial state for new worker #1039

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 6 additions & 41 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashSet},
fs::File,
io::{ErrorKind, Read, Write},
io::{ErrorKind, Read},
os::unix::io::{FromRawFd, IntoRawFd},
os::unix::net::UnixStream,
time::{Duration, Instant},
Expand Down Expand Up @@ -127,52 +127,15 @@ impl CommandServer {
.with_context(|| format!("could not open file at path: {}", &path))?;

let counter = self
.save_state_to_file(&mut file)
.state
.write_requests_to_file(&mut file)
.with_context(|| "failed writing state to file")?;

info!("wrote {} commands to {}", counter, path);

Ok(Some(Success::SaveState(counter, path.into())))
}

pub fn save_state_to_file(&mut self, file: &mut File) -> anyhow::Result<usize> {
let mut counter = 0usize;
let requests = self.state.generate_requests();

let result: anyhow::Result<usize> = (move || {
for request in requests {
let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

file.write_all(
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or_default(),
)
.with_context(|| {
format!(
"Could not add this instruction line to the saved state file: {message:?}"
)
})?;

file.write_all(&b"\n\0"[..])
.with_context(|| "Could not add new line to the saved state file")?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;
}
counter += 1;
}
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;

Ok(counter)
})();

result.with_context(|| "Could not write the state onto the state file")
}

pub async fn load_state(
&mut self,
client_id: Option<String>,
Expand Down Expand Up @@ -1237,10 +1200,12 @@ impl CommandServer {
"Saving state to file",
)
.await;

let mut file = File::create(&path)
.with_context(|| "Could not create file to automatically save the state")?;

self.save_state_to_file(&mut file)
self.state
.write_requests_to_file(&mut file)
.with_context(|| format!("could not save state automatically to {path}"))?;
}
}
Expand Down
14 changes: 8 additions & 6 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use sozu_command_lib::{
logging::target_to_backend,
proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo},
ready::Ready,
request::WorkerRequest,
request::{read_requests_from_file, WorkerRequest},
response::WorkerResponse,
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
Expand Down Expand Up @@ -216,8 +216,9 @@ pub fn begin_worker_process(
error!("Could not block the worker-to-main channel: {}", e);
}

let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };
let config_state: ConfigState = serde_json::from_reader(configuration_state_file)
let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };

let initial_state = read_requests_from_file(&mut configuration_state_file)
.with_context(|| "could not parse configuration state data")?;

let worker_config = worker_to_main_channel
Expand Down Expand Up @@ -275,7 +276,7 @@ pub fn begin_worker_process(
worker_to_main_channel,
worker_to_main_scm_socket,
worker_config,
config_state,
initial_state,
true,
)
.with_context(|| "Could not create server from config")?;
Expand Down Expand Up @@ -305,8 +306,9 @@ pub fn fork_main_into_worker(
tempfile().with_context(|| "could not create temporary file for configuration state")?;
util::disable_close_on_exec(state_file.as_raw_fd())?;

serde_json::to_writer(&mut state_file, state)
.with_context(|| "could not write upgrade data to temporary file")?;
state
.write_requests_to_file(&mut state_file)
.with_context(|| "Could not write state to file")?;

state_file
.rewind()
Expand Down
59 changes: 59 additions & 0 deletions command/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::{
error,
fmt::{self, Display},
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
};

use nom::{HexDisplay, Offset};

use crate::{
buffer::fixed::Buffer,
parser::parse_several_requests,
proto::command::{
request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend,
RulePosition,
Expand All @@ -19,6 +25,10 @@ pub enum RequestError {
InvalidSocketAddress { address: String, error: String },
#[error("invalid value {value} for field '{name}'")]
InvalidValue { name: String, value: i32 },
#[error("Could not read requests from file: {0}")]
FileError(std::io::Error),
#[error("Could not parse requests: {0}")]
ParseError(String),
}

impl Request {
Expand Down Expand Up @@ -123,6 +133,55 @@ impl fmt::Display for WorkerRequest {
}
}

/// Reads the `WorkerRequest`s stored in `file` and returns them in file order.
///
/// The content is pulled chunk by chunk into a fixed-size buffer and handed to
/// `parse_several_requests`. Bytes the parser did not consume stay in the buffer,
/// to be completed by the next read.
///
/// Parsing stops, and the requests parsed so far are returned, when:
/// - the buffer is empty after a read,
/// - no new bytes could be read and unparsed bytes remain,
/// - an incomplete message already fills the whole buffer.
///
/// # Errors
///
/// - `RequestError::FileError` if reading from the file fails,
/// - `RequestError::ParseError` if the parser returns an error other than `Incomplete`.
pub fn read_requests_from_file(file: &mut File) -> Result<Vec<WorkerRequest>, RequestError> {
    let mut acc = Vec::new();
    let mut buffer = Buffer::with_capacity(200000);
    loop {
        let previous = buffer.available_data();

        let bytes_read = file
            .read(buffer.space())
            .map_err(RequestError::FileError)?;

        buffer.fill(bytes_read);

        if buffer.available_data() == 0 {
            debug!("Empty buffer");
            break;
        }

        // true when this iteration brought nothing new into the buffer
        // (end of file, or no space left in the buffer)
        let no_new_data = previous == buffer.available_data();

        let mut offset = 0usize;
        match parse_several_requests::<WorkerRequest>(buffer.data()) {
            Ok((i, requests)) => {
                if !i.is_empty() {
                    debug!("could not parse {} bytes", i.len());
                    if no_new_data {
                        break;
                    }
                }
                // only the bytes that were actually parsed get consumed
                offset = buffer.data().offset(i);

                acc.extend(requests);
            }
            Err(nom::Err::Incomplete(_)) => {
                if buffer.available_data() == buffer.capacity() {
                    error!(
                        "message too big, stopping parsing:\n{}",
                        buffer.data().to_hex(16)
                    );
                    break;
                }
                // Without this guard, an incomplete message at the end of the file would
                // make the loop spin forever: every read returns 0 bytes and the parser
                // keeps asking for more.
                if no_new_data {
                    error!(
                        "no more data to read but {} bytes form an incomplete message, stopping parsing",
                        buffer.available_data()
                    );
                    break;
                }
            }
            Err(parse_error) => {
                return Err(RequestError::ParseError(parse_error.to_string()));
            }
        }
        buffer.consume(offset);
    }
    Ok(acc)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProxyDestinations {
pub to_http_proxy: bool,
Expand Down
36 changes: 36 additions & 0 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{
btree_map::Entry as BTreeMapEntry, hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap,
HashSet,
},
fs::File,
hash::{Hash, Hasher},
io::Write,
iter::{repeat, FromIterator},
net::SocketAddr,
};
Expand All @@ -23,6 +25,7 @@ use crate::{
},
display::format_request_type,
},
request::WorkerRequest,
response::{Backend, HttpFrontend, TcpFrontend},
ObjectKind,
};
Expand Down Expand Up @@ -56,6 +59,8 @@ pub enum StateError {
"Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
)]
FrontendConversion { frontend: String, error: String },
#[error("Could not write state to file: {0}")]
FileError(std::io::Error),
}

impl From<DecodeError> for StateError {
Expand Down Expand Up @@ -1367,6 +1372,37 @@ impl ConfigState {
tcp_listeners: self.tcp_listeners.clone(),
}
}

/// Generates the requests necessary to recreate the state and writes them to `file`.
///
/// Each request is wrapped in a `WorkerRequest` with the id `SAVE-<index>`,
/// serialized to JSON and terminated by `\n\0`.
/// The file is synced when the index is a multiple of 1000, and once more at the end.
///
/// Returns the number of written requests.
///
/// # Errors
///
/// Returns `StateError::FileError` if a request can not be serialized,
/// or if writing to or syncing the file fails.
pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
    let mut counter = 0usize;

    for request in self.generate_requests() {
        let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

        // A serialization failure is reported instead of being replaced by an empty record,
        // which would be counted as written and could not be parsed back.
        // serde_json errors convert into `std::io::Error`.
        let mut line =
            serde_json::to_vec(&message).map_err(|e| StateError::FileError(e.into()))?;
        line.extend_from_slice(b"\n\0");

        // The record and its `\n\0` terminator go out in a single write.
        file.write_all(&line).map_err(StateError::FileError)?;

        if counter % 1000 == 0 {
            info!("writing command {}", counter);
            file.sync_all().map_err(StateError::FileError)?;
        }
        counter += 1;
    }
    file.sync_all().map_err(StateError::FileError)?;

    Ok(counter)
}
}

fn parse_socket_address(address: &str) -> Result<SocketAddr, StateError> {
Expand Down
21 changes: 18 additions & 3 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,19 @@ impl Worker {
.send_listeners(&listeners)
.expect("could not send listeners");

let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let server = Server::try_new_from_config(
cmd_worker_to_main,
scm_worker_to_main,
config,
state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down Expand Up @@ -139,7 +147,14 @@ impl Worker {
.expect("could not send listeners");

let thread_config = config.to_owned();
let thread_state = state.to_owned();
let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let thread_name = name.to_owned();
let thread_scm_worker_to_main = scm_worker_to_main.to_owned();

Expand All @@ -157,7 +172,7 @@ impl Worker {
cmd_worker_to_main,
thread_scm_worker_to_main,
thread_config,
thread_state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down
20 changes: 7 additions & 13 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Server {
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: Config,
config_state: ConfigState,
initial_state: Vec<WorkerRequest>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
let event_loop = Poll::new().with_context(|| "could not create event loop")?;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Server {
Some(https),
None,
server_config,
Some(config_state),
Some(initial_state),
expects_initial_status,
)
}
Expand All @@ -348,7 +348,7 @@ impl Server {
https: Option<https::HttpsProxy>,
tcp: Option<tcp::TcpProxy>,
server_config: ServerConfig,
config_state: Option<ConfigState>,
initial_state: Option<Vec<WorkerRequest>>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
FEATURES.with(|_features| {
Expand Down Expand Up @@ -438,16 +438,10 @@ impl Server {
};

// initialize the worker with the state we got from a file
if let Some(state) = config_state {
for (counter, request) in state.generate_requests().iter().enumerate() {
let id = format!("INIT-{counter}");
let worker_request = WorkerRequest {
id,
content: request.to_owned(),
};

trace!("generating initial config request: {:#?}", worker_request);
server.notify_proxys(worker_request);
if let Some(requests) = initial_state {
for request in requests {
trace!("generating initial config request: {:#?}", request);
server.notify_proxys(request);
}

// do not send back answers to the initialization messages
Expand Down
Loading