Skip to content

Commit

Permalink
refactor(storage): stop extending stdlib hash
Browse files Browse the repository at this point in the history
  • Loading branch information
caspiano committed Apr 29, 2021
1 parent 82d85d0 commit 82d07a7
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 55 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: placeos-driver
version: 5.0.10
version: 5.1.0
crystal: ">= 0.36.1"

dependencies:
Expand Down
5 changes: 4 additions & 1 deletion src/placeos-driver/status.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
class PlaceOS::Driver::Status < Hash(String, String)
class PlaceOS::Driver::Status
private getter hash : Hash(String, String) = {} of String => String
forward_missing_to hash

def set_json(key, value)
key = key.to_s
current_value = self[key]?
Expand Down
10 changes: 9 additions & 1 deletion src/placeos-driver/storage.cr
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
abstract class PlaceOS::Driver; end

# Abstraction of a redis hset
abstract class PlaceOS::Driver::Storage < Hash(String, String)
abstract class PlaceOS::Driver::Storage
DEFAULT_PREFIX = "status"

getter hash_key : String
getter id : String
getter prefix : String

def initialize(@id : String, @prefix = DEFAULT_PREFIX)
@hash_key = "#{prefix}/#{@id}"
end

abstract def signal_status(status_name) : String?
end

Expand Down
35 changes: 16 additions & 19 deletions src/placeos-driver/storage/edge-storage.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,38 @@ require "../storage"
require "../protocol"

class PlaceOS::Driver::EdgeStorage < PlaceOS::Driver::Storage
def initialize(@id : String, @prefix = DEFAULT_PREFIX)
super()
@hash_key = "#{prefix}/#{@id}"
private getter hash : Hash(String, String) = {} of String => String
forward_missing_to hash

# This is the same as setting a value as this is often used when
# a hash value is updated and we want to notify of this change.
def signal_status(status_name) : String?
status_name = status_name.to_s
json_value = self[status_name]?
adjusted_value = json_value || "null"
PlaceOS::Driver::Protocol.instance.request(hash_key, "hset", "#{status_name}\x03#{adjusted_value}", raw: true)
json_value
end

getter hash_key : String
getter id : String
getter prefix : String
# Hash methods
#################################################################################################

def []=(status_name, json_value)
status_name = status_name.to_s
adjusted_value = json_value.to_s.presence

if adjusted_value
upsert(status_name, adjusted_value)
hash[status_name] = adjusted_value
PlaceOS::Driver::Protocol.instance.request(hash_key, "hset", "#{status_name}\x03#{adjusted_value}", raw: true)
else
delete(status_name)
end
json_value
end

# This is the same as setting a value as this is often used when
# a hash value is updated and we want to notify of this change
def signal_status(status_name) : String?
status_name = status_name.to_s
json_value = self[status_name]?
adjusted_value = json_value || "null"
PlaceOS::Driver::Protocol.instance.request(hash_key, "hset", "#{status_name}\x03#{adjusted_value}", raw: true)
json_value
end

def delete(key)
key = key.to_s
value = delete_impl(key)
value = hash.delete(key)
if value
PlaceOS::Driver::Protocol.instance.request(hash_key, "hset", "#{key}\x03null", raw: true)
return value
Expand All @@ -45,7 +42,7 @@ class PlaceOS::Driver::EdgeStorage < PlaceOS::Driver::Storage
end

def clear
clear_impl
hash.clear
PlaceOS::Driver::Protocol.instance.request(hash_key, "clear", raw: true)
self
end
Expand Down
79 changes: 47 additions & 32 deletions src/placeos-driver/storage/redis-storage.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,39 @@ require "../storage"
class PlaceOS::Driver::RedisStorage < PlaceOS::Driver::Storage
REDIS_URL = ENV["REDIS_URL"]? || "redis://localhost:6379"

def initialize(@id : String, @prefix = DEFAULT_PREFIX)
super()
@hash_key = "#{prefix}/#{@id}"
@@mutex : Mutex = Mutex.new(:reentrant)

