Skip to content

Commit

Permalink
Merge pull request #162 from Shishqa/shishqa/semaphore Semaphore API
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby authored Jan 23, 2024
2 parents 997754b + 4a21c37 commit 903b6cd
Show file tree
Hide file tree
Showing 29 changed files with 1,371 additions and 94 deletions.
78 changes: 78 additions & 0 deletions ydb/examples/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::time::Duration;

use tokio::task::JoinHandle;

use ydb::{
ClientBuilder, CoordinationSession, NodeConfigBuilder, SessionOptionsBuilder, YdbResult,
};

async fn mutex_work(session: CoordinationSession) {
let lease = session
.acquire_semaphore("my-resource".to_string(), 1)
.await
.unwrap();

let lease_alive = lease.alive();
println!("acquired semaphore");
tokio::select! {
_ = lease_alive.cancelled() => {},
_ = tokio::time::sleep(Duration::from_millis(20)) => {
println!("finished work");
},
}
}

#[tokio::main]
async fn main() -> YdbResult<()> {
let client = ClientBuilder::new_from_connection_string("grpc://localhost:2136?database=local")?
.client()?;
client.wait().await?;

let mut coordination_client = client.coordination_client();

let _ = coordination_client
.drop_node("local/test".to_string())
.await;

coordination_client
.create_node(
"local/test".to_string(),
NodeConfigBuilder::default().build()?,
)
.await?;

let session = coordination_client
.create_session(
"local/test".to_string(),
SessionOptionsBuilder::default().build()?,
)
.await?;

session.create_semaphore("my-resource", 1, vec![]).await?;

let mut handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..10 {
let mut client = client.coordination_client();
handles.push(tokio::spawn(async move {
let session = client
.create_session(
"local/test".to_string(),
SessionOptionsBuilder::default().build().unwrap(),
)
.await
.unwrap();

let session_alive_token = session.alive();
tokio::select! {
_ = session_alive_token.cancelled() => {},
_ = mutex_work(session) => {},
}
}));
}

for result in futures_util::future::join_all(handles).await {
result?;
}

Ok(())
}
25 changes: 23 additions & 2 deletions ydb/src/client_coordination/client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::client::TimeoutSettings;
use crate::grpc_connection_manager::GrpcConnectionManager;
use crate::grpc_wrapper::raw_coordination_service::alter_node::RawAlterNodeRequest;
use crate::grpc_wrapper::raw_coordination_service::common::config::RawCoordinationNodeConfig;
use crate::grpc_wrapper::raw_coordination_service::config::RawCoordinationNodeConfig;
use crate::grpc_wrapper::raw_coordination_service::create_node::RawCreateNodeRequest;
use crate::grpc_wrapper::raw_coordination_service::describe_node::RawDescribeNodeRequest;
use crate::grpc_wrapper::raw_coordination_service::drop_node::RawDropNodeRequest;
use crate::{grpc_wrapper, YdbResult};
use crate::{grpc_wrapper, CoordinationSession, SessionOptions, YdbResult};

use super::list_types::{NodeConfig, NodeDescription};

pub struct CoordinationClient {
timeouts: TimeoutSettings,

session_seq_no: u64,

connection_manager: GrpcConnectionManager,
}

Expand All @@ -21,10 +24,22 @@ impl CoordinationClient {
) -> Self {
Self {
timeouts,
session_seq_no: 0,
connection_manager,
}
}

pub async fn create_session(
&mut self,
path: String,
options: SessionOptions,
) -> YdbResult<CoordinationSession> {
let seq_no = self.session_seq_no;
self.session_seq_no += 1;

CoordinationSession::new(path, seq_no, options, self.connection_manager.clone()).await
}

