diff --git a/CHANGELOG.md b/CHANGELOG.md index dcbe712c..f2f23d15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ ml). ## [1.0.4-beta] - [addend] #95 adding endpoint for check the hash of one report +- [addend] #98 adding endpoint for insert a new live +- [addend] #98 adding endpoint for get all licences in one query ## [1.0.3-beta] - [addend] #85 add mac of network adapter to device hid diff --git a/ereuse_devicehub/config.py b/ereuse_devicehub/config.py index d8734f65..3534ba98 100644 --- a/ereuse_devicehub/config.py +++ b/ereuse_devicehub/config.py @@ -14,6 +14,7 @@ from ereuse_devicehub.resources.device import definitions from ereuse_devicehub.resources.documents import documents from ereuse_devicehub.resources.enums import PriceSoftware from ereuse_devicehub.resources.versions import versions +from ereuse_devicehub.resources.licences import licences from ereuse_devicehub.resources.metric import definitions as metric_def @@ -29,6 +30,7 @@ class DevicehubConfig(Config): import_resource(documents), import_resource(inventory), import_resource(versions), + import_resource(licences), import_resource(metric_def), ),) PASSWORD_SCHEMES = {'pbkdf2_sha256'} # type: Set[str] @@ -48,6 +50,7 @@ class DevicehubConfig(Config): """ TMP_SNAPSHOTS = config('TMP_SNAPSHOTS', '/tmp/snapshots') + TMP_LIVES = config('TMP_LIVES', '/tmp/lives') """This var is for save a snapshots in json format when fail something""" API_DOC_CONFIG_TITLE = 'Devicehub' API_DOC_CONFIG_VERSION = '0.2' diff --git a/ereuse_devicehub/resources/action/__init__.py b/ereuse_devicehub/resources/action/__init__.py index 1d70d631..6367a28e 100644 --- a/ereuse_devicehub/resources/action/__init__.py +++ b/ereuse_devicehub/resources/action/__init__.py @@ -3,7 +3,8 @@ from typing import Callable, Iterable, Tuple from teal.resource import Converters, Resource from ereuse_devicehub.resources.action import schemas -from ereuse_devicehub.resources.action.views import ActionView, AllocateView, DeallocateView +from ereuse_devicehub.resources.action.views import (ActionView, AllocateView, DeallocateView, + LiveView) from ereuse_devicehub.resources.device.sync import Sync @@ -214,8 +215,9 @@ class PrepareDef(ActionDef): class LiveDef(ActionDef): - VIEW = None + VIEW = LiveView SCHEMA = schemas.Live + AUTH = False class ReserveDef(ActionDef): diff --git a/ereuse_devicehub/resources/action/models.py b/ereuse_devicehub/resources/action/models.py index 9a3e8bbf..af5f1047 100644 --- a/ereuse_devicehub/resources/action/models.py +++ b/ereuse_devicehub/resources/action/models.py @@ -1304,6 +1304,9 @@ class Live(JoinedWithOneDeviceMixin, ActionWithOneDevice): serial_number.comment = """The serial number of the Hard Disk in lower case.""" usage_time_hdd = Column(Interval, nullable=True) snapshot_uuid = Column(UUID(as_uuid=True)) + software = Column(DBEnum(SnapshotSoftware), nullable=False) + software_version = Column(StrictVersionType(STR_SM_SIZE), nullable=False) + licence_version = Column(StrictVersionType(STR_SM_SIZE), nullable=False) @property def final_user_code(self): diff --git a/ereuse_devicehub/resources/action/schemas.py b/ereuse_devicehub/resources/action/schemas.py index b29aa06a..fcc3e767 100644 --- a/ereuse_devicehub/resources/action/schemas.py +++ b/ereuse_devicehub/resources/action/schemas.py @@ -420,10 +420,26 @@ class Prepare(ActionWithMultipleDevices): class Live(ActionWithOneDevice): __doc__ = m.Live.__doc__ + """ + The Snapshot updates the state of the device with information about + its components and actions performed at them. + + See docs for more info. + """ + uuid = UUID() + software = EnumField(SnapshotSoftware, + required=True, + description='The software that generated this Snapshot.') + version = Version(required=True, description='The version of the software.') final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True) - serial_number = SanitizedStr(data_key="serialNumber", dump_only=True) - usage_time_hdd = TimeDelta(data_key="usageTimeHdd", precision=TimeDelta.HOURS, dump_only=True) - usage_time_allocate = TimeDelta(data_key="usageTimeAllocate", + licence_version = Version(required=True, description='The version of the software.') + components = NestedOn(s_device.Component, + many=True, + description='A list of components that are inside of the device' + 'at the moment of this Snapshot.' + 'Order is preserved, so the component num 0 when' + 'submitting is the component num 0 when returning it back.') + usage_time_allocate = TimeDelta(data_key='usageTimeAllocate', required=False, precision=TimeDelta.HOURS, dump_only=True) diff --git a/ereuse_devicehub/resources/action/views.py b/ereuse_devicehub/resources/action/views.py index 25c744b8..f8234e32 100644 --- a/ereuse_devicehub/resources/action/views.py +++ b/ereuse_devicehub/resources/action/views.py @@ -6,21 +6,17 @@ import shutil from datetime import datetime, timedelta from distutils.version import StrictVersion from uuid import UUID -from flask.json import jsonify -from flask import current_app as app, request, g, redirect +from flask import current_app as app, request, g from sqlalchemy.util import OrderedSet from teal.marshmallow import ValidationError from teal.resource import View from teal.db import ResourceNotFound from ereuse_devicehub.db import db -from ereuse_devicehub.resources.device.models import Device, Computer -from ereuse_devicehub.resources.action.models import Action, RateComputer, Snapshot, VisualTest, \ - InitTransfer from ereuse_devicehub.query import things_response -from ereuse_devicehub.resources.action.models import (Action, RateComputer, Snapshot, VisualTest, - InitTransfer, Live, Allocate, Deallocate) +from ereuse_devicehub.resources.action.models import (Action, RateComputer, Snapshot, VisualTest, + InitTransfer, Live, Allocate, Deallocate) from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate from ereuse_devicehub.resources.enums import SnapshotSoftware, Severity @@ -29,7 +25,7 @@ from ereuse_devicehub.resources.user.exceptions import InsufficientPermission SUPPORTED_WORKBENCH = StrictVersion('11.0') -def save_json(req_json, tmp_snapshots, user): +def save_json(req_json, tmp_snapshots, user, live=False): """ This function allow save a snapshot in json format un a TMP_SNAPSHOTS directory The file need to be saved with one name format with the stamptime and uuid joins @@ -44,6 +40,8 @@ def save_json(req_json, tmp_snapshots, user): name_file = f"{year}-{month}-{day}-{hour}-{minutes}_{user}_{uuid}.json" path_dir_base = os.path.join(tmp_snapshots, user) + if live: + path_dir_base = tmp_snapshots path_errors = os.path.join(path_dir_base, 'errors') path_fixeds = os.path.join(path_dir_base, 'fixeds') path_name = os.path.join(path_errors, name_file) @@ -58,11 +56,13 @@ def save_json(req_json, tmp_snapshots, user): return path_name -def move_json(tmp_snapshots, path_name, user): +def move_json(tmp_snapshots, path_name, user, live=False): """ This function move the json than it's correct """ path_dir_base = os.path.join(tmp_snapshots, user) + if live: + path_dir_base = tmp_snapshots if os.path.isfile(path_name): shutil.copy(path_name, path_dir_base) os.remove(path_name) @@ -96,10 +96,119 @@ class AllocateMix(): class AllocateView(AllocateMix, View): model = Allocate + class DeallocateView(AllocateMix, View): model = Deallocate +class LiveView(View): + def post(self): + """Posts an action.""" + res_json = request.get_json(validate=False) + tmp_snapshots = app.config['TMP_LIVES'] + path_live = save_json(res_json, tmp_snapshots, '', live=True) + res_json.pop('debug', None) + res_json_valid = self.schema.load(res_json) + live = self.live(res_json_valid) + db.session.add(live) + db.session().final_flush() + ret = self.schema.jsonify(live) + ret.status_code = 201 + db.session.commit() + move_json(tmp_snapshots, path_live, '', live=True) + return ret + + def get_hdd_details(self, snapshot, device): + """We get the liftime and serial_number of the disk""" + usage_time_hdd = None + serial_number = None + for hd in snapshot['components']: + if not isinstance(hd, DataStorage): + continue + + serial_number = hd.serial_number + for act in hd.actions: + if not act.type == "TestDataStorage": + continue + usage_time_hdd = act.lifetime + break + + if usage_time_hdd: + break + + if not serial_number: + """There aren't any disk""" + raise ResourceNotFound("There aren't any disk in this device {}".format(device)) + return usage_time_hdd, serial_number + + def get_hid(self, snapshot): + device = snapshot.get('device') # type: Computer + components = snapshot.get('components') + if not device: + return None + if not components: + return device.hid + macs = [c.serial_number for c in components + if c.type == 'NetworkAdapter' and c.serial_number is not None] + macs.sort() + mac = '' + hid = device.hid + if not hid: + return hid + if macs: + mac = "-{mac}".format(mac=macs[0]) + hid += mac + return hid + + def live(self, snapshot): + """If the device.allocated == True, then this snapshot create an action live.""" + hid = self.get_hid(snapshot) + if not hid or not Device.query.filter( + Device.hid==hid).count(): + raise ValidationError('Device not exist.') + + device = Device.query.filter( + Device.hid==hid).one() + # Is not necessary + if not device: + raise ValidationError('Device not exist.') + if not device.allocated: + raise ValidationError('Sorry this device is not allocated.') + + usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device) + + data_live = {'usage_time_hdd': usage_time_hdd, + 'serial_number': serial_number, + 'snapshot_uuid': snapshot['uuid'], + 'description': '', + 'software': snapshot['software'], + 'software_version': snapshot['version'], + 'licence_version': snapshot['licence_version'], + 'author_id': device.owner_id, + 'agent_id': device.owner.individual.id, + 'device': device} + + live = Live(**data_live) + + if not usage_time_hdd: + warning = f"We don't found any TestDataStorage for disk sn: {serial_number}" + live.severity = Severity.Warning + live.description = warning + return live + + live.sort_actions() + diff_time = live.diff_time() + if diff_time is None: + warning = "Don't exist one previous live or snapshot as reference" + live.description += warning + live.severity = Severity.Warning + elif diff_time < timedelta(0): + warning = "The difference with the last live/snapshot is negative" + live.description += warning + live.severity = Severity.Warning + return live + + class ActionView(View): def post(self): """Posts an action.""" @@ -146,16 +255,6 @@ class ActionView(View): # model object, when we flush them to the db we will flush # snapshot, and we want to wait to flush snapshot at the end - # If the device is allocated, then snapshot is a live - live = self.live(snapshot_json) - if live: - db.session.add(live) - db.session().final_flush() - ret = self.schema.jsonify(live) # transform it back - ret.status_code = 201 - db.session.commit() - return ret - device = snapshot_json.pop('device') # type: Computer components = None if snapshot_json['software'] == (SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid): @@ -216,89 +315,6 @@ class ActionView(View): db.session.commit() return ret - def get_hdd_details(self, snapshot, device): - """We get the liftime and serial_number of the disk""" - usage_time_hdd = None - serial_number = None - for hd in snapshot['components']: - if not isinstance(hd, DataStorage): - continue - - serial_number = hd.serial_number - for act in hd.actions: - if not act.type == "TestDataStorage": - continue - usage_time_hdd = act.lifetime - break - - if usage_time_hdd: - break - - if not serial_number: - "There aren't any disk" - raise ResourceNotFound("There aren't any disk in this device {}".format(device)) - return usage_time_hdd, serial_number - - def get_hid(self, snapshot): - device = snapshot.get('device') # type: Computer - components = snapshot.get('components') - if not device: - return None - if not components: - return device.hid - macs = [c.serial_number for c in components - if c.type == 'NetworkAdapter' and c.serial_number is not None] - macs.sort() - mac = '' - hid = device.hid - if not hid: - return hid - if macs: - mac = "-{mac}".format(mac=macs[0]) - hid += mac - return hid - - def live(self, snapshot): - """If the device.allocated == True, then this snapshot create an action live.""" - hid = self.get_hid(snapshot) - if not hid or not Device.query.filter( - Device.hid==hid, Device.owner_id==g.user.id).count(): - return None - - device = Device.query.filter( - Device.hid==hid, Device.owner_id==g.user.id).one() - - if not device.allocated: - return None - - usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device) - - data_live = {'usage_time_hdd': usage_time_hdd, - 'serial_number': serial_number, - 'snapshot_uuid': snapshot['uuid'], - 'description': '', - 'device': device} - - live = Live(**data_live) - - if not usage_time_hdd: - warning = f"We don't found any TestDataStorage for disk sn: {serial_number}" - live.severity = Severity.Warning - live.description = warning - return live - - live.sort_actions() - diff_time = live.diff_time() - if diff_time is None: - warning = "Don't exist one previous live or snapshot as reference" - live.description += warning - live.severity = Severity.Warning - elif diff_time < timedelta(0): - warning = "The difference with the last live/snapshot is negative" - live.description += warning - live.severity = Severity.Warning - return live - def transfer_ownership(self): """Perform a InitTransfer action to change author_id of device""" pass diff --git a/ereuse_devicehub/resources/licences/__init__.py b/ereuse_devicehub/resources/licences/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ereuse_devicehub/resources/licences/licences.py b/ereuse_devicehub/resources/licences/licences.py new file mode 100644 index 00000000..8742d29a --- /dev/null +++ b/ereuse_devicehub/resources/licences/licences.py @@ -0,0 +1,41 @@ +from typing import Callable, Iterable, Tuple +from flask.json import jsonify +from teal.resource import Resource, View + + +class LicenceView(View): + def get(self, *args, **kwargs): + """Get version of DeviceHub and ereuse-tag.""" + + with open('licences.txt') as f: + licences = f.read() + + ret = jsonify(licences) + ret.status_code = 200 + return ret + + +class LicencesDef(Resource): + __type__ = 'Licence' + SCHEMA = None + VIEW = None # We do not want to create default / documents endpoint + AUTH = False + + def __init__(self, app, + import_name=__name__, + static_folder=None, + static_url_path=None, + template_folder=None, + url_prefix=None, + subdomain=None, + url_defaults=None, + root_path=None, + cli_commands: Iterable[Tuple[Callable, str or None]] = tuple()): + super().__init__(app, import_name, static_folder, static_url_path, template_folder, + url_prefix, subdomain, url_defaults, root_path, cli_commands) + + get = {'GET'} + d = {} + + licence_view = LicenceView.as_view('LicenceView', definition=self) + self.add_url_rule('/', defaults=d, view_func=licence_view, methods=get) diff --git a/licences.txt b/licences.txt new file mode 100644 index 00000000..8137869c --- /dev/null +++ b/licences.txt @@ -0,0 +1,38 @@ +[ + { + "WorkbenchDesktopVersion": "v.1", + "USOdyPrivacyPolicyVersion": "v.1", + "Language": "CAT", + "Description": "Recollim informació bàsica bla bla bla" + }, + { + "WorkbenchesktopVersion": "v.1", + "USOdyPrivacyPolicyVersion": "v.2", + "Language": "CAT", + "Description": "Recollim informació bàsica bla bla bla i a partir de tal data les dades d’hores d’ús les usem també per estimar la durabilitat" + }, + { + "WorkbenchDesktopVersion": "v.1.1", + "USOdyPrivacyPolicyVersion": "v.3", + "Language": "CAT", + "Description": "Recollim informació bàsica bla bla bla pero ara també recollim la versió del sistema operatiu" + }, + { + "WorkbenchDesktopVersion": "v.1", + "USOdyPrivacyPolicyVersion": "v.1", + "Language": "EN", + "Description": "We collect basic information blah blah blah" + }, + { + "WorkbenchDesktopVersion": "v.1", + "USOdyPrivacyPolicyVersion": "v.2", + "Language": "EN", + "Description": "We collect basic information blah blah blah and from that date we also use the usage time data to estimate durability" + }, + { + "WorkbenchDesktopVersion": "v.1.1", + "USOdyPrivacyPolicyVersion": "v.3", + "Language": "EN", + "Description": "We collect basic information blah blah blah but now we also collect the operating system version" + } +] diff --git a/tests/conftest.py b/tests/conftest.py index 9969be22..d212a9b7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,6 +31,7 @@ class TestConfig(DevicehubConfig): TESTING = True SERVER_NAME = 'localhost' TMP_SNAPSHOTS = '/tmp/snapshots' + TMP_LIVES = '/tmp/lives' @pytest.fixture(scope='session') diff --git a/tests/test_action.py b/tests/test_action.py index 11b15f08..f5dd6dcb 100644 --- a/tests/test_action.py +++ b/tests/test_action.py @@ -1,4 +1,7 @@ +import os import ipaddress +import json +import shutil import copy import pytest @@ -12,7 +15,7 @@ from sqlalchemy.util import OrderedSet from teal.enums import Currency, Subdivision from ereuse_devicehub.db import db -from ereuse_devicehub.client import UserClient +from ereuse_devicehub.client import UserClient, Client from ereuse_devicehub.devicehub import Devicehub from ereuse_devicehub.resources import enums from ereuse_devicehub.resources.action import models @@ -248,7 +251,7 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading] @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live(user: UserClient, app: Devicehub): +def test_live(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it.""" acer = file('acer.happy.battery.snapshot') snapshot, _ = user.post(acer, res=models.Snapshot) @@ -262,11 +265,12 @@ def test_live(user: UserClient, app: Devicehub): } user.post(res=models.Allocate, data=post_request) - acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['lifetime'] += 1000 - snapshot, _ = user.post(acer, res=models.Snapshot) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + snapshot, _ = client.post(acer, res=models.Live) db_device = Device.query.filter_by(id=1).one() action_live = [a for a in db_device.actions if a.type == 'Live'] assert len(action_live) == 1 @@ -274,11 +278,12 @@ def test_live(user: UserClient, app: Devicehub): assert action_live[0].usage_time_allocate == timedelta(hours=1000) assert action_live[0].final_user_code == post_request['finalUserCode'] assert action_live[0].serial_number == 'wd-wx11a80w7430' + assert action_live[0].licence_version == '1.0' assert str(action_live[0].snapshot_uuid) == acer['uuid'] @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_without_TestDataStorage(user: UserClient, app: Devicehub): +def test_live_without_TestDataStorage(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. If the live don't have a TestDataStorage, then save live and response None """ @@ -297,9 +302,9 @@ def test_live_without_TestDataStorage(user: UserClient, app: Devicehub): acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage'] acer['components'][7]['actions'] = actions - live, _ = user.post(acer, res=models.Snapshot) - assert live['type'] == 'Live' - assert live['serialNumber'] == 'wd-wx11a80w7430' + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + live, _ = client.post(acer, res=models.Live) assert live['severity'] == 'Warning' description = "We don't found any TestDataStorage for disk sn: wd-wx11a80w7430" assert live['description'] == description @@ -309,7 +314,7 @@ def test_live_without_TestDataStorage(user: UserClient, app: Devicehub): @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_without_hdd_1(user: UserClient, app: Devicehub): +def test_live_without_hdd_1(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. The snapshot have hdd but the live no, and response 404 """ @@ -328,13 +333,15 @@ def test_live_without_hdd_1(user: UserClient, app: Devicehub): acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" components = [a for a in acer['components'] if a['type'] != 'HardDrive'] acer['components'] = components - response, _ = user.post(acer, res=models.Snapshot, status=404) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + response, _ = client.post(acer, res=models.Live, status=404) assert "The There aren't any disk in this device" in response['message'] @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_without_hdd_2(user: UserClient, app: Devicehub): +def test_live_without_hdd_2(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. The snapshot haven't hdd and the live neither, and response 404 """ @@ -353,13 +360,15 @@ def test_live_without_hdd_2(user: UserClient, app: Devicehub): user.post(res=models.Allocate, data=post_request) acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" - response, _ = user.post(acer, res=models.Snapshot, status=404) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + response, _ = client.post(acer, res=models.Live, status=404) assert "The There aren't any disk in this device" in response['message'] @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_without_hdd_3(user: UserClient, app: Devicehub): +def test_live_without_hdd_3(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. The snapshot haven't hdd and the live have, and save the live with usage_time_allocate == 0 @@ -380,9 +389,9 @@ def test_live_without_hdd_3(user: UserClient, app: Devicehub): user.post(res=models.Allocate, data=post_request) acer = file('acer.happy.battery.snapshot') - live, _ = user.post(acer, res=models.Snapshot) - assert live['type'] == 'Live' - assert live['serialNumber'] == 'wd-wx11a80w7430' + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + live, _ = client.post(acer, res=models.Live) assert live['severity'] == 'Warning' description = "Don't exist one previous live or snapshot as reference" assert live['description'] == description @@ -393,7 +402,7 @@ def test_live_without_hdd_3(user: UserClient, app: Devicehub): @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub): +def test_live_with_hdd_with_old_time(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. The snapshot hdd have a lifetime higher than lifetime of the live action save the live with usage_time_allocate == 0 @@ -414,9 +423,9 @@ def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub): acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" action = [a for a in acer['components'][7]['actions'] if a['type'] == 'TestDataStorage'] action[0]['lifetime'] -= 100 - live, _ = user.post(acer, res=models.Snapshot) - assert live['type'] == 'Live' - assert live['serialNumber'] == 'wd-wx11a80w7430' + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + live, _ = client.post(acer, res=models.Live) assert live['severity'] == 'Warning' description = "The difference with the last live/snapshot is negative" assert live['description'] == description @@ -427,13 +436,12 @@ def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub): @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) -def test_live_search_last_allocate(user: UserClient, app: Devicehub): +def test_live_search_last_allocate(user: UserClient, client: Client, app: Devicehub): """Tests inserting a Live into the database and GETting it. """ acer = file('acer.happy.battery.snapshot') snapshot, _ = user.post(acer, res=models.Snapshot) device_id = snapshot['device']['id'] - db_device = Device.query.filter_by(id=1).one() post_request = {"transaction": "ccc", "name": "John", "endUsers": 1, "devices": [device_id], "description": "aaa", "finalUserCode": "abcdefjhi", @@ -446,14 +454,67 @@ def test_live_search_last_allocate(user: UserClient, app: Devicehub): hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['lifetime'] += 1000 - live, _ = user.post(acer, res=models.Snapshot) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + live, _ = client.post(acer, res=models.Live) acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4" actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage'] acer['components'][7]['actions'] = actions - live, _ = user.post(acer, res=models.Snapshot) + live, _ = client.post(acer, res=models.Live) assert live['usageTimeAllocate'] == 1000 +@pytest.mark.mvp +def test_save_live_json(app: Devicehub, user: UserClient, client: Client): + """ This test check if works the function save_snapshot_in_file """ + acer = file('acer.happy.battery.snapshot') + snapshot, _ = user.post(acer, res=models.Snapshot) + debug = 'AAA' + acer['debug'] = debug + device_id = snapshot['device']['id'] + post_request = {"transaction": "ccc", "name": "John", "endUsers": 1, + "devices": [device_id], "description": "aaa", + "finalUserCode": "abcdefjhi", + "startTime": "2020-11-01T02:00:00+00:00", + "endTime": "2020-12-01T02:00:00+00:00" + } + + user.post(res=models.Allocate, data=post_request) + acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" + hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] + hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] + hdd_action['lifetime'] += 1000 + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + live, _ = client.post(acer, res=models.Live) + + tmp_snapshots = app.config['TMP_LIVES'] + path_dir_base = os.path.join(tmp_snapshots) + + uuid = acer['uuid'] + files = [x for x in os.listdir(path_dir_base) if uuid in x] + + snapshot = {'debug': ''} + if files: + path_snapshot = os.path.join(path_dir_base, files[0]) + with open(path_snapshot) as file_snapshot: + snapshot = json.loads(file_snapshot.read()) + + shutil.rmtree(tmp_snapshots) + + assert snapshot['debug'] == debug + + +@pytest.mark.mvp +@pytest.mark.usefixtures(conftest.app_context.__name__) +def test_licences(client: Client): + """Tests inserting a Live into the database and GETting it. + """ + licences, _ = client.get('/licences/') + licences = json.loads(licences) + assert licences[0]['USOdyPrivacyPolicyVersion'] == 'v.1' + + @pytest.mark.mvp @pytest.mark.usefixtures(conftest.app_context.__name__) def test_allocate(user: UserClient): diff --git a/tests/test_basic.py b/tests/test_basic.py index ea8bac0d..978948af 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -105,6 +105,8 @@ def test_api_docs(client: Client): '/allocates/', '/deallocates/', '/metrics/', + '/licences/', + '/lives/', } assert docs['info'] == {'title': 'Devicehub', 'version': '0.2'} assert docs['components']['securitySchemes']['bearerAuth'] == { diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 238b6132..81e966f4 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -29,21 +29,23 @@ def test_simple_metrics(user: UserClient): hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['powerCycleCount'] += 1000 - user.post(acer, res=ma.Snapshot) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + user.post(acer, res=ma.Live) # Create a live acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4" hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['powerCycleCount'] += 1000 - user.post(acer, res=ma.Snapshot) + user.post(acer, res=ma.Live) # Create an other live acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5" hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['powerCycleCount'] += 1000 - user.post(acer, res=ma.Snapshot) + user.post(acer, res=ma.Live) # Check metrics metrics = {'allocateds': 1, 'live': 1} @@ -72,20 +74,22 @@ def test_second_hdd_metrics(user: UserClient): hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['powerCycleCount'] += 1000 - user.post(acer, res=ma.Snapshot) + acer.pop('elapsed') + acer['licence_version'] = '1.0.0' + user.post(acer, res=ma.Live) # Create a live acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4" hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action['powerCycleCount'] += 1000 - user.post(acer, res=ma.Snapshot) + user.post(acer, res=ma.Live) # Create a second device acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5" hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd['serialNumber'] = 'WD-WX11A80W7440' - user.post(acer, res=ma.Snapshot) + user.post(acer, res=ma.Live) # Check metrics if we change the hdd we need a result of one device metrics = {'allocateds': 1, 'live': 1}