Skip to content

Commit

Permalink
Add option to Zookeeper reporter to periodically "touch" node (#126)
Browse files Browse the repository at this point in the history
This essentially implements a heartbeat mechanism for persistent nodes, in order to keep the node's `mtime` up-to-date.
  • Loading branch information
panchr authored Mar 27, 2020
1 parent 0e67415 commit b676030
Show file tree
Hide file tree
Showing 7 changed files with 546 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
nerve (0.9.5)
nerve (0.9.6)
bunny (= 1.1.0)
dogstatsd-ruby (~> 3.3.0)
etcd (~> 0.2.3)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ If you set your `reporter_type` to `"zookeeper"` you should also set these param
* `zk_path`: the path (or [znode](https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_zkDataModel_znodes)) where the registration will be created
* `use_path_encoding`: optional flag to turn on path encoding optimization, the canonical config data at host level (e.g. ip, port, az) is encoded using json base64 and written as zk child name, the zk child data will still be written for backward compatibility
* `node_type`: the [type](https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#Ephemeral+Nodes) of znode that nerve will register as. The available types are `ephemeral_sequential`, `persistent_sequential`, `persistent`, and `ephemeral`. If not specified, nerve will create the znode as `ephemeral_sequential` type by default
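* `ttl_seconds`: interval, in seconds, at which nerve repeatedly 'touches' the created node in order to keep its `mtime` up-to-date. If nil (the default), no periodic update is performed. This option has no effect for the `ephemeral` and `ephemeral_sequential` node types.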

#### Etcd Reporter ####

Expand Down
20 changes: 20 additions & 0 deletions lib/nerve/atomic.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
require 'thread'

module Nerve
  # Holds a single value whose reads and writes are serialized
  # through one Mutex, so it can be shared between threads.
  class AtomicValue
    # initial_value: the value returned by #get until #set is called.
    def initialize(initial_value)
      @mu = Mutex.new
      set(initial_value)
    end

    # Returns the currently stored value, read while holding the lock.
    def get
      @mu.synchronize { @value }
    end

    # Stores new_value while holding the lock and returns it.
    def set(new_value)
      @mu.synchronize { @value = new_value }
    end
  end
end
132 changes: 115 additions & 17 deletions lib/nerve/reporter/zookeeper.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'nerve/reporter/base'
require 'nerve/atomic'
require 'thread'
require 'zk'
require 'zookeeper'
Expand All @@ -14,6 +15,7 @@ class Zookeeper < Base
PATH_ENCODING_MAX_LENGTH = 65536

DEFAULT_NODE_TYPE = 'ephemeral_sequential'
TTL_RENEW_EXCLUSIONS = [ :ephemeral, :ephemeral_sequential ].freeze

@@zk_pool = {}
@@zk_pool_count = {}
Expand All @@ -31,7 +33,8 @@ def initialize(service)
@mode = (service['node_type'] || DEFAULT_NODE_TYPE).to_sym
@zk_path = service['zk_path']
@key_prefix = @zk_path + encode_child_name(service)
@full_key = nil
@node_ttl = service['ttl_seconds']
@full_key = Nerve::AtomicValue.new(nil)
end

def start()
Expand All @@ -53,10 +56,16 @@ def start()
@zk = @@zk_pool[@zk_connection_string]
log.info "nerve: retrieved zk connection to #{@zk_connection_string}"
}

start_ttl_renew_thread
end

def stop()
log.info "nerve: removing zk node at #{@full_key}" if @full_key
stop_ttl_renew_thread

node_path = @full_key.get
log.info "nerve: removing zk node at #{node_path}" if node_path

begin
report_down
ensure
Expand All @@ -76,14 +85,16 @@ def stop()
end

def report_up()
node_path = @full_key.get

if not @zk.connected?
log.error "nerve: error in reporting up on zk node #{@full_key}: loss connection"
log.error "nerve: error in reporting up on zk node #{node_path}: loss connection"
return false
else
begin
zk_save
zk_save(node_path)
rescue *ZK_CONNECTION_ERRORS => e
log.error "nerve: error in reporting up on zk node #{@full_key}: #{e.message}"
log.error "nerve: error in reporting up on zk node #{node_path}: #{e.message}"
return false
end

Expand All @@ -92,14 +103,16 @@ def report_up()
end

def report_down
node_path = @full_key.get

if not @zk.connected?
log.error "nerve: error in reporting down on zk node #{@full_key}: loss connection"
log.error "nerve: error in reporting down on zk node #{node_path}: loss connection"
return false
else
begin
zk_delete
rescue *ZK_CONNECTION_ERRORS => e
log.error "nerve: error in reporting down on zk node #{@full_key}: #{e.message}"
log.error "nerve: error in reporting down on zk node #{node_path}: #{e.message}"
return false
end

Expand All @@ -108,14 +121,16 @@ def report_down
end

def ping?
node_path = @full_key.get

if not @zk.connected?
log.error "nerve: error in ping reporter at zk node #{@full_key}: loss connection"
log.error "nerve: error in ping reporter at zk node #{node_path}: loss connection"
return false
else
begin
return @zk.exists?(@full_key || '/')
return @zk.exists?(node_path || '/')
rescue *ZK_CONNECTION_ERRORS => e
log.error "nerve: error in ping reporter at zk node #{@full_key}: #{e.message}"
log.error "nerve: error in ping reporter at zk node #{node_path}: #{e.message}"
return false
end
end
Expand Down Expand Up @@ -145,33 +160,116 @@ def encode_child_name(service)
end

def zk_delete
if @full_key
node_path = @full_key.get

if node_path
statsd.time('nerve.reporter.zk.delete.elapsed_time', tags: ["zk_cluster:#{@zk_cluster}"]) do
@zk.delete(@full_key, :ignore => :no_node)
@zk.delete(node_path, :ignore => :no_node)
end
@full_key = nil

@full_key.set(nil)
end
end

def zk_create
# only mkdir_p if the path does not exist
statsd.time('nerve.reporter.zk.create.elapsed_time', tags: ["zk_cluster:#{@zk_cluster}", "zk_path:#{@zk_path}"]) do
@zk.mkdir_p(@zk_path) unless @zk.exists?(@zk_path)
@full_key = @zk.create(@key_prefix, :data => @data, :mode => @mode)

node_path = zk_try_create
@full_key.set(node_path)
log.info "nerve: wrote new ZK node of type #{@mode} at #{node_path}"
end
end

def zk_save
return zk_create unless @full_key
def zk_try_create
  # Happy path: the node does not exist yet; return the path ZK reports.
  @zk.create(@key_prefix, :data => @data, :mode => @mode)
rescue ::Zookeeper::Exceptions::NodeExists, ZK::Exceptions::NodeExists
  # Sequential nodes always get a unique suffix, so a name collision can
  # only happen for non-sequential modes. In that case Zookeeper appends
  # nothing and the node's path is exactly @key_prefix, so overwrite the
  # data of the existing node instead of creating a new one.
  @zk.set(@key_prefix, @data)
  log.info "nerve: tried to write node but exists, setting data instead"

  @key_prefix
end

def zk_save(node_path)
return zk_create unless node_path

begin
statsd.time('nerve.reporter.zk.save.elapsed_time', tags: ["zk_cluster:#{@zk_cluster}"]) do
@zk.set(@full_key, @data)
@zk.set(node_path, @data)
log.info "nerve: set data on #{node_path}"
end
rescue ZK::Exceptions::NoNode
zk_create
end
end

# Starts the background thread that periodically touches the ZK node.
#
# Always resets @ttl_should_exit (the stop flag) and @ttl_thread. A thread
# is only spawned when @node_ttl is set and @mode is not one of
# TTL_RENEW_EXCLUSIONS; otherwise @ttl_thread stays nil.
# Returns the new Thread, or nil when no thread is started.
def start_ttl_renew_thread
  @ttl_should_exit = Nerve::AtomicValue.new(false)
  @ttl_thread = nil

  return if @node_ttl.nil? || TTL_RENEW_EXCLUSIONS.include?(@mode)

  @ttl_thread = Thread.new {
    log.info "nerve: ttl renew: background thread starting"

    # Back-date the first "last run" by a random offset below @node_ttl so
    # the first renewal happens at a random point within the interval
    # (presumably to stagger renewals across processes).
    last_run = Time.now - rand(@node_ttl)

    # Poll the stop flag twice a second; renew_ttl itself decides whether
    # enough time has passed to touch the node.
    until @ttl_should_exit.get
      last_run = renew_ttl(last_run)
      sleep 0.5
    end

    log.info "nerve: ttl renew: background thread exiting normally"
  }
end

# Touch the node at @full_key if at least @node_ttl seconds have passed
# between `last_refresh` and `Time.now`.
# Returns the time to use as `last_refresh` on the next call:
# `Time.now` whenever the interval has elapsed (even if the node path is
# not set yet or the touch fails), otherwise `last_refresh` unchanged.
def renew_ttl(last_refresh)
  # Interval not elapsed yet: nothing to do, keep the previous timestamp.
  return last_refresh unless (Time.now - last_refresh) >= @node_ttl

  path = @full_key.get
  metric = 'nerve.reporter.zk.ttl.renew'
  cluster_tag = "zk_cluster:#{@zk_cluster}"

  if path.nil?
    # No node has been written yet. The refresh time is still reset, because
    # the path only becomes set when the node is freshly written.
    log.info "nerve: ttl renew: not touching ZK node because path not set"
    return Time.now
  end

  begin
    # Re-writing the same data is what "touches" the node.
    @zk.set(path, @data)
    log.info "nerve: ttl renew: touched ZK node at #{path}"
    statsd.increment(metric, tags: [cluster_tag, "result:success"])
  rescue ::Zookeeper::Exceptions::NoNode, ZK::Exceptions::NoNode
    log.info "nerve: ttl renew: failed to touch ZK node because node not found: #{path}"
    statsd.increment(metric, tags: [cluster_tag, "result:fail", "reason:no_node"])
  rescue *ZK_CONNECTION_ERRORS => err
    log.info "nerve: ttl renew: Zookeeper connection issue: #{err}"
    statsd.increment(metric, tags: [cluster_tag, "result:fail", "reason:connection_error"])
  end

  # Success or failure, the next attempt is a full interval from now.
  Time.now
end

# Signals the TTL renew thread to exit and waits for it to finish.
# Returns the joined thread, or nil when no thread was started.
def stop_ttl_renew_thread
  # Raise the stop flag first so the renew loop exits on its next check.
  @ttl_should_exit.set(true)
  return if @ttl_thread.nil?

  @ttl_thread.join
end
end
end

2 changes: 1 addition & 1 deletion lib/nerve/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Nerve
VERSION = "0.9.5"
VERSION = "0.9.6"
end
74 changes: 74 additions & 0 deletions spec/lib/nerve/atomic_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
require 'spec_helper'
require 'nerve/atomic'

# Specs for Nerve::AtomicValue: construction, #get and #set, including
# checks that each accessor goes through the internal mutex exactly once
# and leaves it unlocked afterwards.
describe Nerve::AtomicValue do
  let(:initial_value) { 'mock-value' }
  let(:internal_mu) { subject.instance_variable_get(:@mu) }

  subject { Nerve::AtomicValue.new(initial_value) }

  describe '#initialize' do
    it 'creates successfully' do
      expect { subject }.not_to raise_error
    end

    it 'sets the initial value' do
      expect(subject.get).to eq(initial_value)
    end

    context 'without a provided value' do
      it 'raises an error' do
        expect { Nerve::AtomicValue.new }.to raise_error(ArgumentError)
      end
    end
  end

  describe '#get' do
    let(:value) { 'new-value' }

    before :each do
      # Bypass #set so that #get is exercised in isolation.
      subject.instance_variable_set(:@value, value)
    end

    it 'returns the internal value' do
      expect(subject.get).to eq(value)
    end

    it 'holds a lock' do
      expect(internal_mu).to receive(:synchronize).exactly(:once)
      subject.get
    end

    it 'releases lock after call' do
      # The lock state must be checked after the call, not before it.
      subject.get
      expect(internal_mu.locked?).to eq(false)
    end

    context 'after a set' do
      it 'returns the new value' do
        subject.set(value)
        expect(subject.get).to eq(value)
      end
    end
  end

  describe '#set' do
    let(:value) { 'new-value' }

    it 'sets the internal value' do
      expect { subject.set(value) }
        .to change { subject.instance_variable_get(:@value) }
        .from(initial_value).to(value)
    end

    it 'holds a lock' do
      expect(internal_mu).to receive(:synchronize).exactly(:once)
      subject.set(value)
    end

    it 'releases lock after call' do
      # The lock state must be checked after the call, not before it.
      subject.set(value)
      expect(internal_mu.locked?).to eq(false)
    end
  end
end
Loading

0 comments on commit b676030

Please sign in to comment.