diff --git a/sys_test/swarm_test_rig/rig_support.py b/sys_test/swarm_test_rig/rig_support.py new file mode 100644 index 0000000000000000000000000000000000000000..f510ada098a03b13f8aa3e5ab6059345338d6070 --- /dev/null +++ b/sys_test/swarm_test_rig/rig_support.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2019 Bitcraze AB +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +import time + +from cflib.crtp import RadioDriver +from cflib.drivers.crazyradio import Crazyradio + + +class RigSupport: + def __init__(self): + self.all_uris = [ + 'radio://0/42/2M/E7E7E74201', + 'radio://0/42/2M/E7E7E74202', + 'radio://0/42/2M/E7E7E74203', + 'radio://0/42/2M/E7E7E74204', + 'radio://0/42/2M/E7E7E74205', + 'radio://0/42/2M/E7E7E74206', + 'radio://0/42/2M/E7E7E74207', + 'radio://0/42/2M/E7E7E74208', + 'radio://0/42/2M/E7E7E74209', + 'radio://0/42/2M/E7E7E7420A', + ] + + def restart_devices(self, uris): + def send_packets(uris, value): + for uri in uris: + devid, channel, datarate, address = RadioDriver.parse_uri(uri) + radio.set_channel(channel) + radio.set_data_rate(datarate) + radio.set_address(address) + + received_packet = False + for i in range(10): + # TODO krri Seems to work better with a pause here, + # investigate why + time.sleep(0.2) + result = radio.send_packet((0xf3, 0xfe, value)) + if result.ack is True: + received_packet = True + break + time.sleep(0.1) + + if not received_packet: + raise Exception('Failed to restart device') + + print('Restarting devices') + + BOOTLOADER_CMD_SYSOFF = 0x02 + BOOTLOADER_CMD_SYSON = 0x03 + + radio = Crazyradio() + send_packets(uris, BOOTLOADER_CMD_SYSOFF) + time.sleep(0.1) + send_packets(uris, BOOTLOADER_CMD_SYSON) + radio.close() + time.sleep(5) diff --git a/sys_test/swarm_test_rig/test_connection.py b/sys_test/swarm_test_rig/test_connection.py index 8b90090e077e42710e924cb4fafefd2c5dd3ac61..534862157d2c242db1d276968c024a7453586863 100644 --- a/sys_test/swarm_test_rig/test_connection.py +++ b/sys_test/swarm_test_rig/test_connection.py @@ -26,17 +26,16 @@ import unittest import cflib.crtp from cflib.crazyflie import Crazyflie -from cflib.crazyflie.log import LogConfig from cflib.crazyflie.swarm import CachedCfFactory from cflib.crazyflie.swarm import Swarm from cflib.crazyflie.syncCrazyflie import SyncCrazyflie -from sys_test.swarm_test_rig.test_rig_support import TestRigSupport +from sys_test.swarm_test_rig.rig_support import RigSupport -class TestSwarmConnection(unittest.TestCase): +class TestConnection(unittest.TestCase): def setUp(self): cflib.crtp.init_drivers(enable_debug_driver=False) - self.test_rig_support = TestRigSupport() + self.test_rig_support = RigSupport() def test_that_connection_time_scales_with_more_devices_without_cache(self): # Fixture @@ -46,7 +45,7 @@ class TestSwarmConnection(unittest.TestCase): for nr_of_devices in range(1, len(self.test_rig_support.all_uris)): # Test - uris = self.testRigSupport.all_uris[:nr_of_devices] + uris = self.test_rig_support.all_uris[:nr_of_devices] start_time = time.time() with Swarm(uris): @@ -95,13 +94,10 @@ class TestSwarmConnection(unittest.TestCase): def test_that_the_same_cf_object_can_be_connected_multiple_times(self): # Fixture + self.test_rig_support.restart_devices(self.test_rig_support.all_uris) cf = Crazyflie(rw_cache='./cache') - lg_conf = LogConfig(name='SysTest', period_in_ms=10) - lg_conf.add_variable('stabilizer.roll', 'float') - # Test for uri in self.test_rig_support.all_uris: with SyncCrazyflie(uri, cf=cf): pass - diff --git a/sys_test/swarm_test_rig/test_logging.py b/sys_test/swarm_test_rig/test_logging.py new file mode 100644 index 0000000000000000000000000000000000000000..839058cc75182dfc3372449c4c1ed12c0cb41c43 --- /dev/null +++ b/sys_test/swarm_test_rig/test_logging.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2019 Bitcraze AB +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +import unittest + +import cflib.crtp +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.log import LogConfig +from cflib.crazyflie.swarm import CachedCfFactory +from cflib.crazyflie.swarm import Swarm +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.crazyflie.syncLogger import SyncLogger +from sys_test.swarm_test_rig.rig_support import RigSupport + + +class TestLogging(unittest.TestCase): + def setUp(self): + cflib.crtp.init_drivers(enable_debug_driver=False) + self.test_rig_support = RigSupport() + + def test_that_requested_logging_is_received_properly_from_one_cf(self): + # Fixture + uri = self.test_rig_support.all_uris[0] + self.test_rig_support.restart_devices([uri]) + cf = Crazyflie(rw_cache='./cache') + + # Test and Assert + with SyncCrazyflie(uri, cf=cf) as scf: + self.assert_add_logging_and_get_non_zero_value(scf) + + def test_that_requested_logging_is_received_properly_from_all_cfs(self): + # Fixture + uris = self.test_rig_support.all_uris + self.test_rig_support.restart_devices(uris) + factory = CachedCfFactory(rw_cache='./cache') + + # Test and Assert + with Swarm(uris, factory=factory) as swarm: + swarm.parallel_safe(self.assert_add_logging_and_get_non_zero_value) + + def assert_add_logging_and_get_non_zero_value(self, scf): + log_name = 'stabilizer.roll' + expected = 0.0 + + lg_conf = LogConfig(name='SysTest', period_in_ms=10) + lg_conf.add_variable(log_name, 'float') + + with SyncLogger(scf, lg_conf) as logger: + for log_entry in logger: + actual = log_entry[1][log_name] + break + + self.assertNotAlmostEqual(expected, actual, places=4) diff --git a/sys_test/swarm_test_rig/test_memory_map.py b/sys_test/swarm_test_rig/test_memory_map.py new file mode 100644 index 0000000000000000000000000000000000000000..6529fa9a86d962c09713ed07b425f0e4780059d5 --- /dev/null +++ b/sys_test/swarm_test_rig/test_memory_map.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2019 Bitcraze AB +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +import time +import unittest + +import cflib.crtp +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.log import LogConfig +from cflib.crazyflie.mem import MemoryElement +from cflib.crazyflie.swarm import CachedCfFactory +from cflib.crazyflie.swarm import Swarm +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.crazyflie.syncLogger import SyncLogger +from sys_test.swarm_test_rig.rig_support import RigSupport + + +class TestMemoryMapping(unittest.TestCase): + def setUp(self): + cflib.crtp.init_drivers(enable_debug_driver=False) + self.test_rig_support = RigSupport() + + def test_memory_mapping_with_one_cf(self): + # Fixture + uri = self.test_rig_support.all_uris[0] + self.test_rig_support.restart_devices([uri]) + cf = Crazyflie(rw_cache='./cache') + + # Test and Assert + with SyncCrazyflie(uri, cf=cf) as scf: + self.assert_memory_mapping(scf) + + def test_memory_mapping_with_all_cfs(self): + # Fixture + uris = self.test_rig_support.all_uris + self.test_rig_support.restart_devices(uris) + factory = CachedCfFactory(rw_cache='./cache') + + # Test and Assert + with Swarm(uris, factory=factory) as swarm: + swarm.parallel_safe(self.assert_memory_mapping) + + def assert_memory_mapping(self, scf): + mems = scf.cf.mem.get_mems(MemoryElement.TYPE_MEMORY_TESTER) + count = len(mems) + self.assertEqual(1, count, 'unexpected number of memories found') + + self.verify_reading_memory_data(mems) + self.verify_writing_memory_data(mems, scf) + + def verify_writing_memory_data(self, mems, scf): + self.wrote_data = False + scf.cf.param.set_value('memTst.resetW', '1') + time.sleep(0.1) + mems[0].write_data(5, 1000, self._data_written) + while not self.wrote_data: + time.sleep(1) + log_conf = LogConfig(name='memtester', period_in_ms=100) + log_conf.add_variable('memTst.errCntW', 'uint32_t') + with SyncLogger(scf, log_conf) as logger: + for log_entry in logger: + errorCount = log_entry[1]['memTst.errCntW'] + self.assertEqual(0, errorCount) + break + + def verify_reading_memory_data(self, mems): + self.got_data = False + mems[0].read_data(5, 1000, self._data_read) + while not self.got_data: + time.sleep(1) + + def _data_read(self, mem): + self.got_data = True + + def _data_written(self, mem, address): + self.wrote_data = True diff --git a/sys_test/swarm_test_rig/test_response_time.py b/sys_test/swarm_test_rig/test_response_time.py new file mode 100644 index 0000000000000000000000000000000000000000..197cc6b5a793a2db7dd20ddb0c2159b175f17281 --- /dev/null +++ b/sys_test/swarm_test_rig/test_response_time.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +# +# || ____ _ __ +# +------+ / __ )(_) /_______________ _____ ___ +# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2019 Bitcraze AB +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +import time +import unittest + +import cflib.crtp +from cflib.crtp.crtpstack import CRTPPacket +from cflib.crtp.crtpstack import CRTPPort +from sys_test.swarm_test_rig.rig_support import RigSupport + + +class TestResponseTime(unittest.TestCase): + ECHO = 0 + + def setUp(self): + cflib.crtp.init_drivers(enable_debug_driver=False) + self.test_rig_support = RigSupport() + + self.links = [] + + def tearDown(self): + for link in self.links: + link.close() + self.links = [] + + def test_response_time_to_one_cf(self): + # Fixture + uri = self.test_rig_support.all_uris[0] + self.test_rig_support.restart_devices([uri]) + link = self.connect_link(uri) + seq_nr = 47 + expected_max_response_time = 0.01 + + # Test + time_send_echo = time.time() + self.request_echo_with_seq_nr(link, seq_nr) + response_timestamps = self.assert_wait_for_all_seq_nrs([link], seq_nr) + response_time = response_timestamps[uri] - time_send_echo + + # Assert + self.assertLess(response_time, expected_max_response_time) + + def test_response_time_to_all_cfs(self): + # Fixture + uris = self.test_rig_support.all_uris + self.test_rig_support.restart_devices(uris) + + for uri in uris: + self.connect_link(uri) + + seq_nr = 47 + expected_max_response_time = 0.1 + expected_mean_response_time = 0.05 + + # Test + time_send_echo = time.time() + for link in self.links: + self.request_echo_with_seq_nr(link, seq_nr) + + response_timestamps = self.assert_wait_for_all_seq_nrs( + self.links, seq_nr) + + # Assert + response_times = {} + for uri, response_time in response_timestamps.items(): + response_times[uri] = response_time - time_send_echo + + times = response_times.values() + max_time = max(times) + mean_time = float(sum(times)) / len(times) + + # print(max_time, mean_time, times) + self.assertLess(max_time, expected_max_response_time) + self.assertLess(mean_time, expected_mean_response_time) + + def request_echo_with_seq_nr(self, link, seq_nr): + pk = CRTPPacket() + pk.set_header(CRTPPort.LINKCTRL, self.ECHO) + pk.data = (seq_nr,) + link.send_packet(pk) + + def assert_wait_for_all_seq_nrs(self, links, seq_nr, timeout=1): + NO_BLOCKING = -1 + + time_end = time.time() + timeout + response_timestamps = {} + while time.time() < time_end: + for link in links: + if link.uri not in response_timestamps: + response = link.receive_packet(time=NO_BLOCKING) + if self._is_response_correct_seq_nr(response, seq_nr): + response_timestamps[link.uri] = time.time() + + if len(response_timestamps) == len(self.links): + return response_timestamps + + time.sleep(0.001) + + self.fail('Time out while waiting for seq nrs.') + + def _is_response_correct_seq_nr(self, response, seq_nr): + if response is not None: + if response._get_channel() == self.ECHO and \ + response._get_port() == CRTPPort.LINKCTRL: + received_seq = response._get_data_t()[0] + if received_seq == seq_nr: + return True + + return False + + def connect_link(self, uri): + link = cflib.crtp.get_link_driver(uri, self._link_quality_cb, + self._link_error_cb) + self.assertIsNotNone(link) + self.links.append(link) + + return link + + def _link_quality_cb(self, percentage): + pass + + def _link_error_cb(self, errmsg): + self.fail()