Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into go/xinfostream
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Feb 4, 2025
2 parents 15633dd + 3685e89 commit e3da0d7
Show file tree
Hide file tree
Showing 50 changed files with 2,814 additions and 263 deletions.
4 changes: 2 additions & 2 deletions .github/json_matrices/build-matrix.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{
"OS": "ubuntu",
"NAMED_OS": "linux",
"RUNNER": ["self-hosted", "Linux", "ARM64"],
"RUNNER": ["self-hosted", "Linux", "ARM64", "ephemeral"],
"ARCH": "arm64",
"TARGET": "aarch64-unknown-linux-gnu",
"PACKAGE_MANAGERS": ["pypi", "npm", "maven"],
Expand All @@ -32,7 +32,7 @@
"NAMED_OS": "linux",
"ARCH": "arm64",
"TARGET": "aarch64-unknown-linux-musl",
"RUNNER": ["self-hosted", "Linux", "ARM64"],
"RUNNER": ["self-hosted", "Linux", "ARM64", "ephemeral"],
"IMAGE": "node:lts-alpine",
"CONTAINER_OPTIONS": "--user root --privileged --rm",
"PACKAGE_MANAGERS": ["npm"],
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
make -k unit-test integ-test
- uses: ./.github/workflows/test-benchmark
if: ${{ matrix.engine.version == '8.0' && matrix.host.OS == 'ubuntu' && matrix.host.RUNNER == 'ubuntu-latest' && matrix.go == '1.22.0' }}
with:
language-flag: -go

Expand Down
24 changes: 1 addition & 23 deletions .github/workflows/install-engine/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,9 @@ runs:
~/valkey
key: valkey-${{ inputs.engine-version }}-${{ inputs.target }}

- name: Build Valkey for ARM
if: ${{ contains(inputs.target, 'aarch64-unknown') }}
shell: bash
working-directory: ~
run: |
cd ~
echo "Building valkey ${{ inputs.engine-version }}"
# check if the valkey repo is already cloned
if [[ ! -d valkey ]]; then
git clone https://github.com/valkey-io/valkey.git
else
# check if the branch=version is already checked out
if [[ $(git branch --show-current) != ${{ inputs.engine-version }} ]]; then
cd valkey
make clean
make distclean
sudo rm -rf /usr/local/bin/redis-* /usr/local/bin/valkey-* ./valkey-* ./redis-* ./dump.rdb
git fetch --all
git checkout ${{ inputs.engine-version }}
git pull
fi
fi
# if no cache hit, build the engine
- name: Build Valkey
if: ${{ steps.cache-valkey.outputs.cache-hit != 'true' && !contains(inputs.target, 'aarch64-unknown') }}
if: ${{ steps.cache-valkey.outputs.cache-hit != 'true' }}
shell: bash
run: |
echo "Building valkey ${{ inputs.engine-version }}"
Expand Down
26 changes: 16 additions & 10 deletions .github/workflows/node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,19 @@ jobs:
image: ${{ matrix.host.IMAGE }}
options: ${{ join(' -q ', matrix.host.CONTAINER_OPTIONS) }} # adding `-q` to bypass empty options
steps:
- name: Install git
- name: Check if TARGET is x86_64-unknown-linux-musl
continue-on-error: true
run: |
if [[ ${{ contains(matrix.host.TARGET, 'musl') }} == true ]]; then
apk update
apk add --no-cache git tar
elif [[ ${{ contains(matrix.host.IMAGE, 'amazonlinux') }} == true ]]; then
yum update
yum install -y git tar
if [[ "${{ matrix.host.TARGET }}" != "x86_64-unknown-linux-musl" ]]; then
echo "Skipping job: TARGET is not x86_64-unknown-linux-musl"
exit 1
fi
echo IMAGE=amazonlinux:latest | sed -r 's/:/-/g' >> $GITHUB_ENV
# Replace `:` in the variable otherwise it can't be used in `upload-artifact`
- name: Install git
run: |
apk update
apk add --no-cache git tar
- uses: actions/checkout@v4

- name: Setup musl on Linux
Expand Down Expand Up @@ -226,12 +228,16 @@ jobs:
run: npm test
working-directory: ./node

- name: Sanitize IMAGE variable
# Replace `:` in the variable otherwise it can't be used in `upload-artifact`
run: echo "SANITIZED_IMAGE=${{ matrix.host.IMAGE }}" | sed -r 's/:/-/g' >> $GITHUB_ENV

- name: Upload test reports
if: always()
continue-on-error: true
uses: actions/upload-artifact@v4
with:
name: test-report-node-${{ matrix.node }}-${{ matrix.engine.type }}-${{ matrix.engine.version }}-${{ env.IMAGE }}-${{ matrix.host.ARCH }}
name: test-report-node-${{ matrix.node }}-${{ matrix.engine.type }}-${{ matrix.engine.version }}-${{ env.SANITIZED_IMAGE }}-${{ matrix.host.ARCH }}
path: |
node/test-report*.html
utils/clusters/**
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/scale-shr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:

jobs:
hello-world:
runs-on: [self-hosted, linux, ARM64]
runs-on: [self-hosted, Linux, ARM64, ephemeral]
steps:
- name: print Hello World
run: echo "Hello World"
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
* Java: Shadow `protobuf` dependency ([#2931](https://github.com/valkey-io/valkey-glide/pull/2931))
* Java: Add `RESP2` support ([#2383](https://github.com/valkey-io/valkey-glide/pull/2383))
* Node, Python: Add `IFEQ` option ([#2909](https://github.com/valkey-io/valkey-glide/pull/2909), [#2962](https://github.com/valkey-io/valkey-glide/pull/2962))
* Java: Add `IFEQ` option ([#2978](https://github.com/valkey-io/valkey-glide/pull/2978))

#### Breaking Changes

#### Fixes

* Node: Fix `zrangeWithScores` (disallow `RangeByLex` as it is not supported) ([#2926](https://github.com/valkey-io/valkey-glide/pull/2926))
* Core: improve fix in #2381 ([#2929](https://github.com/valkey-io/valkey-glide/pull/2929))
* Java: Fix `lpopCount` null handling ([#3025](https://github.com/valkey-io/valkey-glide/pull/3025))

#### Operational Enhancements

Expand Down
14 changes: 14 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rand::Rng;
#[cfg(feature = "cluster-async")]
use std::ops::Add;
use std::time::Duration;
use telemetrylib::GlideOpenTelemetryConfig;

#[cfg(feature = "tls-rustls")]
use crate::tls::TlsConnParams;
Expand Down Expand Up @@ -49,6 +50,7 @@ struct BuilderParams {
response_timeout: Option<Duration>,
protocol: ProtocolVersion,
pubsub_subscriptions: Option<PubSubSubscriptionInfo>,
open_telemetry_config: Option<GlideOpenTelemetryConfig>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -402,6 +404,18 @@ impl ClusterClientBuilder {
self
}

/// Set OpenTelemetry configuration for this client
///
/// # Parameters
/// - `open_telemetry_config`: Use the `open_telemetry_config` property to specify the endpoint of the collector to export the measurments.
pub fn open_telemetry_config(
mut self,
open_telemetry_config: GlideOpenTelemetryConfig,
) -> ClusterClientBuilder {
self.builder_params.open_telemetry_config = Some(open_telemetry_config);
self
}

/// Enables periodic topology checks for this client.
///
/// If enabled, periodic topology checks will be executed at the configured intervals to examine whether there
Expand Down
76 changes: 76 additions & 0 deletions glide-core/redis-rs/redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ mod cluster_async {
use futures_time::{future::FutureExt, task::sleep};
use once_cell::sync::Lazy;
use std::ops::Add;
use std::str::FromStr;
use telemetrylib::*;

use redis::{
aio::{ConnectionLike, MultiplexedConnection},
Expand Down Expand Up @@ -206,6 +208,80 @@ mod cluster_async {
.unwrap();
}

#[tokio::test]
async fn test_async_open_telemetry_config() {
let glide_ot_config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(400))
.with_trace_exporter(
GlideOpenTelemetryTraceExporter::from_str("http://valid-url.com").unwrap(),
)
.build();
let result = std::panic::catch_unwind(|| {
GlideOpenTelemetry::initialise(glide_ot_config.clone());
});
assert!(result.is_err(), "Expected a panic but no panic occurred");

// Check the panic message
if let Err(err) = result {
let panic_msg = err
.downcast_ref::<String>()
.map(String::as_str)
.or_else(|| err.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic message");

assert!(
panic_msg.contains("not yet implemented: HTTP protocol is not implemented yet!"),
"Unexpected panic message: {}",
panic_msg
);
}
}

#[tokio::test]
async fn test_async_open_telemetry_invalid_config() {
let result = GlideOpenTelemetryTraceExporter::from_str("invalid-protocol.com");
assert!(result.is_err(), "Expected `from_str` to return an error");
assert_eq!(
result.unwrap_err().kind(),
std::io::ErrorKind::InvalidInput,
"Expected ErrorKind::InvalidInput"
);
}

#[tokio::test]
async fn test_async_open_telemetry_interval_config() {
let exporter = GlideOpenTelemetryTraceExporter::from_str("http://valid-url.com").unwrap();
let glide_ot_config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(400))
.with_trace_exporter(exporter.clone())
.build();
assert_eq!(GlideOpenTelemetry::get_span_interval(glide_ot_config), 400);
// check the default interval
let glide_ot_config = GlideOpenTelemetryConfigBuilder::default()
.with_trace_exporter(exporter)
.build();
assert_eq!(
GlideOpenTelemetry::get_span_interval(glide_ot_config.clone()),
5000
);

let cluster = TestClusterContext::new(3, 0);

let cluster_addresses: Vec<_> = cluster
.cluster
.servers
.iter()
.map(|server| server.connection_info())
.collect();
ClusterClient::builder(cluster_addresses.clone())
.open_telemetry_config(glide_ot_config.clone())
.build()
.unwrap()
.get_async_connection(None)
.await
.unwrap();
}

#[tokio::test]
async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_half_replicas() {
// Skip test if version is less then Valkey 8.0
Expand Down
21 changes: 21 additions & 0 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use redis::{
};
pub use standalone_client::StandaloneClient;
use std::io;
use std::str::FromStr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,6 +29,7 @@ mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use redis::InfoDict;
use telemetrylib::*;
use tokio::sync::mpsc;
use versions::Versioning;

Expand Down Expand Up @@ -661,13 +663,15 @@ pub enum ConnectionError {
Standalone(standalone_client::StandaloneClientConnectionError),
Cluster(redis::RedisError),
Timeout,
IoError(std::io::Error),
}

impl std::fmt::Debug for ConnectionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Standalone(arg0) => f.debug_tuple("Standalone").field(arg0).finish(),
Self::Cluster(arg0) => f.debug_tuple("Cluster").field(arg0).finish(),
Self::IoError(arg0) => f.debug_tuple("IoError").field(arg0).finish(),
Self::Timeout => write!(f, "Timeout"),
}
}
Expand All @@ -678,6 +682,7 @@ impl std::fmt::Display for ConnectionError {
match self {
ConnectionError::Standalone(err) => write!(f, "{err:?}"),
ConnectionError::Cluster(err) => write!(f, "{err}"),
ConnectionError::IoError(err) => write!(f, "{err}"),
ConnectionError::Timeout => f.write_str("connection attempt timed out"),
}
}
Expand Down Expand Up @@ -800,6 +805,22 @@ impl Client {
let inflight_requests_allowed = Arc::new(AtomicIsize::new(
inflight_requests_limit.try_into().unwrap(),
));

if let Some(endpoint_str) = &request.otel_endpoint {
let trace_exporter = GlideOpenTelemetryTraceExporter::from_str(endpoint_str.as_str())
.map_err(ConnectionError::IoError)?;
let config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(
request
.otel_span_flush_interval_ms
.unwrap_or(DEFAULT_FLUSH_SPAN_INTERVAL_MS),
))
.with_trace_exporter(trace_exporter)
.build();

GlideOpenTelemetry::initialise(config);
};

tokio::time::timeout(DEFAULT_CLIENT_CREATION_TIMEOUT, async move {
let internal_client = if request.cluster_mode_enabled {
let client = create_cluster_client(request, push_sender)
Expand Down
7 changes: 7 additions & 0 deletions glide-core/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct ConnectionRequest {
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
pub inflight_requests_limit: Option<u32>,
pub otel_endpoint: Option<String>,
pub otel_span_flush_interval_ms: Option<u64>,
}

pub struct AuthenticationInfo {
Expand Down Expand Up @@ -206,6 +208,9 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {

let inflight_requests_limit = none_if_zero(value.inflight_requests_limit);

let otel_endpoint = chars_to_string_option(&value.opentelemetry_config.collector_end_point);
let otel_span_flush_interval_ms = value.opentelemetry_config.span_flush_interval;

ConnectionRequest {
read_from,
client_name,
Expand All @@ -221,6 +226,8 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
periodic_checks,
pubsub_subscriptions,
inflight_requests_limit,
otel_endpoint,
otel_span_flush_interval_ms,
}
}
}
7 changes: 7 additions & 0 deletions glide-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ message PubSubSubscriptions
map<uint32, PubSubChannelsOrPatterns> channels_or_patterns_by_type = 1;
}

message OpenTelemetryConfig
{
string collector_end_point = 1;
optional uint64 span_flush_interval= 2;
}

// IMPORTANT - if you add fields here, you probably need to add them also in client/mod.rs:`sanitized_request_string`.
message ConnectionRequest {
repeated NodeAddress addresses = 1;
Expand All @@ -72,6 +78,7 @@ message ConnectionRequest {
uint32 inflight_requests_limit = 14;
string client_az = 15;
uint32 connection_timeout = 16;
OpenTelemetryConfig opentelemetry_config = 17;
}

message ConnectionRetryStrategy {
Expand Down
1 change: 1 addition & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }
url = "2"

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
2 changes: 1 addition & 1 deletion glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry::*;
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
Expand Down
Loading

0 comments on commit e3da0d7

Please sign in to comment.