Skip to content
Snippets Groups Projects
Commit 7afd06c3 authored by Kristoffer Richardsson's avatar Kristoffer Richardsson
Browse files

#119 Refactoring. Separated sys test support functionality into class

parent 3e8ee31e
No related branches found
No related tags found
No related merge requests found
File moved
......@@ -30,36 +30,23 @@ from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.swarm import CachedCfFactory
from cflib.crazyflie.swarm import Swarm
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crtp import RadioDriver
from cflib.drivers.crazyradio import Crazyradio
from sys_test.swarm_test_rig.test_rig_support import TestRigSupport
class TestSwarmConnection(unittest.TestCase):
def setUp(self):
cflib.crtp.init_drivers(enable_debug_driver=False)
self.all_uris = [
'radio://0/42/2M/E7E7E74201',
'radio://0/42/2M/E7E7E74202',
'radio://0/42/2M/E7E7E74203',
'radio://0/42/2M/E7E7E74204',
'radio://0/42/2M/E7E7E74205',
'radio://0/42/2M/E7E7E74206',
'radio://0/42/2M/E7E7E74207',
'radio://0/42/2M/E7E7E74208',
'radio://0/42/2M/E7E7E74209',
'radio://0/42/2M/E7E7E7420A',
]
self.test_rig_support = TestRigSupport()
def test_that_connection_time_scales_with_more_devices_without_cache(self):
# Fixture
self.restart_devices(self.all_uris)
self.test_rig_support.restart_devices(self.test_rig_support.all_uris)
EXPECTED_CONNECTION_TIME = 5
for nr_of_devices in range(1, len(self.all_uris)):
for nr_of_devices in range(1, len(self.test_rig_support.all_uris)):
# Test
uris = self.all_uris[:nr_of_devices]
uris = self.testRigSupport.all_uris[:nr_of_devices]
start_time = time.time()
with Swarm(uris):
......@@ -75,18 +62,18 @@ class TestSwarmConnection(unittest.TestCase):
def test_that_connection_time_scales_with_more_devices_with_cache(self):
# Fixture
self.restart_devices(self.all_uris)
self.test_rig_support.restart_devices(self.test_rig_support.all_uris)
# Fill caches first by connecting to all devices
factory = CachedCfFactory(rw_cache='./cache')
with Swarm(self.all_uris, factory=factory):
with Swarm(self.test_rig_support.all_uris, factory=factory):
pass
EXPECTED_CONNECTION_TIME = 1.5
for nr_of_devices in range(1, len(self.all_uris)):
for nr_of_devices in range(1, len(self.test_rig_support.all_uris)):
# Test
uris = self.all_uris[:nr_of_devices]
uris = self.test_rig_support.all_uris[:nr_of_devices]
start_time = time.time()
with Swarm(uris, factory=factory):
......@@ -104,7 +91,7 @@ class TestSwarmConnection(unittest.TestCase):
# Fixture
# Test
# Assert
self.restart_devices(self.all_uris)
self.test_rig_support.restart_devices(self.test_rig_support.all_uris)
def test_that_the_same_cf_object_can_be_connected_multiple_times(self):
# Fixture
......@@ -114,37 +101,7 @@ class TestSwarmConnection(unittest.TestCase):
lg_conf.add_variable('stabilizer.roll', 'float')
# Test
for uri in self.all_uris:
for uri in self.test_rig_support.all_uris:
with SyncCrazyflie(uri, cf=cf):
pass
# Utility functions ---------------------------------------------
def restart_devices(self, uris):
def send_packets(uris, value):
for uri in uris:
devid, channel, datarate, address = RadioDriver.parse_uri(uri)
radio.set_channel(channel)
radio.set_data_rate(datarate)
radio.set_address(address)
received_packet = False
for i in range(100):
result = radio.send_packet((0xf3, 0xfe, value))
if result.ack is True:
received_packet = True
break
self.assertTrue(received_packet)
print('Restarting devices')
BOOTLOADER_CMD_SYSOFF = 0x02
BOOTLOADER_CMD_SYSON = 0x03
radio = Crazyradio()
send_packets(uris, BOOTLOADER_CMD_SYSOFF)
time.sleep(0.1)
send_packets(uris, BOOTLOADER_CMD_SYSON)
radio.close()
time.sleep(5)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment