Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Williangalvani committed Sep 13, 2024
1 parent a06e110 commit 1b98701
Showing 1 changed file with 20 additions and 42 deletions.
62 changes: 20 additions & 42 deletions tests/test_mavftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,11 @@
import logging
from pymavlink import mavutil
from pymavlink.mavftp import FTP_OP, MAVFTP, MAVFTPReturn

from pymavlink.mavftp import FtpError
from pymavlink.mavftp import OP_ListDirectory
from pymavlink.mavftp import OP_ReadFile
from pymavlink.mavftp import OP_Ack
from pymavlink.mavftp import OP_Nack
from pymavlink.mavftp import ERR_None
from pymavlink.mavftp import ERR_Fail
from pymavlink.mavftp import ERR_FailErrno
from pymavlink.mavftp import ERR_InvalidDataSize
from pymavlink.mavftp import ERR_InvalidSession
from pymavlink.mavftp import ERR_NoSessionsAvailable
from pymavlink.mavftp import ERR_EndOfFile
from pymavlink.mavftp import ERR_UnknownCommand
from pymavlink.mavftp import ERR_FileExists
from pymavlink.mavftp import ERR_FileProtected
from pymavlink.mavftp import ERR_FileNotFound
#from pymavlink.mavftp import ERR_NoErrorCodeInPayload
#from pymavlink.mavftp import ERR_NoErrorCodeInNack
#from pymavlink.mavftp import ERR_NoFilesystemErrorInPayload
from pymavlink.mavftp import ERR_InvalidErrorCode
#from pymavlink.mavftp import ERR_PayloadTooLarge
#from pymavlink.mavftp import ERR_InvalidOpcode
from pymavlink.mavftp import ERR_InvalidArguments
from pymavlink.mavftp import ERR_PutAlreadyInProgress
from pymavlink.mavftp import ERR_FailToOpenLocalFile
from pymavlink.mavftp import ERR_RemoteReplyTimeout


