diff --git a/shard.yml b/shard.yml index 282ceda8..6beb8797 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: placeos-driver -version: 6.8.6 +version: 6.9.0 crystal: ">= 1.0.0" dependencies: diff --git a/src/placeos-driver.cr b/src/placeos-driver.cr index e530a86f..21c3fa16 100644 --- a/src/placeos-driver.cr +++ b/src/placeos-driver.cr @@ -62,7 +62,7 @@ abstract class PlaceOS::Driver # applies a security level to a driver function # - # ```crystal + # ``` # @[Security(Level::Administrator)] # def my_driver_function # end @@ -361,7 +361,7 @@ abstract class PlaceOS::Driver end # :nodoc: - IGNORE_KLASSES = ["PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"] + IGNORE_KLASSES = ["PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"] RESERVED_METHODS = {} of Nil => Nil {% RESERVED_METHODS["initialize"] = true %} diff --git a/src/placeos-driver/driver-specs/mock_driver.cr b/src/placeos-driver/driver-specs/mock_driver.cr index 3cec1b2e..0a59fcba 100644 --- a/src/placeos-driver/driver-specs/mock_driver.cr +++ b/src/placeos-driver/driver-specs/mock_driver.cr @@ -115,7 +115,7 @@ abstract class DriverSpecs::MockDriver end # :nodoc: - IGNORE_KLASSES = ["DriverSpecs", "PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"] + IGNORE_KLASSES = ["DriverSpecs", "PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"] # :nodoc: RESERVED_METHODS = {} of Nil => Nil diff --git a/src/placeos-driver/driver-specs/runner.cr b/src/placeos-driver/driver-specs/runner.cr index e51d3bf6..c07d3ac0 100644 --- a/src/placeos-driver/driver-specs/runner.cr +++ b/src/placeos-driver/driver-specs/runner.cr @@ -37,10 +37,11 @@ class DriverSpecs unix_server = UNIXServer.new(unix_socket) wait_driver_open = Channel(UNIXSocket).new + # no need to keep the server open once the process has checked in spawn do - while client = unix_server.accept? - wait_driver_open.send client - end + client = unix_server.accept? + wait_driver_open.send client.as(UNIXSocket) + unix_server.close end wait_driver_close = Channel(Nil).new diff --git a/src/placeos-driver/protocol/management.cr b/src/placeos-driver/protocol/management.cr index fa655759..6d1c60dc 100644 --- a/src/placeos-driver/protocol/management.cr +++ b/src/placeos-driver/protocol/management.cr @@ -31,7 +31,9 @@ class PlaceOS::Driver::Protocol::Management property on_redis : Proc(RedisAction, String, String, String?, Nil) = ->(_action : RedisAction, _hash_id : String, _key_name : String, _status_value : String?) {} - getter? terminated = false + getter shutting_down : Channel(Nil) = Channel(Nil).new(1) + getter? terminated : Bool = false + getter proc : Process? = nil getter pid : Int64 = -1 getter last_exit_code : Int32 = 0 @@ -47,7 +49,7 @@ class PlaceOS::Driver::Protocol::Management private getter modules : Hash(String, String) = {} of String => String private getter events : Channel(Request) = Channel(Request).new - @io : IO::Stapled? = nil + @io : UNIXSocket? = nil def initialize(@driver_path : String, @on_edge : Bool = false) @requests = {} of UInt64 => Promise::DeferredPromise(Tuple(String, Int32)) @@ -260,11 +262,28 @@ class PlaceOS::Driver::Protocol::Management modules.clear + channel = shutting_down + spawn(same_thread: true) { ensure_shutdown(channel) } + # The driver will shutdown the modules gracefully json = %({"id":"t","cmd":"terminate"}) io.write_bytes json.bytesize io.write json.to_slice io.flush + rescue + process = proc + return unless process + process.terminate graceful: false + end + + private def ensure_shutdown(channel) + select + when channel.receive + when timeout 10.seconds + if (process = proc) && channel == @shutting_down + process.terminate graceful: false + end + end end private def exec(module_id : String, payload : String, seq : UInt64, user_id : String?) : Nil @@ -312,48 +331,79 @@ class PlaceOS::Driver::Protocol::Management # Create the host driver process, then load modules that have been assigned. private def start_process : Nil return if @io || terminated? + io = nil + process = nil - stdin_reader, input = IO.pipe - output, stderr_writer = IO.pipe + begin + # Prepare driver IO + unix_socket = File.tempname("pos", ".driver") + unix_server = UNIXServer.new(unix_socket) + wait_driver_open = Promise.new(UNIXSocket, 15.seconds) - # We want to be manually flushing our writes - input.sync = false - io = IO::Stapled.new(output, input, true) + # no need to keep the server open once the process has checked in + spawn do + begin + client = unix_server.accept + # We want to be manually flushing our writes + client.sync = false + wait_driver_open.resolve client + unix_server.close + rescue error + wait_driver_open.reject error + end + end - @launch_count += 1 - @launch_time = Time.utc.to_unix + @launch_count += 1 + @launch_time = Time.utc.to_unix - fetch_pid = Promise.new(Int64) - spawn(same_thread: true) { launch_driver(fetch_pid, stdin_reader, stderr_writer) } - @pid = fetch_pid.get.as(Int64) + fetch_proc = Promise.new(Process) + spawn(same_thread: true) { launch_driver(fetch_proc, unix_socket) } + @proc = process = fetch_proc.get + @pid = process.pid - # Start processing the output of the driver - loaded = Promise.new(Nil) - spawn(same_thread: true) { process_comms(io, loaded) } - loaded.get + io = wait_driver_open.get - # start the desired instances - modules.each do |module_id, payload| - json = %({"id":"#{module_id}","cmd":"start","payload":#{payload.to_json}}) - io.write_bytes json.bytesize - io.write json.to_slice - io.flush - end + # Start processing the output of the driver + loaded = Promise.new(Nil) + spawn(same_thread: true) { process_comms(io, loaded) } + loaded.get + + # start the desired instances + modules.each do |module_id, payload| + json = %({"id":"#{module_id}","cmd":"start","payload":#{payload.to_json}}) + io.write_bytes json.bytesize + io.write json.to_slice + io.flush + end + + # events can now write directly to the io, driver is running + @io = io + rescue error + Log.error(exception: error) { {message: "failed to launch driver", driver_path: @driver_path} } + + if io.nil? + if process + process.terminate graceful: false + end - # events can now write directly to the io, driver is running - @io = io + # attempt to relaunch + sleep 5 + return if @io || terminated? + spawn(same_thread: true) { relaunch("-1") } + end + end end # launches the driver and manages the process - private def launch_driver(fetch_pid, stdin_reader, stderr_writer) : Nil + private def launch_driver(fetch_proc, unix_socket) : Nil Process.run( @driver_path, - @on_edge ? {"-p", "-e"} : {"-p"}, - input: stdin_reader, + @on_edge ? {"-p", "-e", "-s", unix_socket} : {"-p", "-s", unix_socket}, + input: Process::Redirect::Close, output: Process::Redirect::Inherit, - error: stderr_writer + error: Process::Redirect::Inherit ) do |process| - fetch_pid.resolve process.pid + fetch_proc.resolve process end status = $? @@ -361,19 +411,22 @@ class PlaceOS::Driver::Protocol::Management Log.warn { {message: "driver process exited with #{last_exit_code}", driver_path: @driver_path} } unless status.success? - begin - @io.try &.close - rescue - ensure - @io = nil + if io = @io + io.close rescue nil + @shutting_down.send nil @events.send(Request.new(last_exit_code, :exited)) end + rescue error + Log.error(exception: error) { "error launching driver: #{@driver_path}" } + fetch_proc.reject error end private def relaunch(last_exit_code : String) : Nil @io = nil - @last_exit_code = last_exit_code.to_i return if terminated? + @last_exit_code = last_exit_code.to_i? || -1 + + @shutting_down = Channel(Nil).new(1) start_process unless modules.empty? end @@ -417,7 +470,8 @@ class PlaceOS::Driver::Protocol::Management end rescue error : IO::Error # Input stream closed. This should only occur on termination - Log.debug(exception: error) { "comms closed for #{@driver_path}" } + Log.debug(exception: error) { "comms closed for #{@driver_path}" } unless terminated? + loaded.reject error ensure # Reject any pending request temp_reqs = request_lock.synchronize do