Merge pull request #98 from eReuse/feature/#97-endpoint-live-licences
Feature/#97 endpoint live licences
This commit is contained in:
commit
da6bb4087d
|
@ -13,6 +13,8 @@ ml).
|
|||
|
||||
## [1.0.4-beta]
|
||||
- [addend] #95 adding endpoint for check the hash of one report
|
||||
- [addend] #98 adding endpoint for insert a new live
|
||||
- [addend] #98 adding endpoint for get all licences in one query
|
||||
|
||||
## [1.0.3-beta]
|
||||
- [addend] #85 add mac of network adapter to device hid
|
||||
|
|
|
@ -14,6 +14,7 @@ from ereuse_devicehub.resources.device import definitions
|
|||
from ereuse_devicehub.resources.documents import documents
|
||||
from ereuse_devicehub.resources.enums import PriceSoftware
|
||||
from ereuse_devicehub.resources.versions import versions
|
||||
from ereuse_devicehub.resources.licences import licences
|
||||
from ereuse_devicehub.resources.metric import definitions as metric_def
|
||||
|
||||
|
||||
|
@ -29,6 +30,7 @@ class DevicehubConfig(Config):
|
|||
import_resource(documents),
|
||||
import_resource(inventory),
|
||||
import_resource(versions),
|
||||
import_resource(licences),
|
||||
import_resource(metric_def),
|
||||
),)
|
||||
PASSWORD_SCHEMES = {'pbkdf2_sha256'} # type: Set[str]
|
||||
|
@ -48,6 +50,7 @@ class DevicehubConfig(Config):
|
|||
"""
|
||||
|
||||
TMP_SNAPSHOTS = config('TMP_SNAPSHOTS', '/tmp/snapshots')
|
||||
TMP_LIVES = config('TMP_LIVES', '/tmp/lives')
|
||||
"""This var is for save a snapshots in json format when fail something"""
|
||||
API_DOC_CONFIG_TITLE = 'Devicehub'
|
||||
API_DOC_CONFIG_VERSION = '0.2'
|
||||
|
|
|
@ -3,7 +3,8 @@ from typing import Callable, Iterable, Tuple
|
|||
from teal.resource import Converters, Resource
|
||||
|
||||
from ereuse_devicehub.resources.action import schemas
|
||||
from ereuse_devicehub.resources.action.views import ActionView, AllocateView, DeallocateView
|
||||
from ereuse_devicehub.resources.action.views import (ActionView, AllocateView, DeallocateView,
|
||||
LiveView)
|
||||
from ereuse_devicehub.resources.device.sync import Sync
|
||||
|
||||
|
||||
|
@ -214,8 +215,9 @@ class PrepareDef(ActionDef):
|
|||
|
||||
|
||||
class LiveDef(ActionDef):
|
||||
VIEW = None
|
||||
VIEW = LiveView
|
||||
SCHEMA = schemas.Live
|
||||
AUTH = False
|
||||
|
||||
|
||||
class ReserveDef(ActionDef):
|
||||
|
|
|
@ -1304,6 +1304,9 @@ class Live(JoinedWithOneDeviceMixin, ActionWithOneDevice):
|
|||
serial_number.comment = """The serial number of the Hard Disk in lower case."""
|
||||
usage_time_hdd = Column(Interval, nullable=True)
|
||||
snapshot_uuid = Column(UUID(as_uuid=True))
|
||||
software = Column(DBEnum(SnapshotSoftware), nullable=False)
|
||||
software_version = Column(StrictVersionType(STR_SM_SIZE), nullable=False)
|
||||
licence_version = Column(StrictVersionType(STR_SM_SIZE), nullable=False)
|
||||
|
||||
@property
|
||||
def final_user_code(self):
|
||||
|
|
|
@ -420,10 +420,26 @@ class Prepare(ActionWithMultipleDevices):
|
|||
|
||||
class Live(ActionWithOneDevice):
|
||||
__doc__ = m.Live.__doc__
|
||||
"""
|
||||
The Snapshot updates the state of the device with information about
|
||||
its components and actions performed at them.
|
||||
|
||||
See docs for more info.
|
||||
"""
|
||||
uuid = UUID()
|
||||
software = EnumField(SnapshotSoftware,
|
||||
required=True,
|
||||
description='The software that generated this Snapshot.')
|
||||
version = Version(required=True, description='The version of the software.')
|
||||
final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True)
|
||||
serial_number = SanitizedStr(data_key="serialNumber", dump_only=True)
|
||||
usage_time_hdd = TimeDelta(data_key="usageTimeHdd", precision=TimeDelta.HOURS, dump_only=True)
|
||||
usage_time_allocate = TimeDelta(data_key="usageTimeAllocate",
|
||||
licence_version = Version(required=True, description='The version of the software.')
|
||||
components = NestedOn(s_device.Component,
|
||||
many=True,
|
||||
description='A list of components that are inside of the device'
|
||||
'at the moment of this Snapshot.'
|
||||
'Order is preserved, so the component num 0 when'
|
||||
'submitting is the component num 0 when returning it back.')
|
||||
usage_time_allocate = TimeDelta(data_key='usageTimeAllocate', required=False,
|
||||
precision=TimeDelta.HOURS, dump_only=True)
|
||||
|
||||
|
||||
|
|
|
@ -6,21 +6,17 @@ import shutil
|
|||
from datetime import datetime, timedelta
|
||||
from distutils.version import StrictVersion
|
||||
from uuid import UUID
|
||||
from flask.json import jsonify
|
||||
|
||||
from flask import current_app as app, request, g, redirect
|
||||
from flask import current_app as app, request, g
|
||||
from sqlalchemy.util import OrderedSet
|
||||
from teal.marshmallow import ValidationError
|
||||
from teal.resource import View
|
||||
from teal.db import ResourceNotFound
|
||||
|
||||
from ereuse_devicehub.db import db
|
||||
from ereuse_devicehub.resources.device.models import Device, Computer
|
||||
from ereuse_devicehub.resources.action.models import Action, RateComputer, Snapshot, VisualTest, \
|
||||
InitTransfer
|
||||
from ereuse_devicehub.query import things_response
|
||||
from ereuse_devicehub.resources.action.models import (Action, RateComputer, Snapshot, VisualTest,
|
||||
InitTransfer, Live, Allocate, Deallocate)
|
||||
from ereuse_devicehub.resources.action.models import (Action, RateComputer, Snapshot, VisualTest,
|
||||
InitTransfer, Live, Allocate, Deallocate)
|
||||
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
|
||||
from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate
|
||||
from ereuse_devicehub.resources.enums import SnapshotSoftware, Severity
|
||||
|
@ -29,7 +25,7 @@ from ereuse_devicehub.resources.user.exceptions import InsufficientPermission
|
|||
SUPPORTED_WORKBENCH = StrictVersion('11.0')
|
||||
|
||||
|
||||
def save_json(req_json, tmp_snapshots, user):
|
||||
def save_json(req_json, tmp_snapshots, user, live=False):
|
||||
"""
|
||||
This function allow save a snapshot in json format un a TMP_SNAPSHOTS directory
|
||||
The file need to be saved with one name format with the stamptime and uuid joins
|
||||
|
@ -44,6 +40,8 @@ def save_json(req_json, tmp_snapshots, user):
|
|||
|
||||
name_file = f"{year}-{month}-{day}-{hour}-{minutes}_{user}_{uuid}.json"
|
||||
path_dir_base = os.path.join(tmp_snapshots, user)
|
||||
if live:
|
||||
path_dir_base = tmp_snapshots
|
||||
path_errors = os.path.join(path_dir_base, 'errors')
|
||||
path_fixeds = os.path.join(path_dir_base, 'fixeds')
|
||||
path_name = os.path.join(path_errors, name_file)
|
||||
|
@ -58,11 +56,13 @@ def save_json(req_json, tmp_snapshots, user):
|
|||
return path_name
|
||||
|
||||
|
||||
def move_json(tmp_snapshots, path_name, user):
|
||||
def move_json(tmp_snapshots, path_name, user, live=False):
|
||||
"""
|
||||
This function move the json than it's correct
|
||||
"""
|
||||
path_dir_base = os.path.join(tmp_snapshots, user)
|
||||
if live:
|
||||
path_dir_base = tmp_snapshots
|
||||
if os.path.isfile(path_name):
|
||||
shutil.copy(path_name, path_dir_base)
|
||||
os.remove(path_name)
|
||||
|
@ -96,10 +96,119 @@ class AllocateMix():
|
|||
class AllocateView(AllocateMix, View):
|
||||
model = Allocate
|
||||
|
||||
|
||||
class DeallocateView(AllocateMix, View):
|
||||
model = Deallocate
|
||||
|
||||
|
||||
class LiveView(View):
|
||||
def post(self):
|
||||
"""Posts an action."""
|
||||
res_json = request.get_json(validate=False)
|
||||
tmp_snapshots = app.config['TMP_LIVES']
|
||||
path_live = save_json(res_json, tmp_snapshots, '', live=True)
|
||||
res_json.pop('debug', None)
|
||||
res_json_valid = self.schema.load(res_json)
|
||||
live = self.live(res_json_valid)
|
||||
db.session.add(live)
|
||||
db.session().final_flush()
|
||||
ret = self.schema.jsonify(live)
|
||||
ret.status_code = 201
|
||||
db.session.commit()
|
||||
move_json(tmp_snapshots, path_live, '', live=True)
|
||||
return ret
|
||||
|
||||
def get_hdd_details(self, snapshot, device):
|
||||
"""We get the liftime and serial_number of the disk"""
|
||||
usage_time_hdd = None
|
||||
serial_number = None
|
||||
for hd in snapshot['components']:
|
||||
if not isinstance(hd, DataStorage):
|
||||
continue
|
||||
|
||||
serial_number = hd.serial_number
|
||||
for act in hd.actions:
|
||||
if not act.type == "TestDataStorage":
|
||||
continue
|
||||
usage_time_hdd = act.lifetime
|
||||
break
|
||||
|
||||
if usage_time_hdd:
|
||||
break
|
||||
|
||||
if not serial_number:
|
||||
"""There aren't any disk"""
|
||||
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
|
||||
return usage_time_hdd, serial_number
|
||||
|
||||
def get_hid(self, snapshot):
|
||||
device = snapshot.get('device') # type: Computer
|
||||
components = snapshot.get('components')
|
||||
if not device:
|
||||
return None
|
||||
if not components:
|
||||
return device.hid
|
||||
macs = [c.serial_number for c in components
|
||||
if c.type == 'NetworkAdapter' and c.serial_number is not None]
|
||||
macs.sort()
|
||||
mac = ''
|
||||
hid = device.hid
|
||||
if not hid:
|
||||
return hid
|
||||
if macs:
|
||||
mac = "-{mac}".format(mac=macs[0])
|
||||
hid += mac
|
||||
return hid
|
||||
|
||||
def live(self, snapshot):
|
||||
"""If the device.allocated == True, then this snapshot create an action live."""
|
||||
hid = self.get_hid(snapshot)
|
||||
if not hid or not Device.query.filter(
|
||||
Device.hid==hid).count():
|
||||
raise ValidationError('Device not exist.')
|
||||
|
||||
device = Device.query.filter(
|
||||
Device.hid==hid).one()
|
||||
# Is not necessary
|
||||
if not device:
|
||||
raise ValidationError('Device not exist.')
|
||||
if not device.allocated:
|
||||
raise ValidationError('Sorry this device is not allocated.')
|
||||
|
||||
usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
|
||||
|
||||
data_live = {'usage_time_hdd': usage_time_hdd,
|
||||
'serial_number': serial_number,
|
||||
'snapshot_uuid': snapshot['uuid'],
|
||||
'description': '',
|
||||
'software': snapshot['software'],
|
||||
'software_version': snapshot['version'],
|
||||
'licence_version': snapshot['licence_version'],
|
||||
'author_id': device.owner_id,
|
||||
'agent_id': device.owner.individual.id,
|
||||
'device': device}
|
||||
|
||||
live = Live(**data_live)
|
||||
|
||||
if not usage_time_hdd:
|
||||
warning = f"We don't found any TestDataStorage for disk sn: {serial_number}"
|
||||
live.severity = Severity.Warning
|
||||
live.description = warning
|
||||
return live
|
||||
|
||||
live.sort_actions()
|
||||
diff_time = live.diff_time()
|
||||
if diff_time is None:
|
||||
warning = "Don't exist one previous live or snapshot as reference"
|
||||
live.description += warning
|
||||
live.severity = Severity.Warning
|
||||
elif diff_time < timedelta(0):
|
||||
warning = "The difference with the last live/snapshot is negative"
|
||||
live.description += warning
|
||||
live.severity = Severity.Warning
|
||||
return live
|
||||
|
||||
|
||||
class ActionView(View):
|
||||
def post(self):
|
||||
"""Posts an action."""
|
||||
|
@ -146,16 +255,6 @@ class ActionView(View):
|
|||
# model object, when we flush them to the db we will flush
|
||||
# snapshot, and we want to wait to flush snapshot at the end
|
||||
|
||||
# If the device is allocated, then snapshot is a live
|
||||
live = self.live(snapshot_json)
|
||||
if live:
|
||||
db.session.add(live)
|
||||
db.session().final_flush()
|
||||
ret = self.schema.jsonify(live) # transform it back
|
||||
ret.status_code = 201
|
||||
db.session.commit()
|
||||
return ret
|
||||
|
||||
device = snapshot_json.pop('device') # type: Computer
|
||||
components = None
|
||||
if snapshot_json['software'] == (SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid):
|
||||
|
@ -216,89 +315,6 @@ class ActionView(View):
|
|||
db.session.commit()
|
||||
return ret
|
||||
|
||||
def get_hdd_details(self, snapshot, device):
|
||||
"""We get the liftime and serial_number of the disk"""
|
||||
usage_time_hdd = None
|
||||
serial_number = None
|
||||
for hd in snapshot['components']:
|
||||
if not isinstance(hd, DataStorage):
|
||||
continue
|
||||
|
||||
serial_number = hd.serial_number
|
||||
for act in hd.actions:
|
||||
if not act.type == "TestDataStorage":
|
||||
continue
|
||||
usage_time_hdd = act.lifetime
|
||||
break
|
||||
|
||||
if usage_time_hdd:
|
||||
break
|
||||
|
||||
if not serial_number:
|
||||
"There aren't any disk"
|
||||
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
|
||||
return usage_time_hdd, serial_number
|
||||
|
||||
def get_hid(self, snapshot):
|
||||
device = snapshot.get('device') # type: Computer
|
||||
components = snapshot.get('components')
|
||||
if not device:
|
||||
return None
|
||||
if not components:
|
||||
return device.hid
|
||||
macs = [c.serial_number for c in components
|
||||
if c.type == 'NetworkAdapter' and c.serial_number is not None]
|
||||
macs.sort()
|
||||
mac = ''
|
||||
hid = device.hid
|
||||
if not hid:
|
||||
return hid
|
||||
if macs:
|
||||
mac = "-{mac}".format(mac=macs[0])
|
||||
hid += mac
|
||||
return hid
|
||||
|
||||
def live(self, snapshot):
|
||||
"""If the device.allocated == True, then this snapshot create an action live."""
|
||||
hid = self.get_hid(snapshot)
|
||||
if not hid or not Device.query.filter(
|
||||
Device.hid==hid, Device.owner_id==g.user.id).count():
|
||||
return None
|
||||
|
||||
device = Device.query.filter(
|
||||
Device.hid==hid, Device.owner_id==g.user.id).one()
|
||||
|
||||
if not device.allocated:
|
||||
return None
|
||||
|
||||
usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
|
||||
|
||||
data_live = {'usage_time_hdd': usage_time_hdd,
|
||||
'serial_number': serial_number,
|
||||
'snapshot_uuid': snapshot['uuid'],
|
||||
'description': '',
|
||||
'device': device}
|
||||
|
||||
live = Live(**data_live)
|
||||
|
||||
if not usage_time_hdd:
|
||||
warning = f"We don't found any TestDataStorage for disk sn: {serial_number}"
|
||||
live.severity = Severity.Warning
|
||||
live.description = warning
|
||||
return live
|
||||
|
||||
live.sort_actions()
|
||||
diff_time = live.diff_time()
|
||||
if diff_time is None:
|
||||
warning = "Don't exist one previous live or snapshot as reference"
|
||||
live.description += warning
|
||||
live.severity = Severity.Warning
|
||||
elif diff_time < timedelta(0):
|
||||
warning = "The difference with the last live/snapshot is negative"
|
||||
live.description += warning
|
||||
live.severity = Severity.Warning
|
||||
return live
|
||||
|
||||
def transfer_ownership(self):
|
||||
"""Perform a InitTransfer action to change author_id of device"""
|
||||
pass
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
from typing import Callable, Iterable, Tuple
|
||||
from flask.json import jsonify
|
||||
from teal.resource import Resource, View
|
||||
|
||||
|
||||
class LicenceView(View):
|
||||
def get(self, *args, **kwargs):
|
||||
"""Get version of DeviceHub and ereuse-tag."""
|
||||
|
||||
with open('licences.txt') as f:
|
||||
licences = f.read()
|
||||
|
||||
ret = jsonify(licences)
|
||||
ret.status_code = 200
|
||||
return ret
|
||||
|
||||
|
||||
class LicencesDef(Resource):
|
||||
__type__ = 'Licence'
|
||||
SCHEMA = None
|
||||
VIEW = None # We do not want to create default / documents endpoint
|
||||
AUTH = False
|
||||
|
||||
def __init__(self, app,
|
||||
import_name=__name__,
|
||||
static_folder=None,
|
||||
static_url_path=None,
|
||||
template_folder=None,
|
||||
url_prefix=None,
|
||||
subdomain=None,
|
||||
url_defaults=None,
|
||||
root_path=None,
|
||||
cli_commands: Iterable[Tuple[Callable, str or None]] = tuple()):
|
||||
super().__init__(app, import_name, static_folder, static_url_path, template_folder,
|
||||
url_prefix, subdomain, url_defaults, root_path, cli_commands)
|
||||
|
||||
get = {'GET'}
|
||||
d = {}
|
||||
|
||||
licence_view = LicenceView.as_view('LicenceView', definition=self)
|
||||
self.add_url_rule('/', defaults=d, view_func=licence_view, methods=get)
|
|
@ -0,0 +1,38 @@
|
|||
[
|
||||
{
|
||||
"WorkbenchDesktopVersion": "v.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.1",
|
||||
"Language": "CAT",
|
||||
"Description": "Recollim informació bàsica bla bla bla"
|
||||
},
|
||||
{
|
||||
"WorkbenchesktopVersion": "v.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.2",
|
||||
"Language": "CAT",
|
||||
"Description": "Recollim informació bàsica bla bla bla i a partir de tal data les dades d’hores d’ús les usem també per estimar la durabilitat"
|
||||
},
|
||||
{
|
||||
"WorkbenchDesktopVersion": "v.1.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.3",
|
||||
"Language": "CAT",
|
||||
"Description": "Recollim informació bàsica bla bla bla pero ara també recollim la versió del sistema operatiu"
|
||||
},
|
||||
{
|
||||
"WorkbenchDesktopVersion": "v.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.1",
|
||||
"Language": "EN",
|
||||
"Description": "We collect basic information blah blah blah"
|
||||
},
|
||||
{
|
||||
"WorkbenchDesktopVersion": "v.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.2",
|
||||
"Language": "EN",
|
||||
"Description": "We collect basic information blah blah blah and from that date we also use the usage time data to estimate durability"
|
||||
},
|
||||
{
|
||||
"WorkbenchDesktopVersion": "v.1.1",
|
||||
"USOdyPrivacyPolicyVersion": "v.3",
|
||||
"Language": "EN",
|
||||
"Description": "We collect basic information blah blah blah but now we also collect the operating system version"
|
||||
}
|
||||
]
|
|
@ -31,6 +31,7 @@ class TestConfig(DevicehubConfig):
|
|||
TESTING = True
|
||||
SERVER_NAME = 'localhost'
|
||||
TMP_SNAPSHOTS = '/tmp/snapshots'
|
||||
TMP_LIVES = '/tmp/lives'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
import os
|
||||
import ipaddress
|
||||
import json
|
||||
import shutil
|
||||
import copy
|
||||
import pytest
|
||||
|
||||
|
@ -12,7 +15,7 @@ from sqlalchemy.util import OrderedSet
|
|||
from teal.enums import Currency, Subdivision
|
||||
|
||||
from ereuse_devicehub.db import db
|
||||
from ereuse_devicehub.client import UserClient
|
||||
from ereuse_devicehub.client import UserClient, Client
|
||||
from ereuse_devicehub.devicehub import Devicehub
|
||||
from ereuse_devicehub.resources import enums
|
||||
from ereuse_devicehub.resources.action import models
|
||||
|
@ -248,7 +251,7 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading]
|
|||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live(user: UserClient, app: Devicehub):
|
||||
def test_live(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it."""
|
||||
acer = file('acer.happy.battery.snapshot')
|
||||
snapshot, _ = user.post(acer, res=models.Snapshot)
|
||||
|
@ -262,11 +265,12 @@ def test_live(user: UserClient, app: Devicehub):
|
|||
}
|
||||
|
||||
user.post(res=models.Allocate, data=post_request)
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['lifetime'] += 1000
|
||||
snapshot, _ = user.post(acer, res=models.Snapshot)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
snapshot, _ = client.post(acer, res=models.Live)
|
||||
db_device = Device.query.filter_by(id=1).one()
|
||||
action_live = [a for a in db_device.actions if a.type == 'Live']
|
||||
assert len(action_live) == 1
|
||||
|
@ -274,11 +278,12 @@ def test_live(user: UserClient, app: Devicehub):
|
|||
assert action_live[0].usage_time_allocate == timedelta(hours=1000)
|
||||
assert action_live[0].final_user_code == post_request['finalUserCode']
|
||||
assert action_live[0].serial_number == 'wd-wx11a80w7430'
|
||||
assert action_live[0].licence_version == '1.0'
|
||||
assert str(action_live[0].snapshot_uuid) == acer['uuid']
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_without_TestDataStorage(user: UserClient, app: Devicehub):
|
||||
def test_live_without_TestDataStorage(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
If the live don't have a TestDataStorage, then save live and response None
|
||||
"""
|
||||
|
@ -297,9 +302,9 @@ def test_live_without_TestDataStorage(user: UserClient, app: Devicehub):
|
|||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
|
||||
acer['components'][7]['actions'] = actions
|
||||
live, _ = user.post(acer, res=models.Snapshot)
|
||||
assert live['type'] == 'Live'
|
||||
assert live['serialNumber'] == 'wd-wx11a80w7430'
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
assert live['severity'] == 'Warning'
|
||||
description = "We don't found any TestDataStorage for disk sn: wd-wx11a80w7430"
|
||||
assert live['description'] == description
|
||||
|
@ -309,7 +314,7 @@ def test_live_without_TestDataStorage(user: UserClient, app: Devicehub):
|
|||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_without_hdd_1(user: UserClient, app: Devicehub):
|
||||
def test_live_without_hdd_1(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
The snapshot have hdd but the live no, and response 404
|
||||
"""
|
||||
|
@ -328,13 +333,15 @@ def test_live_without_hdd_1(user: UserClient, app: Devicehub):
|
|||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
|
||||
acer['components'] = components
|
||||
response, _ = user.post(acer, res=models.Snapshot, status=404)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
response, _ = client.post(acer, res=models.Live, status=404)
|
||||
assert "The There aren't any disk in this device" in response['message']
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_without_hdd_2(user: UserClient, app: Devicehub):
|
||||
def test_live_without_hdd_2(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
The snapshot haven't hdd and the live neither, and response 404
|
||||
"""
|
||||
|
@ -353,13 +360,15 @@ def test_live_without_hdd_2(user: UserClient, app: Devicehub):
|
|||
|
||||
user.post(res=models.Allocate, data=post_request)
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
response, _ = user.post(acer, res=models.Snapshot, status=404)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
response, _ = client.post(acer, res=models.Live, status=404)
|
||||
assert "The There aren't any disk in this device" in response['message']
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_without_hdd_3(user: UserClient, app: Devicehub):
|
||||
def test_live_without_hdd_3(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
The snapshot haven't hdd and the live have, and save the live
|
||||
with usage_time_allocate == 0
|
||||
|
@ -380,9 +389,9 @@ def test_live_without_hdd_3(user: UserClient, app: Devicehub):
|
|||
|
||||
user.post(res=models.Allocate, data=post_request)
|
||||
acer = file('acer.happy.battery.snapshot')
|
||||
live, _ = user.post(acer, res=models.Snapshot)
|
||||
assert live['type'] == 'Live'
|
||||
assert live['serialNumber'] == 'wd-wx11a80w7430'
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
assert live['severity'] == 'Warning'
|
||||
description = "Don't exist one previous live or snapshot as reference"
|
||||
assert live['description'] == description
|
||||
|
@ -393,7 +402,7 @@ def test_live_without_hdd_3(user: UserClient, app: Devicehub):
|
|||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub):
|
||||
def test_live_with_hdd_with_old_time(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
The snapshot hdd have a lifetime higher than lifetime of the live action
|
||||
save the live with usage_time_allocate == 0
|
||||
|
@ -414,9 +423,9 @@ def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub):
|
|||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
action = [a for a in acer['components'][7]['actions'] if a['type'] == 'TestDataStorage']
|
||||
action[0]['lifetime'] -= 100
|
||||
live, _ = user.post(acer, res=models.Snapshot)
|
||||
assert live['type'] == 'Live'
|
||||
assert live['serialNumber'] == 'wd-wx11a80w7430'
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
assert live['severity'] == 'Warning'
|
||||
description = "The difference with the last live/snapshot is negative"
|
||||
assert live['description'] == description
|
||||
|
@ -427,13 +436,12 @@ def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub):
|
|||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_live_search_last_allocate(user: UserClient, app: Devicehub):
|
||||
def test_live_search_last_allocate(user: UserClient, client: Client, app: Devicehub):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
"""
|
||||
acer = file('acer.happy.battery.snapshot')
|
||||
snapshot, _ = user.post(acer, res=models.Snapshot)
|
||||
device_id = snapshot['device']['id']
|
||||
db_device = Device.query.filter_by(id=1).one()
|
||||
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
|
||||
"devices": [device_id], "description": "aaa",
|
||||
"finalUserCode": "abcdefjhi",
|
||||
|
@ -446,14 +454,67 @@ def test_live_search_last_allocate(user: UserClient, app: Devicehub):
|
|||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['lifetime'] += 1000
|
||||
live, _ = user.post(acer, res=models.Snapshot)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
|
||||
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
|
||||
acer['components'][7]['actions'] = actions
|
||||
live, _ = user.post(acer, res=models.Snapshot)
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
assert live['usageTimeAllocate'] == 1000
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_save_live_json(app: Devicehub, user: UserClient, client: Client):
|
||||
""" This test check if works the function save_snapshot_in_file """
|
||||
acer = file('acer.happy.battery.snapshot')
|
||||
snapshot, _ = user.post(acer, res=models.Snapshot)
|
||||
debug = 'AAA'
|
||||
acer['debug'] = debug
|
||||
device_id = snapshot['device']['id']
|
||||
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
|
||||
"devices": [device_id], "description": "aaa",
|
||||
"finalUserCode": "abcdefjhi",
|
||||
"startTime": "2020-11-01T02:00:00+00:00",
|
||||
"endTime": "2020-12-01T02:00:00+00:00"
|
||||
}
|
||||
|
||||
user.post(res=models.Allocate, data=post_request)
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['lifetime'] += 1000
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
live, _ = client.post(acer, res=models.Live)
|
||||
|
||||
tmp_snapshots = app.config['TMP_LIVES']
|
||||
path_dir_base = os.path.join(tmp_snapshots)
|
||||
|
||||
uuid = acer['uuid']
|
||||
files = [x for x in os.listdir(path_dir_base) if uuid in x]
|
||||
|
||||
snapshot = {'debug': ''}
|
||||
if files:
|
||||
path_snapshot = os.path.join(path_dir_base, files[0])
|
||||
with open(path_snapshot) as file_snapshot:
|
||||
snapshot = json.loads(file_snapshot.read())
|
||||
|
||||
shutil.rmtree(tmp_snapshots)
|
||||
|
||||
assert snapshot['debug'] == debug
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_licences(client: Client):
|
||||
"""Tests inserting a Live into the database and GETting it.
|
||||
"""
|
||||
licences, _ = client.get('/licences/')
|
||||
licences = json.loads(licences)
|
||||
assert licences[0]['USOdyPrivacyPolicyVersion'] == 'v.1'
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_allocate(user: UserClient):
|
||||
|
|
|
@ -105,6 +105,8 @@ def test_api_docs(client: Client):
|
|||
'/allocates/',
|
||||
'/deallocates/',
|
||||
'/metrics/',
|
||||
'/licences/',
|
||||
'/lives/',
|
||||
}
|
||||
assert docs['info'] == {'title': 'Devicehub', 'version': '0.2'}
|
||||
assert docs['components']['securitySchemes']['bearerAuth'] == {
|
||||
|
|
|
@ -29,21 +29,23 @@ def test_simple_metrics(user: UserClient):
|
|||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['powerCycleCount'] += 1000
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Create a live
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['powerCycleCount'] += 1000
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Create an other live
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['powerCycleCount'] += 1000
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Check metrics
|
||||
metrics = {'allocateds': 1, 'live': 1}
|
||||
|
@ -72,20 +74,22 @@ def test_second_hdd_metrics(user: UserClient):
|
|||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['powerCycleCount'] += 1000
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
acer.pop('elapsed')
|
||||
acer['licence_version'] = '1.0.0'
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Create a live
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
||||
hdd_action['powerCycleCount'] += 1000
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Create a second device
|
||||
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5"
|
||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||
hdd['serialNumber'] = 'WD-WX11A80W7440'
|
||||
user.post(acer, res=ma.Snapshot)
|
||||
user.post(acer, res=ma.Live)
|
||||
|
||||
# Check metrics if we change the hdd we need a result of one device
|
||||
metrics = {'allocateds': 1, 'live': 1}
|
||||
|
|
Reference in New Issue