""" Tests that emulates the behaviour of a WorkbenchServer. """ from ereuse_devicehub.client import UserClient from ereuse_devicehub.resources.device.models import Device from ereuse_devicehub.resources.event.models import EraseSectors, Install, Snapshot, \ StressTest from tests.conftest import file def test_workbench_server_phases(user: UserClient): """ Tests the phases described in the docs section `Snapshots from Workbench `_. """ # 1. Snapshot with sync / rate / benchmarks / test data storage s = file('workbench-server-1.snapshot') snapshot, _ = user.post(res=Snapshot, data=s) assert not snapshot['closed'], 'Snapshot must be waiting for the new events' # 2. stress test st = file('workbench-server-2.stress-test') st['snapshot'] = snapshot['id'] stress_test, _ = user.post(res=StressTest, data=st) # 3. erase ssd_id, hdd_id = snapshot['components'][4]['id'], snapshot['components'][5]['id'] e = file('workbench-server-3.erase') e['snapshot'], e['device'] = snapshot['id'], ssd_id erase1, _ = user.post(res=EraseSectors, data=e) # 3 bis. a second erase e = file('workbench-server-3.erase') e['snapshot'], e['device'] = snapshot['id'], hdd_id erase2, _ = user.post(res=EraseSectors, data=e) # 4. Install i = file('workbench-server-4.install') i['snapshot'], i['device'] = snapshot['id'], ssd_id install, _ = user.post(res=Install, data=i) # Check events have been appended in Snapshot and devices # and that Snapshot is closed snapshot, _ = user.get(res=Snapshot, item=snapshot['id']) events = snapshot['events'] assert len(events) == 9 assert events[0]['type'] == 'Rate' assert events[0]['device'] == 1 assert events[0]['closed'] assert events[0]['type'] == 'WorkbenchRate' assert events[0]['device'] == 1 assert events[1]['type'] == 'BenchmarkProcessor' assert events[1]['device'] == 5 assert events[2]['type'] == 'BenchmarkProcessorSysbench' assert events[2]['device'] == 5 assert events[3]['type'] == 'BenchmarkDataStorage' assert events[3]['device'] == 6 assert events[4]['type'] == 'TestDataStorage' assert events[4]['device'] == 6 assert events[4]['type'] == 'BenchmarkDataStorage' assert events[4]['device'] == 7 assert events[5]['type'] == 'StressTest' assert events[5]['device'] == 1 assert events[6]['type'] == 'EraseSectors' assert events[6]['device'] == 6 assert events[7]['type'] == 'EraseSectors' assert events[7]['device'] == 7 assert events[8]['type'] == 'Install' assert events[8]['device'] == 6 assert snapshot['closed'] assert not snapshot['error'] pc, _ = user.get(res=Device, item=snapshot['id']) assert len(pc['events']) == 10 # todo shall I add child events? def test_workbench_server_condensed(user: UserClient): """ As :def:`.test_workbench_server_phases` but all the events condensed in only one big ``Snapshot`` file, as described in the docs. """ s = file('workbench-server-1.snapshot') s['events'].append(file('workbench-server-2.stress-test')) s['components'][5]['erasure'] = file('workbench-server-3.erase') s['components'][5]['installation'] = file('workbench-server-4.install') s['components'][6]['erasure'] = file('workbench-server-3.erase') snapshot, _ = user.post(res=Snapshot, data=s) events = snapshot['events'] assert events[0]['type'] == 'Rate' assert events[0]['device'] == 1 assert events[0]['closed'] assert events[0]['type'] == 'WorkbenchRate' assert events[0]['device'] == 1 assert events[1]['type'] == 'BenchmarkProcessor' assert events[1]['device'] == 5 assert events[2]['type'] == 'BenchmarkProcessorSysbench' assert events[2]['device'] == 5 assert events[3]['type'] == 'BenchmarkDataStorage' assert events[3]['device'] == 6 assert events[4]['type'] == 'TestDataStorage' assert events[4]['device'] == 6 assert events[4]['type'] == 'BenchmarkDataStorage' assert events[4]['device'] == 7 assert events[5]['type'] == 'StressTest' assert events[5]['device'] == 1 assert events[6]['type'] == 'EraseSectors' assert events[6]['device'] == 6 assert events[7]['type'] == 'EraseSectors' assert events[7]['device'] == 7 assert events[8]['type'] == 'Install' assert events[8]['device'] == 6 assert snapshot['closed'] assert not snapshot['error']