Skip to content

Commit

Permalink
Updated install_and_test.sh so that it accounts for babushkaclient.
Browse files Browse the repository at this point in the history
Updated lib.rs with types for SuccessCallback and FailureCallback to automate binding creation.

Reformatted project structure for babushkaclient and also the benchmarkApp.

Cleaned up babushkaclient
-Got rid of Callback indexes
-Fixed naming conventions
  • Loading branch information
SanHalacogluImproving committed Nov 9, 2023
1 parent cb49434 commit ebde38f
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 118 deletions.
13 changes: 10 additions & 3 deletions benchmarks/install_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ function runRustBenchmark(){
}

function runGoBenchmark() {
cd ${BENCH_FOLDER}/../go/benchmarks/main/benchmarkApp
echo "go run main.go --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag"
go run main.go --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag
cd ${BENCH_FOLDER}/../go/benchmarks/benchmarkApp
echo "Compiling Go code..."
go build -o main main.go
echo "./main --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag"
./main --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag
}


Expand Down Expand Up @@ -212,6 +214,11 @@ do
runGo=1
chosenClients="go-redis"
;;
-go-babushka)
runAllBenchmarks=0
runGo=1
chosenClients="go-babushka"
;;
-only-socket)
chosenClients="socket"
;;
Expand Down
84 changes: 0 additions & 84 deletions go/benchmarks/asyncClientRawFFI/asyncredisclient.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "gorustffi"
name = "babushkaclient"
version = "0.1.0"
edition = "2021"
build = "build.rs"
Expand All @@ -12,7 +12,7 @@ cbindgen = "0.20"
crate-type = ["cdylib"]

[dependencies]
redis = { path = "../../../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tls", "tokio-native-tls-comp"] }
redis = { path = "../../../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tls", "tokio-native-tls-comp", "tls-rustls-insecure"] }
babushka = { path = "../../../babushka-core" }
tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] }
num-derive = "0.4.0"
Expand Down
83 changes: 83 additions & 0 deletions go/benchmarks/babushkaclient/babushkaclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package babushkaclient

/*
#cgo LDFLAGS: -L./target/release -lbabushkaclient
#include "lib.h"
void successCallback(char *message, uintptr_t channelPtr);
void failureCallback(uintptr_t channelPtr);
*/
import "C"

import (
"fmt"
"github.com/aws/babushka/go/benchmarks"
"unsafe"
)

type BabushkaRedisClient struct {
coreClient unsafe.Pointer
}

//export successCallback
func successCallback(message *C.char, channelPtr C.uintptr_t) {
goMessage := C.GoString(message)
channelAddress := uintptr(channelPtr)
channel := *(*chan string)(unsafe.Pointer(channelAddress))
channel <- goMessage
}

//export failureCallback
func failureCallback(channelPtr C.uintptr_t) {
panic("Failure for get or set")
}

func (babushkaRedisClient *BabushkaRedisClient) ConnectToRedis(connectionSettings *benchmarks.ConnectionSettings) error {
caddress := C.CString(connectionSettings.Host)
defer C.free(unsafe.Pointer(caddress))

babushkaRedisClient.coreClient = C.create_connection(caddress, C.uint32_t(connectionSettings.Port), C._Bool(connectionSettings.UseSsl), C._Bool(connectionSettings.ClusterModeEnabled), (C.SuccessCallback)(unsafe.Pointer(C.successCallback)), (C.FailureCallback)(unsafe.Pointer(C.failureCallback)))
if babushkaRedisClient.coreClient == nil {
return fmt.Errorf("error connecting to babushkaRedisClient")
}
return nil
}

func (babushkaRedisClient *BabushkaRedisClient) Set(key string, value interface{}) error {
strValue := fmt.Sprintf("%v", value)
ckey := C.CString(key)
cval := C.CString(strValue)
defer C.free(unsafe.Pointer(ckey))
defer C.free(unsafe.Pointer(cval))

result := make(chan string)
chAddress := uintptr(unsafe.Pointer(&result))

C.set(babushkaRedisClient.coreClient, ckey, cval, C.uintptr_t(chAddress))

<-result

return nil
}

