Skip to content

Commit

Permalink
fix(driver_manager): ensure connected state
Browse files Browse the repository at this point in the history
ensures connected state is eventually consistent if redis is unavailable
  • Loading branch information
stakach committed Nov 13, 2024
1 parent 8e76beb commit 594f025
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 12 deletions.
3 changes: 1 addition & 2 deletions shard.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: placeos-driver
version: 7.2.17
crystal: ">= 1.0.0"
version: 7.2.18

dependencies:
action-controller:
Expand Down
13 changes: 10 additions & 3 deletions spec/transport_http_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ require "./helper"

describe PlaceOS::Driver::TransportHTTP do
it "should perform a secure request" do
responses = 0
queue = Helper.queue
transport = PlaceOS::Driver::TransportHTTP.new(queue, "https://www.google.com.au/", ::PlaceOS::Driver::Settings.new(%({"disable_cookies": true})))
transport = PlaceOS::Driver::TransportHTTP.new(queue, "https://www.google.com.au/", ::PlaceOS::Driver::Settings.new(%({"disable_cookies": true}))) do
responses += 1
end
transport.before_request do |request|
request.hostname.should eq "www.google.com.au"
end
Expand All @@ -14,16 +17,19 @@ describe PlaceOS::Driver::TransportHTTP do
response = transport.http(:get, "/")
response.status_code.should eq(200)
transport.cookies.size.should eq 0
responses.should eq 1

# Close the connection
transport.terminate
end

it "should perform an insecure request" do
queue = Helper.queue

responses = 0
# Selected from: https://whynohttps.com/
transport = PlaceOS::Driver::TransportHTTP.new(queue, "http://blog.jp/", ::PlaceOS::Driver::Settings.new("{}"))
transport = PlaceOS::Driver::TransportHTTP.new(queue, "http://blog.jp/", ::PlaceOS::Driver::Settings.new("{}")) do
responses += 1
end
transport.before_request do |request|
request.hostname.should eq "blog.jp"
end
Expand All @@ -34,6 +40,7 @@ describe PlaceOS::Driver::TransportHTTP do
response = transport.http(:get, "/")
response.status_code.should eq(200)
(transport.cookies.size > 0).should be_true
responses.should eq 1

# Close the connection
transport.terminate
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-driver/constants.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "action-controller/logger"

class PlaceOS::Driver
LOG_FORMAT = ENV["PLACE_LOG_FORMAT"]?.presence || "JSON"
LOG_FORMAT = ENV["PLACE_LOG_FORMAT"]?.presence || "JSON"
LOG_FORMATTER = LOG_FORMAT == "JSON" ? ActionController.json_formatter : ActionController.default_formatter
VERSION = {{ `shards version "#{__DIR__}"`.chomp.stringify.downcase }}
end
38 changes: 33 additions & 5 deletions src/placeos-driver/driver_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class PlaceOS::Driver::DriverManager
@queue = Queue.new(@logger) { |state| connection(state) }
@schedule = PlaceOS::Driver::Proxy::Scheduler.new(@logger)
@subscriptions = edge_driver ? nil : Proxy::Subscriptions.new(subscriptions || Subscriptions.new(module_id: @module_id), @logger)
@calibrate_connected = false

# Ensures execution all occurs on a single thread
@requests = ::Channel(Tuple(::Channel(Nil), Protocol::Request)).new(1)
Expand Down Expand Up @@ -37,7 +38,9 @@ class PlaceOS::Driver::DriverManager
end
end
in .http?
PlaceOS::Driver::TransportHTTP.new(@queue, @model.uri.not_nil!, @settings)
PlaceOS::Driver::TransportHTTP.new(@queue, @model.uri.not_nil!, @settings) do
http_received
end
in .logic?
# nothing required to be done here
PlaceOS::Driver::TransportLogic.new(@queue)
Expand All @@ -52,6 +55,7 @@ class PlaceOS::Driver::DriverManager

@subscriptions : Proxy::Subscriptions?
getter model : ::PlaceOS::Driver::DriverModel
getter calibrate_connected : Bool

# This hack is required to "dynamically" load the user defined class
# The compiler is somewhat fragile when it comes to initializers
Expand Down Expand Up @@ -271,14 +275,23 @@ class PlaceOS::Driver::DriverManager

private def connection(state : Bool) : Nil
driver = @driver
# this is just a hint so we can ignore it if redis was offline
check_proxy_usage(driver) rescue nil

# connected status is more important so we should ensure this is calibrated
begin
driver[:connected] = state
@calibrate_connected = false
rescue error
@calibrate_connected = true
logger.warn(exception: error) { "error setting connected status #{driver.class} (#{@module_id})" }
end

# we want to run these callbacks even if redis was offline
begin
if state
check_proxy_usage(driver)
driver[:connected] = true
driver.connected if driver.responds_to?(:connected)
else
check_proxy_usage(driver)
driver[:connected] = false
driver.disconnected if driver.responds_to?(:disconnected)
end
rescue error
Expand Down Expand Up @@ -306,6 +319,21 @@ class PlaceOS::Driver::DriverManager
logger.warn { "no received function provided for #{driver.class} (#{@module_id})" }
task.try &.abort("no received function provided")
end

# ensure connected state is eventually consistent if there was an issue
# with redis when the connected state changed
ensure_connected_calibrate(driver) if @calibrate_connected
end

protected def http_received
ensure_connected_calibrate(@driver) if @calibrate_connected
end

private def ensure_connected_calibrate(driver) : Nil
driver[:connected] = true
check_proxy_usage(driver) rescue nil
@calibrate_connected = false
rescue
end

private def check_proxy_usage(driver)
Expand Down
4 changes: 3 additions & 1 deletion src/placeos-driver/transport/http.cr
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ class PlaceOS::Driver
def initialize(
@queue : PlaceOS::Driver::Queue,
@uri : String,
@settings : ::PlaceOS::Driver::Settings
@settings : ::PlaceOS::Driver::Settings,
&@received : -> Nil
)
@terminated = false
@tls = new_tls_context
Expand Down Expand Up @@ -276,6 +277,7 @@ class PlaceOS::Driver

# assuming we're typically online, this check before assignment is more performant
@queue.online = true unless @queue.online
@received.call

# fallback in case the HTTP client lib doesn't decompress the response
check_http_response_encoding response
Expand Down

0 comments on commit 594f025

Please sign in to comment.