Skip to content

Commit

Permalink
redis: alternate healthcheck (envoyproxy#2644)
Browse files Browse the repository at this point in the history
Adds EXISTS healthcheck when key is specified in config. This allows the user to fail the healthcheck. See the data-plane changes linked below for more details.

Risk Level
Low

Testing
Unit, manual

Docs and API Changes
envoyproxy/data-plane-api#499

Signed-off-by: Daniel Hochman <[email protected]>
  • Loading branch information
danielhochman authored and htuch committed Feb 23, 2018
1 parent af59a51 commit e31b92b
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 24 deletions.
5 changes: 4 additions & 1 deletion source/common/config/cds_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ void CdsJson::translateHealthCheck(const Json::Object& json_health_check,
}
} else {
ASSERT(hc_type == "redis");
health_check.mutable_redis_health_check();
auto* redis_health_check = health_check.mutable_redis_health_check();
if (json_health_check.hasObject("redis_key")) {
redis_health_check->set_key(json_health_check.getString("redis_key"));
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,8 @@ const std::string Json::Schema::CLUSTER_HEALTH_CHECK_SCHEMA(R"EOF(
"exclusiveMinimum" : true
},
"reuse_connection" : {"type" : "boolean"},
"service_name" : {"type" : "string"}
"service_name" : {"type" : "string"},
"redis_key" : {"type" : "string"}
},
"required" : ["type", "timeout_ms", "interval_ms", "unhealthy_threshold", "healthy_threshold"],
"additionalProperties" : false
Expand Down
53 changes: 47 additions & 6 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,13 @@ RedisHealthCheckerImpl::RedisHealthCheckerImpl(const Cluster& cluster,
Runtime::RandomGenerator& random,
Redis::ConnPool::ClientFactory& client_factory)
: HealthCheckerImplBase(cluster, config, dispatcher, runtime, random),
client_factory_(client_factory) {}
client_factory_(client_factory), key_(config.redis_health_check().key()) {
if (!key_.empty()) {
type_ = Type::Exists;
} else {
type_ = Type::Ping;
}
}

RedisHealthCheckerImpl::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession(
RedisHealthCheckerImpl& parent, const HostSharedPtr& host)
Expand Down Expand Up @@ -561,17 +567,42 @@ void RedisHealthCheckerImpl::RedisActiveHealthCheckSession::onInterval() {
}

ASSERT(!current_request_);
current_request_ = client_->makeRequest(healthCheckRequest(), *this);

switch (parent_.type_) {
case Type::Exists:
current_request_ = client_->makeRequest(existsHealthCheckRequest(parent_.key_), *this);
break;
case Type::Ping:
current_request_ = client_->makeRequest(pingHealthCheckRequest(), *this);
break;
default:
NOT_REACHED;
}
}

