Skip to content

Commit

Permalink
feat(management): switch to using unix sockets for IPC
Browse files Browse the repository at this point in the history
also improves guarantees of process startup and shutdown
  • Loading branch information
stakach committed Jun 27, 2023
1 parent 9d4aaca commit 6a0c26c
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 44 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: placeos-driver
version: 6.8.6
version: 6.9.0
crystal: ">= 1.0.0"

dependencies:
Expand Down
4 changes: 2 additions & 2 deletions src/placeos-driver.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ abstract class PlaceOS::Driver

# applies a security level to a driver function
#
# ```crystal
# ```
# @[Security(Level::Administrator)]
# def my_driver_function
# end
Expand Down Expand Up @@ -361,7 +361,7 @@ abstract class PlaceOS::Driver
end

# :nodoc:
IGNORE_KLASSES = ["PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"]
IGNORE_KLASSES = ["PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"]

RESERVED_METHODS = {} of Nil => Nil
{% RESERVED_METHODS["initialize"] = true %}
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-driver/driver-specs/mock_driver.cr
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ abstract class DriverSpecs::MockDriver
end

# :nodoc:
IGNORE_KLASSES = ["DriverSpecs", "PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"]
IGNORE_KLASSES = ["DriverSpecs", "PlaceOS::Driver", "Reference", "Object", "Spec::ObjectExtensions", "Colorize::ObjectExtensions"]

# :nodoc:
RESERVED_METHODS = {} of Nil => Nil
Expand Down
7 changes: 4 additions & 3 deletions src/placeos-driver/driver-specs/runner.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ class DriverSpecs
unix_server = UNIXServer.new(unix_socket)
wait_driver_open = Channel(UNIXSocket).new

# no need to keep the server open once the process has checked in
spawn do
while client = unix_server.accept?
wait_driver_open.send client
end
client = unix_server.accept?
wait_driver_open.send client.as(UNIXSocket)
unix_server.close
end

wait_driver_close = Channel(Nil).new
Expand Down
128 changes: 91 additions & 37 deletions src/placeos-driver/protocol/management.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class PlaceOS::Driver::Protocol::Management

property on_redis : Proc(RedisAction, String, String, String?, Nil) = ->(_action : RedisAction, _hash_id : String, _key_name : String, _status_value : String?) {}

getter? terminated = false
getter shutting_down : Channel(Nil) = Channel(Nil).new(1)
getter? terminated : Bool = false
getter proc : Process? = nil
getter pid : Int64 = -1

getter last_exit_code : Int32 = 0
Expand All @@ -47,7 +49,7 @@ class PlaceOS::Driver::Protocol::Management
private getter modules : Hash(String, String) = {} of String => String
private getter events : Channel(Request) = Channel(Request).new

@io : IO::Stapled? = nil
@io : UNIXSocket? = nil

def initialize(@driver_path : String, @on_edge : Bool = false)
@requests = {} of UInt64 => Promise::DeferredPromise(Tuple(String, Int32))
Expand Down Expand Up @@ -260,11 +262,28 @@ class PlaceOS::Driver::Protocol::Management

modules.clear

channel = shutting_down
spawn(same_thread: true) { ensure_shutdown(channel) }

# The driver will shut down the modules gracefully
json = %({"id":"t","cmd":"terminate"})
io.write_bytes json.bytesize
io.write json.to_slice
io.flush
rescue
process = proc
return unless process
process.terminate graceful: false
end

# Shutdown watchdog: waits on `channel` for up to 10 seconds and, if nothing
# arrives in that time, forcibly terminates the driver process.
#
# `channel` is the shutdown channel captured by the caller when the watchdog
# was started. The process is only terminated while that same channel is still
# the current `@shutting_down`; a watchdog holding a channel that has since
# been replaced does nothing.
private def ensure_shutdown(channel)
  select
  when channel.receive
    # a message arrived before the deadline, nothing further to do
  when timeout 10.seconds
    # the channel was swapped out while waiting, so this watchdog is stale
    return unless channel == @shutting_down

    # non-graceful terminate, skipped when there is no process handle
    proc.try &.terminate(graceful: false)
  end
end

private def exec(module_id : String, payload : String, seq : UInt64, user_id : String?) : Nil
Expand Down Expand Up @@ -312,68 +331,102 @@ class PlaceOS::Driver::Protocol::Management
# Create the host driver process, then load modules that have been assigned.
private def start_process : Nil
return if @io || terminated?
io = nil
process = nil