def signal_status(status_name) : String?
status_name = status_name.to_s
key = "#{hash_key}/#{status_name}"
json_value = self[status_name]?
adjusted_value = json_value || "null"
@@mutex.synchronize { redis.publish(key, adjusted_value) }
json_value
end

@@mutex : Mutex = Mutex.new(:reentrant)
# Hash methods
#################################################################################################

forward_missing_to to_h

getter hash_key : String
getter id : String
getter prefix : String
def to_h
hash = {} of String => String
@@mutex.synchronize { redis.hgetall(hash_key) }.each_slice(2) do |slice|
hash[slice[0].to_s] = slice[1].to_s
end
hash
end

def []=(status_name, json_value)
status_name = status_name.to_s
adjusted_value = json_value.to_s.presence

if adjusted_value
key = hash_key
@@mutex.synchronize do
redis.pipelined(key, reconnect: true) do |pipeline|
pipeline.hset(key, status_name, adjusted_value)
pipeline.publish("#{key}/#{status_name}", adjusted_value)
redis.pipelined(hash_key, reconnect: true) do |pipeline|
pipeline.hset(hash_key, status_name, adjusted_value)
pipeline.publish("#{hash_key}/#{status_name}", adjusted_value)
end
end
else
Expand All @@ -34,37 +46,47 @@ class PlaceOS::Driver::RedisStorage < PlaceOS::Driver::Storage
json_value
end

def signal_status(status_name) : String?
status_name = status_name.to_s
key = "#{hash_key}/#{status_name}"
json_value = self[status_name]?
adjusted_value = json_value || "null"
@@mutex.synchronize { redis.publish(key, adjusted_value) }
json_value
end

def fetch(key)
key = key.to_s
entry = @@mutex.synchronize { redis.hget(hash_key, key) }
entry ? entry.to_s : yield key
end

def fetch(key, default)
fetch(key) { default }
end

def [](key, & : String -> String)
fetch(key) { yield }
end

def [](key)
fetch(key) { raise KeyError.new "Missing hash key: #{key.inspect}" }
end

def []?(key)
fetch(key, nil)
end

def delete(key)
key = key.to_s
value = self[key]?
if value
hkey = hash_key
@@mutex.synchronize do
redis.pipelined(hkey, reconnect: true) do |pipeline|
pipeline.hdel(hkey, key)
pipeline.publish("#{hkey}/#{key}", "null")
redis.pipelined(hash_key, reconnect: true) do |pipeline|
pipeline.hdel(hash_key, key)
pipeline.publish("#{hash_key}/#{key}", "null")
end
end
return value.to_s
end
yield key
end

def delete(key)
delete(key) { nil }
end

def keys
@@mutex.synchronize { redis.hkeys(hash_key) }.map &.to_s
end
Expand All @@ -81,14 +103,6 @@ class PlaceOS::Driver::RedisStorage < PlaceOS::Driver::Storage
size == 0
end

def to_h
hash = {} of String => String
@@mutex.synchronize { redis.hgetall(hash_key) }.each_slice(2) do |slice|
hash[slice[0].to_s] = slice[1].to_s
end
hash
end

def clear
hkey = hash_key
@@mutex.synchronize do
Expand All @@ -106,6 +120,7 @@ class PlaceOS::Driver::RedisStorage < PlaceOS::Driver::Storage
# Redis
#############################################################################

private class_getter redis_lock = Mutex.new
private getter redis : Redis::Client { self.class.shared_redis_client }

def self.get(key)
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-driver/utilities/discovery.cr
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ abstract class PlaceOS::Driver
{% if compiler_enforced %}
{% methods = methods.reject { |method| RESERVED_METHODS[method.name.stringify] } %}
{% methods = methods.reject { |method| method.visibility != :public } %}
{% methods = methods.reject { |method| method.accepts_block? } %}
{% methods = methods.reject &.accepts_block? %}
{% else %}
{% methods = [] of Crystal::Macros::TypeNode %}
{% end %}
Expand Down

0 comments on commit 82d07a7

Please sign in to comment.