void RedisHealthCheckerImpl::RedisActiveHealthCheckSession::onResponse(
Redis::RespValuePtr&& value) {
current_request_ = nullptr;
if (value->type() == Redis::RespType::SimpleString && value->asString() == "PONG") {
handleSuccess();
} else {
handleFailure(FailureType::Active);

switch (parent_.type_) {
case Type::Exists:
if (value->type() == Redis::RespType::Integer && value->asInteger() == 0) {
handleSuccess();
} else {
handleFailure(FailureType::Active);
}
break;
case Type::Ping:
if (value->type() == Redis::RespType::SimpleString && value->asString() == "PONG") {
handleSuccess();
} else {
handleFailure(FailureType::Active);
}
break;
default:
NOT_REACHED;
}

if (!parent_.reuse_connection_) {
client_->close();
}
Expand All @@ -588,6 +619,16 @@ void RedisHealthCheckerImpl::RedisActiveHealthCheckSession::onTimeout() {
client_->close();
}

RedisHealthCheckerImpl::HealthCheckRequest::HealthCheckRequest(const std::string& key) {
std::vector<Redis::RespValue> values(2);
values[0].type(Redis::RespType::BulkString);
values[0].asString() = "EXISTS";
values[1].type(Redis::RespType::BulkString);
values[1].asString() = key;
request_.type(Redis::RespType::Array);
request_.asArray().swap(values);
}

RedisHealthCheckerImpl::HealthCheckRequest::HealthCheckRequest() {
std::vector<Redis::RespValue> values(1);
values[0].type(Redis::RespType::BulkString);
Expand Down
13 changes: 11 additions & 2 deletions source/common/upstream/health_checker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,19 +367,23 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase {
Runtime::RandomGenerator& random,
Redis::ConnPool::ClientFactory& client_factory);

static const Redis::RespValue& healthCheckRequest() {
static const Redis::RespValue& pingHealthCheckRequest() {
static HealthCheckRequest* request = new HealthCheckRequest();
return request->request_;
}

static const Redis::RespValue& existsHealthCheckRequest(const std::string& key) {
static HealthCheckRequest* request = new HealthCheckRequest(key);
return request->request_;
}

private:
struct RedisActiveHealthCheckSession : public ActiveHealthCheckSession,
public Redis::ConnPool::Config,
public Redis::ConnPool::PoolCallbacks,
public Network::ConnectionCallbacks {
RedisActiveHealthCheckSession(RedisHealthCheckerImpl& parent, const HostSharedPtr& host);
~RedisActiveHealthCheckSession();

// ActiveHealthCheckSession
void onInterval() override;
void onTimeout() override;
Expand All @@ -405,7 +409,10 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase {
Redis::ConnPool::PoolRequest* current_request_{};
};

enum class Type { Ping, Exists };

struct HealthCheckRequest {
HealthCheckRequest(const std::string& key);
HealthCheckRequest();

Redis::RespValue request_;
Expand All @@ -419,6 +426,8 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase {
}

Redis::ConnPool::ClientFactory& client_factory_;
Type type_;
const std::string key_;
};

/**
Expand Down
100 changes: 86 additions & 14 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,22 @@ class RedisHealthCheckerImplTest : public testing::Test, public Redis::ConnPool:
dispatcher_, runtime_, random_, *this));
}

void setupExistsHealthcheck() {
std::string json = R"EOF(
{
"type": "redis",
"timeout_ms": 1000,
"interval_ms": 1000,
"unhealthy_threshold": 1,
"healthy_threshold": 1,
"redis_key": "foo"
}
)EOF";

health_checker_.reset(new RedisHealthCheckerImpl(*cluster_, parseHealthCheckFromJson(json),
dispatcher_, runtime_, random_, *this));
}

void setupDontReuseConnection() {
std::string json = R"EOF(
{
Expand Down Expand Up @@ -1447,8 +1463,14 @@ class RedisHealthCheckerImplTest : public testing::Test, public Redis::ConnPool:
EXPECT_CALL(*client_, addConnectionCallbacks(_));
}

void expectRequestCreate() {
EXPECT_CALL(*client_, makeRequest(Ref(RedisHealthCheckerImpl::healthCheckRequest()), _))
void expectExistsRequestCreate() {
EXPECT_CALL(*client_, makeRequest(Ref(RedisHealthCheckerImpl::existsHealthCheckRequest("")), _))
.WillOnce(DoAll(WithArg<1>(SaveArgAddress(&pool_callbacks_)), Return(&pool_request_)));
EXPECT_CALL(*timeout_timer_, enableTimer(_));
}

void expectPingRequestCreate() {
EXPECT_CALL(*client_, makeRequest(Ref(RedisHealthCheckerImpl::pingHealthCheckRequest()), _))
.WillOnce(DoAll(WithArg<1>(SaveArgAddress(&pool_callbacks_)), Return(&pool_request_)));
EXPECT_CALL(*timeout_timer_, enableTimer(_));
}
Expand All @@ -1465,7 +1487,7 @@ class RedisHealthCheckerImplTest : public testing::Test, public Redis::ConnPool:
std::shared_ptr<RedisHealthCheckerImpl> health_checker_;
};

TEST_F(RedisHealthCheckerImplTest, All) {
TEST_F(RedisHealthCheckerImplTest, PingAndVariousFailures) {
InSequence s;
setup();

Expand All @@ -1474,7 +1496,7 @@ TEST_F(RedisHealthCheckerImplTest, All) {

expectSessionCreate();
expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
health_checker_->start();

client_->runHighWatermarkCallbacks();
Expand All @@ -1488,7 +1510,7 @@ TEST_F(RedisHealthCheckerImplTest, All) {
response->asString() = "PONG";
pool_callbacks_->onResponse(std::move(response));

expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Failure
Expand All @@ -1497,7 +1519,7 @@ TEST_F(RedisHealthCheckerImplTest, All) {
response.reset(new Redis::RespValue());
pool_callbacks_->onResponse(std::move(response));

expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Redis failure via disconnect
Expand All @@ -1507,7 +1529,7 @@ TEST_F(RedisHealthCheckerImplTest, All) {
client_->raiseEvent(Network::ConnectionEvent::RemoteClose);

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Timeout
Expand All @@ -1518,7 +1540,7 @@ TEST_F(RedisHealthCheckerImplTest, All) {
timeout_timer_->callback_();

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Shutdown with active request.
Expand All @@ -1531,8 +1553,58 @@ TEST_F(RedisHealthCheckerImplTest, All) {
EXPECT_EQ(2UL, cluster_->info_->stats_store_.counter("health_check.network_failure").value());
}

TEST_F(RedisHealthCheckerImplTest, Exists) {
InSequence s;
setupExistsHealthcheck();

cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")};

expectSessionCreate();
expectClientCreate();
expectExistsRequestCreate();
health_checker_->start();

client_->runHighWatermarkCallbacks();
client_->runLowWatermarkCallbacks();

// Success
EXPECT_CALL(*timeout_timer_, disableTimer());
EXPECT_CALL(*interval_timer_, enableTimer(_));
Redis::RespValuePtr response(new Redis::RespValue());
response->type(Redis::RespType::Integer);
response->asInteger() = 0;
pool_callbacks_->onResponse(std::move(response));

expectExistsRequestCreate();
interval_timer_->callback_();

// Failure, exists
EXPECT_CALL(*timeout_timer_, disableTimer());
EXPECT_CALL(*interval_timer_, enableTimer(_));
response.reset(new Redis::RespValue());
response->type(Redis::RespType::Integer);
response->asInteger() = 1;
pool_callbacks_->onResponse(std::move(response));

expectExistsRequestCreate();
interval_timer_->callback_();

// Failure, no value
EXPECT_CALL(*timeout_timer_, disableTimer());
EXPECT_CALL(*interval_timer_, enableTimer(_));
response.reset(new Redis::RespValue());
pool_callbacks_->onResponse(std::move(response));

EXPECT_CALL(*client_, close());

EXPECT_EQ(3UL, cluster_->info_->stats_store_.counter("health_check.attempt").value());
EXPECT_EQ(1UL, cluster_->info_->stats_store_.counter("health_check.success").value());
EXPECT_EQ(2UL, cluster_->info_->stats_store_.counter("health_check.failure").value());
}

// Tests that redis client will behave appropriately when reuse_connection is false.
TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {
TEST_F(RedisHealthCheckerImplTest, NoConnectionReuse) {
InSequence s;
setupDontReuseConnection();

Expand All @@ -1541,7 +1613,7 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {

expectSessionCreate();
expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
health_checker_->start();

// The connection will close on success.
Expand All @@ -1554,7 +1626,7 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {
pool_callbacks_->onResponse(std::move(response));

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// The connection will close on failure.
Expand All @@ -1565,7 +1637,7 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {
pool_callbacks_->onResponse(std::move(response));

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Redis failure via disconnect, the connection was closed by the other end.
Expand All @@ -1575,7 +1647,7 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {
client_->raiseEvent(Network::ConnectionEvent::RemoteClose);

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Timeout, the connection will be closed.
Expand All @@ -1586,7 +1658,7 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) {
timeout_timer_->callback_();

expectClientCreate();
expectRequestCreate();
expectPingRequestCreate();
interval_timer_->callback_();

// Shutdown with active request.
Expand Down

0 comments on commit e31b92b

Please sign in to comment.