stdin_reader, input = IO.pipe
output, stderr_writer = IO.pipe
begin
# Prepare driver IO
unix_socket = File.tempname("pos", ".driver")
unix_server = UNIXServer.new(unix_socket)
wait_driver_open = Promise.new(UNIXSocket, 15.seconds)

# We want to be manually flushing our writes
input.sync = false
io = IO::Stapled.new(output, input, true)
# no need to keep the server open once the process has checked in
spawn do
begin
client = unix_server.accept
# We want to be manually flushing our writes
client.sync = false
wait_driver_open.resolve client
unix_server.close
rescue error
wait_driver_open.reject error
end
end

@launch_count += 1
@launch_time = Time.utc.to_unix
@launch_count += 1
@launch_time = Time.utc.to_unix

fetch_pid = Promise.new(Int64)
spawn(same_thread: true) { launch_driver(fetch_pid, stdin_reader, stderr_writer) }
@pid = fetch_pid.get.as(Int64)
fetch_proc = Promise.new(Process)
spawn(same_thread: true) { launch_driver(fetch_proc, unix_socket) }
@proc = process = fetch_proc.get
@pid = process.pid

# Start processing the output of the driver
loaded = Promise.new(Nil)
spawn(same_thread: true) { process_comms(io, loaded) }
loaded.get
io = wait_driver_open.get

# start the desired instances
modules.each do |module_id, payload|
json = %({"id":"#{module_id}","cmd":"start","payload":#{payload.to_json}})
io.write_bytes json.bytesize
io.write json.to_slice
io.flush
end
# Start processing the output of the driver
loaded = Promise.new(Nil)
spawn(same_thread: true) { process_comms(io, loaded) }
loaded.get

# start the desired instances
modules.each do |module_id, payload|
json = %({"id":"#{module_id}","cmd":"start","payload":#{payload.to_json}})
io.write_bytes json.bytesize
io.write json.to_slice
io.flush
end

# events can now write directly to the io, driver is running
@io = io
rescue error
Log.error(exception: error) { {message: "failed to launch driver", driver_path: @driver_path} }

if io.nil?
if process
process.terminate graceful: false
end

# events can now write directly to the io, driver is running
@io = io
# attempt to relaunch
sleep 5
return if @io || terminated?
spawn(same_thread: true) { relaunch("-1") }
end
end
end

# launches the driver and manages the process
private def launch_driver(fetch_pid, stdin_reader, stderr_writer) : Nil
private def launch_driver(fetch_proc, unix_socket) : Nil
Process.run(
@driver_path,
@on_edge ? {"-p", "-e"} : {"-p"},
input: stdin_reader,
@on_edge ? {"-p", "-e", "-s", unix_socket} : {"-p", "-s", unix_socket},
input: Process::Redirect::Close,
output: Process::Redirect::Inherit,
error: stderr_writer
error: Process::Redirect::Inherit
) do |process|
fetch_pid.resolve process.pid
fetch_proc.resolve process
end

status = $?
last_exit_code = status.exit_code.to_s

Log.warn { {message: "driver process exited with #{last_exit_code}", driver_path: @driver_path} } unless status.success?

begin
@io.try &.close
rescue
ensure
@io = nil
if io = @io
io.close rescue nil
@shutting_down.send nil
@events.send(Request.new(last_exit_code, :exited))
end
rescue error
Log.error(exception: error) { "error launching driver: #{@driver_path}" }
fetch_proc.reject error
end

private def relaunch(last_exit_code : String) : Nil
@io = nil
@last_exit_code = last_exit_code.to_i
return if terminated?
@last_exit_code = last_exit_code.to_i? || -1

@shutting_down = Channel(Nil).new(1)
start_process unless modules.empty?
end

Expand Down Expand Up @@ -417,7 +470,8 @@ class PlaceOS::Driver::Protocol::Management
end
rescue error : IO::Error
# Input stream closed. This should only occur on termination
Log.debug(exception: error) { "comms closed for #{@driver_path}" }
Log.debug(exception: error) { "comms closed for #{@driver_path}" } unless terminated?
loaded.reject error
ensure
# Reject any pending request
temp_reqs = request_lock.synchronize do
Expand Down

0 comments on commit 6a0c26c

Please sign in to comment.