Multiple fields in AppState, how to share multiple state between routes? #1758
-
Hello, I have a web API that has an HTTP client, a db connection pool, and a KV store. I'm getting stuck on sharing state and locking.

The only shared mutable state that I have to manage is the KV store: one route adds items to it, and a separate route checks for items. However, I can't get the items to persist inside the KV store between calls. So I think this is a problem with the (un)locking that I am doing, but I am following the example https://github.com/tokio-rs/axum/blob/main/examples/key-value-store/src/main.rs

Should I have a separate state struct for the client, db and KV store? How do I pass multiple states to a route? All routes need the client, db and KV store.

Do I need to specify in the function header that there's an Arc on the state that I am passing in? As in

If you could point me to an example, that would be appreciated. I don't understand what I am doing wrong. I tried looking at the other discussions, but I couldn't figure it out.

// in main.rs
#[derive(FromRef, Clone)]
pub struct AppState {
    pool: PgPool,
    client: HTTPClient,
    invoices: std::collections::HashMap<String, String>,
}

let mut invoices: HashMap<String, String> = std::collections::HashMap::new();
let app_state = AppState {
    pool,
    client,
    invoices,
};
let shared_state = Arc::new(RwLock::new(app_state));
let app = Router::new()
    .route("/", get(controllers::info::route_info))
    .route("/payment/:pubkey", get(controllers::payment::payment))
    .route("/register", post(controllers::register::register))
    .route(
        "/recover/:pubkey",
        get(controllers::recover::recover_backup),
    )
    .layer(cors)
    .with_state(Arc::clone(&shared_state));

pub async fn payment(
    Path(pubkey): Path<PubKey>,
    State(state): State<SharedState>,
    // State(pool): State<PgPool>,
    // State(client): State<HTTPClient>,
    // State(&mut invoices): State<HashMap<String, String>>,
) -> axum::Json<serde_json::Value> {
Replies: 2 comments 1 reply
-
You should be able to extract I can't say more without further details. If that doesn't work, then please provide a minimal example that I can
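On the question of separate state structs: below is a minimal sketch, using only the standard library, of one way the state from the question could be laid out. It assumes that axum clones the state for each request. `Pool`, `HttpClient`, `register`, `payment` and `demo` are hypothetical stand-ins for illustration, not axum or sqlx APIs. Only the `invoices` map sits behind `Arc<RwLock<..>>`, so every clone of `AppState` points at the same map.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for `PgPool` and the HTTP client from the question.
#[derive(Clone)]
struct Pool;
#[derive(Clone)]
struct HttpClient;

#[allow(dead_code)]
#[derive(Clone)]
struct AppState {
    pool: Pool,
    client: HttpClient,
    // Only the mutable part is behind a lock; cloning copies the `Arc` handle,
    // not the map itself.
    invoices: Arc<RwLock<HashMap<String, String>>>,
}

// Stand-in for the `/register` handler: it receives its own clone of the state.
fn register(state: AppState, pubkey: &str, invoice: &str) {
    state
        .invoices
        .write()
        .unwrap()
        .insert(pubkey.to_owned(), invoice.to_owned());
}

// Stand-in for the `/payment/:pubkey` handler: a different clone, same map.
fn payment(state: AppState, pubkey: &str) -> Option<String> {
    let found = state.invoices.read().unwrap().get(pubkey).cloned();
    found
}

fn demo() -> (Option<String>, Option<String>) {
    let state = AppState {
        pool: Pool,
        client: HttpClient,
        invoices: Arc::default(),
    };
    // Each call gets a separate clone, as a handler would per request.
    register(state.clone(), "pubkey1", "invoice1");
    (
        payment(state.clone(), "pubkey1"),
        payment(state.clone(), "missing"),
    )
}

fn main() {
    println!("{:?}", demo());
}
```

With axum, each handler would presumably take `State(state): State<AppState>` and no outer `Arc<RwLock<AppState>>` would be needed. A field stored as a plain `HashMap` is instead copied on every clone, which may explain items not persisting between calls.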
-
Thank you for the quick reply. I was able to reproduce my issue by making the kv store example resemble mine. The only change is in the kv_set function, where I added an await on tokio::time::sleep between acquiring a write lock and using the db.

The code below does not compile because of this locking issue. The data structure I'm using is not Send, so it can't be safely sent across threads. However, there's an await between where the lock is acquired and where db is accessed. For the future to be suspended at that await and resumed by the tokio runtime, it has to be sendable across threads, which it isn't, hence the error.

Is that a fair interpretation of the situation? How do I get around this? Is there a better way to go about this?

error[E0599]: no method named `layer` found for fn item `fn(axum::extract::Path<String>, State<Arc<std::sync::RwLock<AppState>>>, axum::body::Bytes) -> impl Future<Output = ()> {kv_set}` in the current scope
--> src/main.rs:243:26
|
243 | .layer((
| ^^^^^ method not found in `fn(axum::extract::Path<String>, State<Arc<std::sync::RwLock<AppState>>>, axum::body::Bytes) -> impl Future<Output = ()> {kv_set}`
error: future cannot be sent between threads safely
--> src/main.rs:295:1
|
295 | #[axum::debug_handler]
| ^^^^^^^^^^^^^^^^^^^^^^ future returned by `kv_set` is not `Send`
|
= help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `std::sync::RwLockWriteGuard<'_, AppState>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:298:64
|
297 | let app = &mut state.write().unwrap();
| ---------------------- has type `std::sync::RwLockWriteGuard<'_, AppState>` which is not `Send`
298 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
| ^^^^^^ await occurs here, with `state.write().unwrap()` maybe used later
299 | app.db.insert(key, bytes);
300 | }
| - `state.write().unwrap()` is later dropped here
note: required by a bound in `check`
--> src/main.rs:295:1
|
295 | #[axum::debug_handler]
| ^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check`
= note: this error originates in the attribute macro `axum::debug_handler` (in Nightly builds, run with -Z macro-backtrace for more info)
Some errors have detailed explanations: E0252, E0433, E0599.
For more information about an error, try `rustc --explain E0252`.

//! Simple in-memory key/value store showing features of axum.
//!
//! Run with:
//!
//! ```not_rust
//! cd examples && cargo run -p example-key-value-store
//! ```
use axum::{
body::Bytes,
error_handling::HandleErrorLayer,
extract::{DefaultBodyLimit, Path, State},
handler::Handler,
http::StatusCode,
response::IntoResponse,
routing::{delete, get},
Router,
};
use std::{
borrow::Cow,
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
time::Duration,
};
use tower::{BoxError, ServiceBuilder};
use tower_http::{
auth::RequireAuthorizationLayer, compression::CompressionLayer, limit::RequestBodyLimitLayer,
trace::TraceLayer,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "example_key_value_store=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let shared_state = SharedState::default();
// Build our application by composing routes
let app = Router::new()
.route(
"/:key",
// Add compression to `kv_get`
get(kv_get.layer(CompressionLayer::new()))
// But don't compress `kv_set`
.post_service(
kv_set
.layer((
DefaultBodyLimit::disable(),
RequestBodyLimitLayer::new(1024 * 5_000 /* ~5mb */),
))
.with_state(Arc::clone(&shared_state)),
),
)
.route("/keys", get(list_keys))
// Nest our admin routes under `/admin`
.nest("/admin", admin_routes())
// Add middleware to all routes
.layer(
ServiceBuilder::new()
// Handle errors from middleware
.layer(HandleErrorLayer::new(handle_error))
.load_shed()
.concurrency_limit(1024)
.timeout(Duration::from_secs(10))
.layer(TraceLayer::new_for_http()),
)
.with_state(Arc::clone(&shared_state));
// Run our app with hyper
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
type SharedState = Arc<RwLock<AppState>>;
#[derive(Default)]
struct AppState {
db: HashMap<String, Bytes>,
}
async fn kv_get(
Path(key): Path<String>,
State(state): State<SharedState>,
) -> Result<Bytes, StatusCode> {
let db = &state.read().unwrap().db;
if let Some(value) = db.get(&key) {
Ok(value.clone())
} else {
Err(StatusCode::NOT_FOUND)
}
}
async fn kv_set(Path(key): Path<String>, State(state): State<SharedState>, bytes: Bytes) {
let app = &mut state.write().unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
app.db.insert(key, bytes);
}
async fn list_keys(State(state): State<SharedState>) -> String {
let db = &state.read().unwrap().db;
db.keys()
.map(|key| key.to_string())
.collect::<Vec<String>>()
.join("\n")
}
fn admin_routes() -> Router<SharedState> {
async fn delete_all_keys(State(state): State<SharedState>) {
state.write().unwrap().db.clear();
}
async fn remove_key(Path(key): Path<String>, State(state): State<SharedState>) {
state.write().unwrap().db.remove(&key);
}
Router::new()
.route("/keys", delete(delete_all_keys))
.route("/key/:key", delete(remove_key))
// Require bearer auth for all admin routes
.layer(RequireAuthorizationLayer::bearer("secret-token"))
}
async fn handle_error(error: BoxError) -> impl IntoResponse {
if error.is::<tower::timeout::error::Elapsed>() {
return (StatusCode::REQUEST_TIMEOUT, Cow::from("request timed out"));
}
if error.is::<tower::load_shed::error::Overloaded>() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Cow::from("service is overloaded, try again later"),
);
}
(
StatusCode::INTERNAL_SERVER_ERROR,
Cow::from(format!("Unhandled internal error: {}", error)),
)
}
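Regarding the `Send` error above: `std::sync::RwLockWriteGuard` is not `Send`, so holding it across an `.await` makes the handler's future non-`Send`. One common way around this is to make sure the guard is dropped before awaiting, or is only acquired afterwards. The sketch below uses only the standard library. `some_async_work`, `assert_send`, `block_on` and `demo` are hypothetical helpers for illustration, with `some_async_work` standing in for `tokio::time::sleep`.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

type SharedState = Arc<RwLock<HashMap<String, String>>>;

// Stand-in for `tokio::time::sleep(..)`; any awaited future would do here.
async fn some_async_work() {}

// The write guard is acquired only after the await and is dropped at the end
// of the statement, so it is never held across an `.await` point.
async fn kv_set(state: SharedState, key: String, value: String) {
    some_async_work().await;
    state.write().unwrap().insert(key, value);
}

// Compile-time check mirroring the `Send` bound named in the error message.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Minimal executor for this demo only; a real service would run on tokio.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> Option<String> {
    let state = SharedState::default();
    // `assert_send` only compiles because no guard lives across the await.
    block_on(assert_send(kv_set(
        Arc::clone(&state),
        "k".to_string(),
        "v".to_string(),
    )));
    let value = state.read().unwrap().get("k").cloned();
    value
}

fn main() {
    println!("{:?}", demo());
}
```

If the lock really has to be held across an `.await`, `tokio::sync::RwLock` is an alternative worth considering, since its guards are designed to be held across await points.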