Skip to content

Commit

Permalink
fix: modules failing to connect may not stop (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
stakach authored May 9, 2023
1 parent 057609b commit be272e8
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 123 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: placeos-driver
version: 6.8.4
version: 6.8.5
crystal: ">= 1.0.0"

dependencies:
Expand Down
8 changes: 4 additions & 4 deletions src/placeos-driver/protocol/management.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class PlaceOS::Driver::Protocol::Management
alias DebugCallback = String -> Nil

# Core should update this callback to route requests
property on_exec : Proc(Request, Proc(Request, Nil), Nil) = ->(request : Request, callback : Proc(Request, Nil)) {}
property on_setting : Proc(String, String, YAML::Any, Nil) = ->(module_id : String, setting_name : String, setting_value : YAML::Any) {}
property on_exec : Proc(Request, Proc(Request, Nil), Nil) = ->(_request : Request, _callback : Proc(Request, Nil)) {}
property on_setting : Proc(String, String, YAML::Any, Nil) = ->(_module_id : String, _setting_name : String, _setting_value : YAML::Any) {}

# A request for the system model as defined in the database
property on_system_model : Proc(Request, Proc(Request, Nil), Nil) = ->(request : Request, callback : Proc(Request, Nil)) {}
property on_system_model : Proc(Request, Proc(Request, Nil), Nil) = ->(_request : Request, _callback : Proc(Request, Nil)) {}

# These are the events coming from the driver where edge is expected to update redis on the driver's behalf
enum RedisAction
Expand All @@ -29,7 +29,7 @@ class PlaceOS::Driver::Protocol::Management
PUBLISH
end

property on_redis : Proc(RedisAction, String, String, String?, Nil) = ->(action : RedisAction, hash_id : String, key_name : String, status_value : String?) {}
property on_redis : Proc(RedisAction, String, String, String?, Nil) = ->(_action : RedisAction, _hash_id : String, _key_name : String, _status_value : String?) {}

getter? terminated = false
getter pid : Int64 = -1
Expand Down
237 changes: 121 additions & 116 deletions src/placeos-driver/transport/ssh.cr
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class PlaceOS::Driver
channel
end

# ameba:disable Metrics/CyclomaticComplexity
def connect(connect_timeout : Int32 = 10) : Nil
@connect_lock.synchronize do
return if @terminated || @connecting || @messages
Expand All @@ -67,148 +66,154 @@ class PlaceOS::Driver
max_interval: 10.seconds,
randomise: 500.milliseconds
) do
supported_methods = nil
start_socket(connect_timeout) unless @terminated
end

begin
messages = @messages
@messages = Channel(Bytes).new
messages.try &.close
@connect_lock.synchronize { @connecting = false }
disconnect if @terminated
end

# Grab the authentication settings (using not_nil for schema generation)
@ssh_settings = settings = @settings.get { setting?(Settings, :ssh) }.not_nil!

# Open a connection
@socket = socket = TCPSocket.new(@ip, @port, connect_timeout: connect_timeout)
socket.tcp_nodelay = true
socket.sync = true

# Negotiate the SSH session
session = SSH2::Session.new(socket)

# Attempt to authenticate
supported_methods = session.login_with_noauth(settings.username)
if supported_methods
if supported_methods.is_a?(Array(String))
logger.debug { "supported auhentication methods: #{supported_methods}" }

supported_methods.each do |auth_method|
case auth_method
when "publickey"
if prikey = settings.private_key
begin
pubkey = settings.public_key.not_nil!
session.login_with_data(settings.username, prikey, pubkey, settings.passphrase)
rescue SSH2::SessionError
logger.warn { "publickey auth failed, incorrect key" }
end
else
logger.debug { "ignoring publickey authentication as no key provided" }
end
when "password"
if password = settings.password
begin
session.login(settings.username, password)
rescue SSH2::SessionError
logger.warn { "password auth failed, incorrect password" }
end
else
logger.debug { "ignoring password authentication as no password provided" }
# ameba:disable Metrics/CyclomaticComplexity
private def start_socket(connect_timeout)
supported_methods = nil

begin
messages = @messages
@messages = Channel(Bytes).new
messages.try &.close

# Grab the authentication settings (using not_nil for schema generation)
@ssh_settings = settings = @settings.get { setting?(Settings, :ssh) }.not_nil!

# Open a connection
@socket = socket = TCPSocket.new(@ip, @port, connect_timeout: connect_timeout)
socket.tcp_nodelay = true
socket.sync = true

# Negotiate the SSH session
session = SSH2::Session.new(socket)

# Attempt to authenticate
supported_methods = session.login_with_noauth(settings.username)
if supported_methods
if supported_methods.is_a?(Array(String))
logger.debug { "supported auhentication methods: #{supported_methods}" }

supported_methods.each do |auth_method|
case auth_method
when "publickey"
if prikey = settings.private_key
begin
pubkey = settings.public_key.not_nil!
session.login_with_data(settings.username, prikey, pubkey, settings.passphrase)
rescue SSH2::SessionError
logger.warn { "publickey auth failed, incorrect key" }
end
when "keyboard-interactive"
if settings.password
begin
session.interactive_login(settings.username) { @ssh_settings.not_nil!.password.not_nil! }
rescue SSH2::SessionError
logger.warn { "password auth failed, incorrect password" }
end
else
logger.debug { "ignoring keyboard-interactive authentication as no password provided" }
else
logger.debug { "ignoring publickey authentication as no key provided" }
end
when "password"
if password = settings.password
begin
session.login(settings.username, password)
rescue SSH2::SessionError
logger.warn { "password auth failed, incorrect password" }
end
else
logger.debug { "ignoring unsupported authentication method: #{auth_method}" }
logger.debug { "ignoring password authentication as no password provided" }
end