class TestMAVFTPPayloadDecoding(unittest.TestCase):
"""Test MAVFTP payload decoding"""
Expand Down Expand Up @@ -90,52 +68,52 @@ def test_decode_ftp_ack_and_nack(self):
},
{
"name": "Generic Failure",
"op": self.ftp_operation(seq=2, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_Fail])),
"op": self.ftp_operation(seq=2, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.Fail])),
"expected_message": "ListDirectory failed, generic error"
},
{
"name": "System Error",
"op": self.ftp_operation(seq=3, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FailErrno, 1])), # System error 1
"op": self.ftp_operation(seq=3, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FailErrno, 1])), # System error 1
"expected_message": "ListDirectory failed, system error 1"
},
{
"name": "Invalid Data Size",
"op": self.ftp_operation(seq=4, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_InvalidDataSize])),
"op": self.ftp_operation(seq=4, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.InvalidDataSize])),
"expected_message": "ListDirectory failed, invalid data size"
},
{
"name": "Invalid Session",
"op": self.ftp_operation(seq=5, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_InvalidSession])),
"op": self.ftp_operation(seq=5, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.InvalidSession])),
"expected_message": "ListDirectory failed, session is not currently open"
},
{
"name": "No Sessions Available",
"op": self.ftp_operation(seq=6, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_NoSessionsAvailable])),
"op": self.ftp_operation(seq=6, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.NoSessionsAvailable])),
"expected_message": "ListDirectory failed, no sessions available"
},
{
"name": "End of File",
"op": self.ftp_operation(seq=7, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_EndOfFile])),
"op": self.ftp_operation(seq=7, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.EndOfFile])),
"expected_message": "ListDirectory failed, offset past end of file"
},
{
"name": "Unknown Command",
"op": self.ftp_operation(seq=8, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_UnknownCommand])),
"op": self.ftp_operation(seq=8, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.UnknownCommand])),
"expected_message": "ListDirectory failed, unknown command"
},
{
"name": "File Exists",
"op": self.ftp_operation(seq=9, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FileExists])),
"op": self.ftp_operation(seq=9, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FileExists])),
"expected_message": "ListDirectory failed, file/directory already exists"
},
{
"name": "File Protected",
"op": self.ftp_operation(seq=10, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FileProtected])),
"op": self.ftp_operation(seq=10, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FileProtected])),
"expected_message": "ListDirectory failed, file/directory is protected"
},
{
"name": "File Not Found",
"op": self.ftp_operation(seq=11, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FileNotFound])),
"op": self.ftp_operation(seq=11, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FileNotFound])),
"expected_message": "ListDirectory failed, file/directory not found"
},
{
Expand All @@ -145,17 +123,17 @@ def test_decode_ftp_ack_and_nack(self):
},
{
"name": "No Error Code in Nack",
"op": self.ftp_operation(seq=13, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_None])),
"op": self.ftp_operation(seq=13, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.Success])),
"expected_message": "ListDirectory failed, no error code"
},
{
"name": "No Filesystem Error in Payload",
"op": self.ftp_operation(seq=14, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FailErrno])),
"op": self.ftp_operation(seq=14, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FailErrno])),
"expected_message": "ListDirectory failed, file-system error missing in payload"
},
{
"name": "Invalid Error Code",
"op": self.ftp_operation(seq=15, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_InvalidErrorCode])),
"op": self.ftp_operation(seq=15, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.InvalidErrorCode])),
"expected_message": "ListDirectory failed, invalid error code"
},
{
Expand All @@ -170,12 +148,12 @@ def test_decode_ftp_ack_and_nack(self):
},
{
"name": "Unknown Opcode in Request",
"op": self.ftp_operation(seq=19, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_UnknownCommand])), # Assuming 100 is an unknown opcode
"op": self.ftp_operation(seq=19, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.UnknownCommand])), # Assuming 100 is an unknown opcode
"expected_message": "ListDirectory failed, unknown command"
},
{
"name": "Payload with System Error",
"op": self.ftp_operation(seq=20, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([ERR_FailErrno, 2])), # System error 2
"op": self.ftp_operation(seq=20, opcode=OP_Nack, req_opcode=OP_ListDirectory, payload=bytes([FtpError.FailErrno, 2])), # System error 2
"expected_message": "ListDirectory failed, system error 2"
},
{
Expand All @@ -202,7 +180,7 @@ def test_decode_ftp_ack_and_nack(self):
self.log_stream.truncate(0)

# Invalid Arguments
ret = MAVFTPReturn("Command arguments", ERR_InvalidArguments)
ret = MAVFTPReturn("Command arguments", FtpError.InvalidArguments)
ret.display_message()
log_output = self.log_stream.getvalue().strip()
self.assertIn("Command arguments failed, invalid arguments", log_output, "Expected invalid arguments message")
Expand All @@ -221,23 +199,23 @@ def test_decode_ftp_ack_and_nack(self):
self.log_stream.truncate(0)

# Put already in progress
ret = MAVFTPReturn("Put", ERR_PutAlreadyInProgress)
ret = MAVFTPReturn("Put", FtpError.PutAlreadyInProgress)
ret.display_message()
log_output = self.log_stream.getvalue().strip()
self.assertIn("Put failed, put already in progress", log_output, "Expected put already in progress message")
self.log_stream.seek(0)
self.log_stream.truncate(0)

# Fail to open local file
ret = MAVFTPReturn("Put", ERR_FailToOpenLocalFile)
ret = MAVFTPReturn("Put", FtpError.FailToOpenLocalFile)
ret.display_message()
log_output = self.log_stream.getvalue().strip()
self.assertIn("Put failed, failed to open local file", log_output, "Expected fail to open local file message")
self.log_stream.seek(0)
self.log_stream.truncate(0)

# Remote Reply Timeout
ret = MAVFTPReturn("Put", ERR_RemoteReplyTimeout)
ret = MAVFTPReturn("Put", FtpError.RemoteReplyTimeout)
ret.display_message()
log_output = self.log_stream.getvalue().strip()
self.assertIn("Put failed, remote reply timeout", log_output, "Expected remote reply timeout message")
Expand Down

0 comments on commit 1b98701

Please sign in to comment.