func (babushkaRedisClient *BabushkaRedisClient) Get(key string) (string, error) {
ckey := C.CString(key)
defer C.free(unsafe.Pointer(ckey))

result := make(chan string)
chAddress := uintptr(unsafe.Pointer(&result))

C.get(babushkaRedisClient.coreClient, ckey, C.uintptr_t(chAddress))
value := <-result

return value, nil
}

func (babushkaRedisClient *BabushkaRedisClient) CloseConnection() error {
C.close_connection(babushkaRedisClient.coreClient)
return nil
}

func (babushkaRedisClient *BabushkaRedisClient) GetName() string {
return "babushka"
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ pub enum Level {
Trace = 4,
}


pub type SuccessCallback = unsafe extern "C" fn(message: *const c_char, channel_address: usize) -> ();
pub type FailureCallback = unsafe extern "C" fn(channel_address: usize) -> ();



pub struct Connection {
connection: BabushkaClient,
success_callback: unsafe extern "C" fn(usize, *const c_char, usize) -> (),
failure_callback: unsafe extern "C" fn(usize) -> (), // TODO - add specific error codes
success_callback: SuccessCallback,
failure_callback: FailureCallback, // TODO - add specific error codes
runtime: Runtime,
}

Expand Down Expand Up @@ -52,8 +58,8 @@ fn create_connection_internal(
port: u32,
use_tls: bool,
use_cluster_mode: bool,
success_callback: unsafe extern "C" fn(usize, *const c_char, usize) -> (),
failure_callback: unsafe extern "C" fn(usize) -> (),
success_callback: SuccessCallback,
failure_callback: FailureCallback,
) -> RedisResult<Connection> {
let host_cstring = unsafe { CStr::from_ptr(host as *mut c_char) };
let host_string = host_cstring.to_str()?.to_string();
Expand All @@ -79,8 +85,8 @@ pub extern "C" fn create_connection(
port: u32,
use_tls: bool,
use_cluster_mode: bool,
success_callback: unsafe extern "C" fn(usize, *const c_char, usize) -> (),
failure_callback: unsafe extern "C" fn(usize) -> (),
success_callback: SuccessCallback,
failure_callback: FailureCallback,
) -> *const c_void {
match create_connection_internal(host, port, use_tls, use_cluster_mode, success_callback, failure_callback) {
Err(_) => std::ptr::null(), // TODO - log errors
Expand All @@ -99,7 +105,6 @@ pub extern "C" fn close_connection(connection_ptr: *const c_void) {
#[no_mangle]
pub extern "C" fn set(
connection_ptr: *const c_void,
callback_index: usize,
key: *const c_char,
value: *const c_char,
channel: usize
Expand All @@ -120,8 +125,8 @@ pub extern "C" fn set(
unsafe {
let client = Box::leak(Box::from_raw(ptr_address as *mut Connection));
match result {
Ok(_) => (client.success_callback)(callback_index, std::ptr::null(), channel),
Err(_) => (client.failure_callback)(callback_index), // TODO - report errors
Ok(_) => (client.success_callback)(std::ptr::null(), channel),
Err(_) => (client.failure_callback)(channel), // TODO - report errors
};
}
});
Expand All @@ -130,7 +135,7 @@ pub extern "C" fn set(
/// Expects that key will be kept valid until the callback is called. If the callback is called with a string pointer, the pointer must
/// be used synchronously, because the string will be dropped after the callback.
#[no_mangle]
pub extern "C" fn get(connection_ptr: *const c_void, callback_index: usize, key: *const c_char, channel: usize) {
pub extern "C" fn get(connection_ptr: *const c_void, key: *const c_char, channel: usize) {
let connection = unsafe { Box::leak(Box::from_raw(connection_ptr as *mut Connection)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = connection_ptr as usize;
Expand All @@ -146,17 +151,17 @@ pub extern "C" fn get(connection_ptr: *const c_void, callback_index: usize, key:
let value = match result {
Ok(value) => value,
Err(_) => {
unsafe { (connection.failure_callback)(callback_index) }; // TODO - report errors,
unsafe { (connection.failure_callback)(channel) }; // TODO - report errors,
return;
}
};
let result = Option::<CString>::from_redis_value(&value);

unsafe {
match result {
Ok(None) => (connection.success_callback)(callback_index, std::ptr::null(), channel),
Ok(Some(c_str)) => (connection.success_callback)(callback_index, c_str.as_ptr(), channel),
Err(_) => (connection.failure_callback)(callback_index), // TODO - report errors
Ok(None) => (connection.success_callback)(std::ptr::null(), channel),
Ok(Some(c_str)) => (connection.success_callback)(c_str.as_ptr(), channel),
Err(_) => (connection.failure_callback)(channel), // TODO - report errors
};
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"flag"
"fmt"
"github.com/aws/babushka/go/benchmarks"
"github.com/aws/babushka/go/benchmarks/asyncClientRawFFI"
"github.com/aws/babushka/go/benchmarks/babushkaclient"
"os"
"regexp"
"strconv"
Expand Down Expand Up @@ -48,19 +48,18 @@ type runConfiguration struct {
}

type clientNameOptions struct {
goRedis string
babushka string
all string
goRedis string
goBabushka string
all string
}

var clientNameOpts = clientNameOptions{
goRedis: "go-redis",
babushka: "babushka",
all: "all",
goRedis: "go-redis",
goBabushka: "go-babushka",
all: "all",
}

func main() {

opts := parseArguments()

runConfig, err := verifyOptions(opts)
Expand Down Expand Up @@ -88,7 +87,7 @@ func parseArguments() *options {
port := flag.Int("port", 6379, "Port number")
resultsFile := flag.String("resultsFile", "", "Path to results file")
clientCount := flag.String("clientCount", "[1]", "Client Count")
clientNames := flag.String("clients", "all", "One of: all|go-redis|babushka")
clientNames := flag.String("clients", "all", "One of: all|go-redis|go-babushka")
configuration := flag.String("configuration", "Release", "Configuration flag")
concurrentTasks := flag.String("concurrentTasks", "[1 10 100]", "Number of concurrent tasks")
dataSize := flag.String("dataSize", "[100 4000]", "Data block size")
Expand Down Expand Up @@ -147,13 +146,13 @@ func verifyOptions(opts *options) (*runConfiguration, error) {
case strings.EqualFold(opts.clientNames, clientNameOpts.goRedis):
runConfig.clientNames = append(runConfig.clientNames, clientNameOpts.goRedis)

case strings.EqualFold(opts.clientNames, clientNameOpts.babushka):
runConfig.clientNames = append(runConfig.clientNames, clientNameOpts.babushka)
case strings.EqualFold(opts.clientNames, clientNameOpts.goBabushka):
runConfig.clientNames = append(runConfig.clientNames, clientNameOpts.goBabushka)

case strings.EqualFold(opts.clientNames, clientNameOpts.all):
runConfig.clientNames = append(runConfig.clientNames, clientNameOpts.goRedis, clientNameOpts.babushka)
runConfig.clientNames = append(runConfig.clientNames, clientNameOpts.goRedis, clientNameOpts.goBabushka)
default:
return nil, fmt.Errorf("invalid clients option: all|go-redis|babushka")
return nil, fmt.Errorf("invalid clients option: all|go-redis|go-babushka")
}

runConfig.host = opts.host
Expand Down Expand Up @@ -236,8 +235,8 @@ func createClients(clientCount int, clientType string, connectionSettings *bench
switch clientType {
case clientNameOpts.goRedis:
client = &benchmarks.GoRedisClient{}
case clientNameOpts.babushka:
client = &asyncClientRawFFI.AsyncRedisClient{}
case clientNameOpts.goBabushka:
client = &babushkaclient.BabushkaRedisClient{}
}
err := client.ConnectToRedis(connectionSettings)
if err != nil {
Expand Down

0 comments on commit ebde38f

Please sign in to comment.