break if session.authenticated?
end
else
if password = settings.password
begin
session.login(settings.username, password)
rescue error : SSH2::SessionError
when "keyboard-interactive"
if settings.password
begin
session.interactive_login(settings.username) { @ssh_settings.not_nil!.password.not_nil! }
rescue SSH2::SessionError
logger.warn { "password auth failed, either not supported or incorrect password" }
logger.warn { "password auth failed, incorrect password" }
end
else
logger.debug { "ignoring keyboard-interactive authentication as no password provided" }
end
else
logger.debug { "ignoring unsupported authentication method: #{auth_method}" }
end

if !session.authenticated? && (prikey = settings.private_key)
break if session.authenticated?
end
else
if password = settings.password
begin
session.login(settings.username, password)
rescue error : SSH2::SessionError
begin
pubkey = settings.public_key.not_nil!
session.login_with_data(settings.username, prikey, pubkey, settings.passphrase)
session.interactive_login(settings.username) { @ssh_settings.not_nil!.password.not_nil! }
rescue SSH2::SessionError
logger.warn { "publickey auth failed, either not supported or incorrect key" }
logger.warn { "password auth failed, either not supported or incorrect password" }
end
end
end
end

raise "all available authentication methods failed" unless session.authenticated?

# Attempt to open a shell - more often than not, a shell is the only supported method
begin
Tasker.timeout(5.seconds) {
@shell = shell = session.open_session
# Set mode https://tools.ietf.org/html/rfc4254#section-8
shell.request_pty(settings.term || "vt100", [{SSH2::TerminalMode::ECHO, 0u32}])
shell.shell
}
rescue error
# It may not be fatal if a shell is unable to be negotiated
# however this would be a rare device so we log the issue.
if shell = @shell
shell.close
@shell = nil
end
logger.warn(exception: error) { "unable to negotiage a shell on SSH connection" }
end
@session = session

# This will track the socket state when there is no shell
keepalive(settings.keepalive || 30)

# Start consuming data from the shell
spawn(same_thread: true) do
if @shell
consume_messages
else
# if we are not running in shell mode we want to connect on messages close
@messages.try &.receive?
perform_reconnect
if !session.authenticated? && (prikey = settings.private_key)
begin
pubkey = settings.public_key.not_nil!
session.login_with_data(settings.username, prikey, pubkey, settings.passphrase)
rescue SSH2::SessionError
logger.warn { "publickey auth failed, either not supported or incorrect key" }
end
end
end
end

# Enable queuing
@queue.online = true
rescue error
logger.info(exception: error) {
supported_methods = ", supported authentication methods: #{supported_methods}" if supported_methods
"connecting to device#{supported_methods}"
raise "all available authentication methods failed" unless session.authenticated?

# Attempt to open a shell - more often than not, a shell is the only supported method
begin
Tasker.timeout(5.seconds) {
@shell = shell = session.open_session
# Set mode https://tools.ietf.org/html/rfc4254#section-8
shell.request_pty(settings.term || "vt100", [{SSH2::TerminalMode::ECHO, 0u32}])
shell.shell
}
@queue.online = false
begin
@socket.try &.close
@socket = nil
rescue error
# It may not be fatal if a shell is unable to be negotiated
# however this would be a rare device so we log the issue.
if shell = @shell
shell.close
@shell = nil
@session = nil
rescue
end
raise error
logger.warn(exception: error) { "unable to negotiage a shell on SSH connection" }
end
@session = session

# This will track the socket state when there is no shell
keepalive(settings.keepalive || 30)

# Start consuming data from the shell
spawn(same_thread: true) do
if @shell
consume_messages
else
# if we are not running in shell mode we want to connect on messages close
@messages.try &.receive?
perform_reconnect
end
end
end

@connect_lock.synchronize { @connecting = false }
# Enable queuing
@queue.online = true
rescue error
logger.info(exception: error) {
supported_methods = ", supported authentication methods: #{supported_methods}" if supported_methods
"connecting to device#{supported_methods}"
}
@queue.online = false
begin
@socket.try &.close
@socket = nil
@shell = nil
@session = nil
rescue
end
raise error
end
end

protected def perform_reconnect
Expand Down
3 changes: 2 additions & 1 deletion src/placeos-driver/transport/tcp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class PlaceOS::Driver::TransportTCP < PlaceOS::Driver::Transport
max_interval: 10.seconds,
randomise: 500.milliseconds
) do
start_socket(connect_timeout)
start_socket(connect_timeout) unless @terminated
end
disconnect if @terminated
end
end

Expand Down
3 changes: 2 additions & 1 deletion src/placeos-driver/transport/udp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ class PlaceOS::Driver::TransportUDP < PlaceOS::Driver::Transport
max_interval: 10.seconds,
randomise: 500.milliseconds
) do
start_socket(connect_timeout)
start_socket(connect_timeout) unless @terminated
end
disconnect if @terminated
end

private def start_socket(connect_timeout)
Expand Down

0 comments on commit be272e8

Please sign in to comment.