pub async fn create_node(&mut self, path: String, config: NodeConfig) -> YdbResult<()> {
let req = RawCreateNodeRequest {
config: RawCoordinationNodeConfig::from(config),
Expand Down Expand Up @@ -86,3 +101,9 @@ impl CoordinationClient {
.await
}
}

impl Clone for CoordinationClient {
fn clone(&self) -> Self {
unimplemented!()
}
}
5 changes: 2 additions & 3 deletions ydb/src/client_coordination/list_types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use derive_builder::Builder;

use crate::grpc_wrapper::raw_coordination_service::common::config::RawCoordinationNodeConfig;
use crate::grpc_wrapper::raw_coordination_service::common::config::{
RawConsistencyMode, RawRateLimiterCountersMode,
use crate::grpc_wrapper::raw_coordination_service::config::{
RawConsistencyMode, RawCoordinationNodeConfig, RawRateLimiterCountersMode,
};
use crate::grpc_wrapper::raw_coordination_service::describe_node::RawDescribeNodeResult;
use crate::{errors, SchemeEntry};
Expand Down
1 change: 1 addition & 0 deletions ydb/src/client_coordination/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod client;
pub mod list_types;
pub mod session;
17 changes: 17 additions & 0 deletions ydb/src/client_coordination/session/acquire_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::errors;
use derive_builder::Builder;
use std::time::Duration;

#[derive(Builder, Clone)]
#[builder(build_fn(error = "errors::YdbError"))]
#[allow(dead_code)]
pub struct AcquireOptions {
#[builder(default = "Vec::new()")]
pub(crate) data: Vec<u8>,

#[builder(default = "false")]
pub(crate) ephemeral: bool,

#[builder(default = "Duration::from_secs(20)")]
pub(crate) timeout: Duration,
}
72 changes: 72 additions & 0 deletions ydb/src/client_coordination/session/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::{
collections::HashMap,
sync::{atomic, Arc},
};
use tracing::log::trace;

use tokio::sync::{mpsc, Mutex};
use ydb_grpc::ydb_proto::coordination::{session_request, SessionRequest};

use crate::{YdbError, YdbResult};

pub trait IdentifiedMessage {
fn id(&self) -> u64;
fn set_id(&mut self, id: u64);
}

pub struct RequestController<Response: IdentifiedMessage> {
last_req_id: atomic::AtomicU64,
messages_sender: mpsc::UnboundedSender<SessionRequest>,
active_requests: Arc<Mutex<HashMap<u64, tokio::sync::mpsc::UnboundedSender<Response>>>>,
}

impl<Response: IdentifiedMessage> RequestController<Response> {
pub fn new(messages_sender: mpsc::UnboundedSender<SessionRequest>) -> Self {
Self {
last_req_id: atomic::AtomicU64::new(0),
messages_sender,
active_requests: Arc::new(Mutex::new(HashMap::new())),
}
}

pub async fn send<Request: IdentifiedMessage + Into<session_request::Request>>(
&self,
mut req: Request,
) -> YdbResult<tokio::sync::mpsc::UnboundedReceiver<Response>> {
let curr_id = self.last_req_id.fetch_add(1, atomic::Ordering::AcqRel);

let (tx, rx): (
tokio::sync::mpsc::UnboundedSender<Response>,
tokio::sync::mpsc::UnboundedReceiver<Response>,
) = tokio::sync::mpsc::unbounded_channel();

req.set_id(curr_id);
self.messages_sender
.send(SessionRequest {
request: Some(req.into()),
})
.map_err(|_| YdbError::Custom("can't send".to_string()))?;

{
let mut active_requests = self.active_requests.lock().await;
active_requests.insert(curr_id, tx);
}

Ok(rx)
}

pub async fn get_response(&self, response: Response) -> YdbResult<()> {
let waiter = self.active_requests.lock().await.remove(&response.id());
match waiter {
Some(sender) => {
sender
.send(response)
.map_err(|_| YdbError::Custom("can't send".to_string()))?;
}
None => {
trace!("got response for already unknown id: {}", response.id());
}
};
Ok(())
}
}
Loading

0 comments on commit 903b6cd

Please sign in to comment.