diff --git a/.env.example b/.env.example index cd533521985d..ad231bd259a3 100644 --- a/.env.example +++ b/.env.example @@ -70,6 +70,9 @@ OPENDAL_SLED_TREE=sled-tree # memcached OPENDAL_MEMCACHED_ENDPOINT=tcp://127.0.0.1:11211 OPENDAL_MEMCACHED_ROOT=/ +OPENDAL_MEMCACHED_ENABLE_TLS=true +OPENDAL_MEMCACHED_TLS_KEY=/ +OPENDAL_MEMCACHED_TLS_CERT=/ # webdav OPENDAL_WEBDAV_ROOT=/tmp/opendal/ OPENDAL_WEBDAV_ENDPOINT=http://127.0.0.1:8080 diff --git a/core/Cargo.lock b/core/Cargo.lock index 28417ce9e28b..49e47854a7a2 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2116,6 +2116,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -3814,9 +3824,9 @@ dependencies = [ "rustls 0.23.15", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower-service", - "webpki-roots 0.26.6", + "webpki-roots 0.26.7", ] [[package]] @@ -4749,7 +4759,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -5052,6 +5062,8 @@ dependencies = [ "reqwest", "rocksdb", "rust-nebula", + "rustls 0.23.15", + "rustls-native-certs 0.8.1", "serde", "serde_json", "sha1", @@ -5064,6 +5076,7 @@ dependencies = [ "surrealdb", "tikv-client", "tokio", + "tokio-rustls 0.26.1", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -6425,7 +6438,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-retry2", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-util", "url", ] @@ -6613,7 +6626,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-util", "tower-service", "url", @@ -6621,7 +6634,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.6", + "webpki-roots 0.26.7", "windows-registry", ] @@ -6973,7 +6986,7 @@ dependencies = [ "openssl-probe", "rustls-pemfile 1.0.4", "schannel", - "security-framework", + "security-framework 2.11.1", ] [[package]] @@ -6986,7 +6999,19 @@ dependencies = [ "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 2.11.1", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.1.0", ] [[package]] @@ -7144,7 +7169,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81d3f8c9bfcc3cbb6b0179eb57042d75b1582bdc65c3cb95f3fa999509c03cbc" +dependencies = [ + "bitflags 2.6.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -7152,9 +7190,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "1863fd3768cd83c56a7f60faa4dc0d403f1b6df0a38c3c25f44b7894e45370d5" dependencies = [ "core-foundation-sys", "libc", @@ -7615,7 +7653,7 @@ dependencies = [ "tokio-stream", "tracing", "url", - "webpki-roots 0.26.6", + "webpki-roots 0.26.7", ] [[package]] @@ -8413,12 +8451,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +checksum = "5f6d0975eaace0cf0fcadee4e4aaa5da15b5c079146f2cffb67c113be122bf37" dependencies = [ "rustls 0.23.15", - "rustls-pki-types", "tokio", ] @@ -8444,9 +8481,9 @@ dependencies = [ "rustls 0.23.15", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tungstenite", - "webpki-roots 0.26.6", + "webpki-roots 0.26.7", ] [[package]] @@ -8534,7 +8571,7 @@ dependencies = [ "rustls-pemfile 2.2.0", "socket2", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -8889,7 +8926,7 @@ dependencies = [ "rustls 0.23.15", "rustls-pki-types", "url", - "webpki-roots 0.26.6", + "webpki-roots 0.26.7", ] [[package]] @@ -9190,9 +9227,9 @@ checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "webpki-roots" -version = "0.26.6" +version = "0.26.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" dependencies = [ "rustls-pki-types", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index d404845dd3e2..92b4115b08ab 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -159,7 +159,12 @@ services-ipmfs = [] services-koofr = [] services-lakefs = [] services-libsql = ["dep:hrana-client-proto"] -services-memcached = ["dep:bb8"] +services-memcached = [ + "dep:bb8", + "dep:tokio-rustls", + "dep:rustls", + "dep:rustls-native-certs", +] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] @@ -359,6 +364,13 @@ monoio = { version = "0.2.4", optional = true, features = [ "unlinkat", "renameat", ] } +# for services-memcached +rustls = { version = "0.23.15", default-features = false, features = [ + "std", +], optional = true } +rustls-native-certs = {version = "0.8.1", optional = true} +tokio-rustls = { version = "0.26.1", optional = true } + # Layers # for layers-async-backtrace diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index 5ddfa4b9c114..c821582cf4a0 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -15,18 +15,22 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; use std::time::Duration; -use bb8::RunError; -use tokio::net::TcpStream; -use tokio::sync::OnceCell; - use super::binary; use crate::raw::adapters::kv; use crate::raw::*; use crate::services::MemcachedConfig; use crate::*; +use bb8::RunError; +use rustls::pki_types::pem::PemObject; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName}; +use tokio::net::TcpStream; +use tokio::sync::OnceCell; +use tokio_rustls::TlsConnector; + impl Configurator for MemcachedConfig { type Builder = MemcachedBuilder; fn into_builder(self) -> Self::Builder { @@ -82,6 +86,24 @@ impl MemcachedBuilder { self.config.default_ttl = Some(ttl); self } + + /// Set the tls connect on. + pub fn enable_tls(mut self, enable_tls: bool) -> Self { + self.config.enable_tls = enable_tls; + self + } + + /// Set the tls key path. + pub fn tls_key(mut self, tls_key: &str) -> Self { + self.config.tls_key = Some(tls_key.to_string()); + self + } + + /// Set the tls cert path. + pub fn tls_cert(mut self, tls_cert: &str) -> Self { + self.config.tls_cert = Some(tls_cert.to_string()); + self + } } impl Builder for MemcachedBuilder { @@ -126,6 +148,22 @@ impl Builder for MemcachedBuilder { .with_context("endpoint", &endpoint), ); }; + if self.config.enable_tls { + rustls::crypto::ring::default_provider() + .install_default() + .map_err(|_err| { + Error::new( + ErrorKind::Unexpected, + "no process-level CryptoProvider available", + ) + })?; + ServerName::try_from(host.clone()).map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "Invalid dns name error") + .with_context("service", Scheme::Memcached) + .with_context("host", &host) + .set_source(err) + })?; + } let port = if let Some(port) = uri.port_u16() { port } else { @@ -150,6 +188,9 @@ impl Builder for MemcachedBuilder { endpoint, username: self.config.username.clone(), password: self.config.password.clone(), + enable_tls: self.config.enable_tls, + tls_key: self.config.tls_key.clone(), + tls_cert: self.config.tls_cert.clone(), conn, default_ttl: self.config.default_ttl, }) @@ -166,6 +207,9 @@ pub struct Adapter { username: Option, password: Option, default_ttl: Option, + enable_tls: bool, + tls_key: Option, + tls_cert: Option, conn: OnceCell>, } @@ -178,6 +222,9 @@ impl Adapter { &self.endpoint, self.username.clone(), self.password.clone(), + self.enable_tls, + self.tls_key.clone(), + self.tls_cert.clone(), ); bb8::Pool::builder().build(mgr).await.map_err(|err| { @@ -246,14 +293,27 @@ struct MemcacheConnectionManager { address: String, username: Option, password: Option, + enable_tls: bool, + tls_key: Option, + tls_cert: Option, } impl MemcacheConnectionManager { - fn new(address: &str, username: Option, password: Option) -> Self { + fn new( + address: &str, + username: Option, + password: Option, + enable_tls: bool, + tls_key: Option, + tls_cert: Option, + ) -> Self { Self { address: address.to_string(), username, password, + enable_tls, + tls_key, + tls_cert, } } } @@ -265,11 +325,67 @@ impl bb8::ManageConnection for MemcacheConnectionManager { /// TODO: Implement unix stream support. async fn connect(&self) -> Result { - let conn = TcpStream::connect(&self.address) - .await - .map_err(new_std_io_error)?; - let mut conn = binary::Connection::new(conn); + let mut conn = if self.enable_tls { + let mut root_cert_store = rustls::RootCertStore::empty(); + + let native_certs = rustls_native_certs::load_native_certs() + .expect("errors occurred while loading certificates"); + for cert in native_certs { + root_cert_store.add(cert).map_err(|err| { + Error::new(ErrorKind::Unexpected, "load cafile failed").set_source(err) + })?; + } + let tls_config = + rustls::ClientConfig::builder().with_root_certificates(root_cert_store); + let config = if let (Some(cert_path), Some(key_path)) = (&self.tls_cert, &self.tls_key) + { + let cert_chain = CertificateDer::pem_file_iter(cert_path) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "load tls cert failed").set_source(err) + })? + .filter_map(Result::ok) + .collect(); + let key_der = PrivateKeyDer::from_pem_file(key_path).map_err(|err| { + Error::new(ErrorKind::Unexpected, "load tls key failed").set_source(err) + })?; + + tls_config + .with_client_auth_cert(cert_chain, key_der) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "build tls client failed").set_source(err) + })? + } else { + tls_config.with_no_client_auth() + }; + + let connector = TlsConnector::from(Arc::new(config)); + let conn = TcpStream::connect(&self.address) + .await + .map_err(new_std_io_error)?; + + let uri = http::Uri::try_from(&self.address).expect("unreachable!"); + let host = uri.host().expect("unreachable!"); + let domain = ServerName::try_from(host) + .map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "Invalid dns name error") + .with_context("service", Scheme::Memcached) + .with_context("address", &self.address) + .set_source(err) + })? + .to_owned(); + + let conn = connector.connect(domain, conn).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "tls connect failed").set_source(err) + })?; + binary::Connection::new(Box::new(conn)) + } else { + let conn = TcpStream::connect(&self.address) + .await + .map_err(new_std_io_error)?; + + binary::Connection::new(Box::new(conn)) + }; if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) { conn.auth(username, password).await?; } diff --git a/core/src/services/memcached/binary.rs b/core/src/services/memcached/binary.rs index f24db3a4dbe2..d6b82b940c78 100644 --- a/core/src/services/memcached/binary.rs +++ b/core/src/services/memcached/binary.rs @@ -15,15 +15,13 @@ // specific language governing permissions and limitations // under the License. -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::io::{self}; -use tokio::net::TcpStream; - use crate::raw::*; use crate::*; +use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio_rustls::client::TlsStream; + pub(super) mod constants { pub const OK_STATUS: u16 = 0x0; pub const KEY_NOT_FOUND: u16 = 0x1; @@ -61,7 +59,7 @@ pub struct PacketHeader { } impl PacketHeader { - pub async fn write(self, writer: &mut TcpStream) -> io::Result<()> { + pub async fn write(self, writer: &mut T) -> io::Result<()> { writer.write_u8(self.magic).await?; writer.write_u8(self.opcode).await?; writer.write_u16(self.key_length).await?; @@ -74,7 +72,9 @@ impl PacketHeader { Ok(()) } - pub async fn read(reader: &mut TcpStream) -> Result { + pub async fn read( + reader: &mut T, + ) -> Result { let header = PacketHeader { magic: reader.read_u8().await?, opcode: reader.read_u8().await?, @@ -98,11 +98,11 @@ pub struct Response { } pub struct Connection { - io: BufReader, + io: BufReader>, } impl Connection { - pub fn new(io: TcpStream) -> Self { + pub fn new(io: Box) -> Self { Self { io: BufReader::new(io), } @@ -246,8 +246,12 @@ impl Connection { } } -pub async fn parse_response(reader: &mut TcpStream) -> Result { - let header = PacketHeader::read(reader).await.map_err(new_std_io_error)?; +pub async fn parse_response( + reader: &mut T, +) -> Result { + let header = PacketHeader::read::(reader) + .await + .map_err(new_std_io_error)?; if header.vbucket_id_or_status != constants::OK_STATUS && header.vbucket_id_or_status != constants::KEY_NOT_FOUND @@ -287,3 +291,12 @@ pub async fn parse_response(reader: &mut TcpStream) -> Result { value, }) } + +#[async_trait::async_trait] +pub trait Connect: + AsyncWrite + std::marker::Unpin + tokio::io::AsyncRead + std::marker::Send +{ +} + +impl Connect for TcpStream {} +impl Connect for TlsStream {} diff --git a/core/src/services/memcached/config.rs b/core/src/services/memcached/config.rs index f0b5815ff7e6..a34556729a32 100644 --- a/core/src/services/memcached/config.rs +++ b/core/src/services/memcached/config.rs @@ -40,4 +40,10 @@ pub struct MemcachedConfig { pub password: Option, /// The default ttl for put operations. pub default_ttl: Option, + /// default is false + pub enable_tls: bool, + /// Path to the CA certificate for TLS key. + pub tls_key: Option, + /// Path to the CA certificate for TLS cert. + pub tls_cert: Option, }