From 594f02544ac604295a2744925cc231755200c944 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 14 Nov 2024 07:11:00 +1100 Subject: [PATCH] fix(driver_manager): ensure connected state ensures connected state is eventually consistent if redis is unavailable --- shard.yml | 3 +-- spec/transport_http_spec.cr | 13 +++++++--- src/placeos-driver/constants.cr | 2 +- src/placeos-driver/driver_manager.cr | 38 ++++++++++++++++++++++++---- src/placeos-driver/transport/http.cr | 4 ++- 5 files changed, 48 insertions(+), 12 deletions(-) diff --git a/shard.yml b/shard.yml index 8dbfbda2..bbca5dd1 100644 --- a/shard.yml +++ b/shard.yml @@ -1,6 +1,5 @@ name: placeos-driver -version: 7.2.17 -crystal: ">= 1.0.0" +version: 7.2.18 dependencies: action-controller: diff --git a/spec/transport_http_spec.cr b/spec/transport_http_spec.cr index b9e9df04..e7174207 100644 --- a/spec/transport_http_spec.cr +++ b/spec/transport_http_spec.cr @@ -2,8 +2,11 @@ require "./helper" describe PlaceOS::Driver::TransportHTTP do it "should perform a secure request" do + responses = 0 queue = Helper.queue - transport = PlaceOS::Driver::TransportHTTP.new(queue, "https://www.google.com.au/", ::PlaceOS::Driver::Settings.new(%({"disable_cookies": true}))) + transport = PlaceOS::Driver::TransportHTTP.new(queue, "https://www.google.com.au/", ::PlaceOS::Driver::Settings.new(%({"disable_cookies": true}))) do + responses += 1 + end transport.before_request do |request| request.hostname.should eq "www.google.com.au" end @@ -14,6 +17,7 @@ describe PlaceOS::Driver::TransportHTTP do response = transport.http(:get, "/") response.status_code.should eq(200) transport.cookies.size.should eq 0 + responses.should eq 1 # Close the connection transport.terminate @@ -21,9 +25,11 @@ describe PlaceOS::Driver::TransportHTTP do it "should perform an insecure request" do queue = Helper.queue - + responses = 0 # Selected from: https://whynohttps.com/ - transport = PlaceOS::Driver::TransportHTTP.new(queue, "http://blog.jp/", ::PlaceOS::Driver::Settings.new("{}")) + transport = PlaceOS::Driver::TransportHTTP.new(queue, "http://blog.jp/", ::PlaceOS::Driver::Settings.new("{}")) do + responses += 1 + end transport.before_request do |request| request.hostname.should eq "blog.jp" end @@ -34,6 +40,7 @@ describe PlaceOS::Driver::TransportHTTP do response = transport.http(:get, "/") response.status_code.should eq(200) (transport.cookies.size > 0).should be_true + responses.should eq 1 # Close the connection transport.terminate diff --git a/src/placeos-driver/constants.cr b/src/placeos-driver/constants.cr index f8af9c68..fdaa50b6 100644 --- a/src/placeos-driver/constants.cr +++ b/src/placeos-driver/constants.cr @@ -1,7 +1,7 @@ require "action-controller/logger" class PlaceOS::Driver - LOG_FORMAT = ENV["PLACE_LOG_FORMAT"]?.presence || "JSON" + LOG_FORMAT = ENV["PLACE_LOG_FORMAT"]?.presence || "JSON" LOG_FORMATTER = LOG_FORMAT == "JSON" ? ActionController.json_formatter : ActionController.default_formatter VERSION = {{ `shards version "#{__DIR__}"`.chomp.stringify.downcase }} end diff --git a/src/placeos-driver/driver_manager.cr b/src/placeos-driver/driver_manager.cr index d1e7650f..0d053fab 100644 --- a/src/placeos-driver/driver_manager.cr +++ b/src/placeos-driver/driver_manager.cr @@ -9,6 +9,7 @@ class PlaceOS::Driver::DriverManager @queue = Queue.new(@logger) { |state| connection(state) } @schedule = PlaceOS::Driver::Proxy::Scheduler.new(@logger) @subscriptions = edge_driver ? nil : Proxy::Subscriptions.new(subscriptions || Subscriptions.new(module_id: @module_id), @logger) + @calibrate_connected = false # Ensures execution all occurs on a single thread @requests = ::Channel(Tuple(::Channel(Nil), Protocol::Request)).new(1) @@ -37,7 +38,9 @@ class PlaceOS::Driver::DriverManager end end in .http? - PlaceOS::Driver::TransportHTTP.new(@queue, @model.uri.not_nil!, @settings) + PlaceOS::Driver::TransportHTTP.new(@queue, @model.uri.not_nil!, @settings) do + http_received + end in .logic? # nothing required to be done here PlaceOS::Driver::TransportLogic.new(@queue) @@ -52,6 +55,7 @@ class PlaceOS::Driver::DriverManager @subscriptions : Proxy::Subscriptions? getter model : ::PlaceOS::Driver::DriverModel + getter calibrate_connected : Bool # This hack is required to "dynamically" load the user defined class # The compiler is somewhat fragile when it comes to initializers @@ -271,14 +275,23 @@ class PlaceOS::Driver::DriverManager private def connection(state : Bool) : Nil driver = @driver + # this is just a hint so we can ignore it if redis was offline + check_proxy_usage(driver) rescue nil + + # connected status is more important so we should ensure this is calibrated + begin + driver[:connected] = state + @calibrate_connected = false + rescue error + @calibrate_connected = true + logger.warn(exception: error) { "error setting connected status #{driver.class} (#{@module_id})" } + end + + # we want to run these callbacks even if redis was offline begin if state - check_proxy_usage(driver) - driver[:connected] = true driver.connected if driver.responds_to?(:connected) else - check_proxy_usage(driver) - driver[:connected] = false driver.disconnected if driver.responds_to?(:disconnected) end rescue error @@ -306,6 +319,21 @@ class PlaceOS::Driver::DriverManager logger.warn { "no received function provided for #{driver.class} (#{@module_id})" } task.try &.abort("no received function provided") end + + # ensure connected state is eventually consistent if there was an issue + # with redis when the connected state changed + ensure_connected_calibrate(driver) if @calibrate_connected + end + + protected def http_received + ensure_connected_calibrate(@driver) if @calibrate_connected + end + + private def ensure_connected_calibrate(driver) : Nil + driver[:connected] = true + check_proxy_usage(driver) rescue nil + @calibrate_connected = false + rescue end private def check_proxy_usage(driver) diff --git a/src/placeos-driver/transport/http.cr b/src/placeos-driver/transport/http.cr index 3eee0baa..449183f7 100644 --- a/src/placeos-driver/transport/http.cr +++ b/src/placeos-driver/transport/http.cr @@ -116,7 +116,8 @@ class PlaceOS::Driver def initialize( @queue : PlaceOS::Driver::Queue, @uri : String, - @settings : ::PlaceOS::Driver::Settings + @settings : ::PlaceOS::Driver::Settings, + &@received : -> Nil ) @terminated = false @tls = new_tls_context @@ -276,6 +277,7 @@ class PlaceOS::Driver # assuming we're typically online, this check before assignment is more performant @queue.online = true unless @queue.online + @received.call # fallback in case the HTTP client lib doesn't decompress the response check_http_response_encoding response