implementing the warnings and unnormal cases

This commit is contained in:
Cayo Puigdefabregas 2020-12-04 16:58:53 +01:00
parent c03d872d91
commit 7e41c66073
6 changed files with 291 additions and 36 deletions

View file

@ -62,7 +62,8 @@ def upgrade():
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('serial_number', sa.Unicode(), nullable=True, sa.Column('serial_number', sa.Unicode(), nullable=True,
comment='The serial number of the Hard Disk in lower case.'), comment='The serial number of the Hard Disk in lower case.'),
sa.Column('time', sa.SmallInteger(), nullable=True), sa.Column('usage_time_hdd', sa.Interval(), nullable=True),
sa.Column('snapshot_uuid', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ), sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}' schema=f'{get_inv()}'

View file

@ -10,6 +10,7 @@ to a structure based on:
Within the above general classes are subclasses in A order. Within the above general classes are subclasses in A order.
""" """
import copy
from collections import Iterable from collections import Iterable
from contextlib import suppress from contextlib import suppress
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -1301,7 +1302,8 @@ class Live(JoinedWithOneDeviceMixin, ActionWithOneDevice):
""" """
serial_number = Column(Unicode(), check_lower('serial_number')) serial_number = Column(Unicode(), check_lower('serial_number'))
serial_number.comment = """The serial number of the Hard Disk in lower case.""" serial_number.comment = """The serial number of the Hard Disk in lower case."""
time = Column(SmallInteger, nullable=False) usage_time_hdd = Column(Interval, nullable=True)
snapshot_uuid = Column(UUID(as_uuid=True))
@property @property
def final_user_code(self): def final_user_code(self):
@ -1311,25 +1313,64 @@ class Live(JoinedWithOneDeviceMixin, ActionWithOneDevice):
for e in reversed(actions): for e in reversed(actions):
if isinstance(e, Allocate) and e.created < self.created: if isinstance(e, Allocate) and e.created < self.created:
return e.final_user_code return e.final_user_code
return ''
@property @property
def hours_of_use(self): def usage_time_allocate(self):
"""Show how many hours is used one device from the last check""" """Show how many hours is used one device from the last check"""
actions = self.device.actions self.sort_actions()
actions.sort(key=lambda x: x.created) if self.usage_time_hdd is None:
for e in reversed(actions): return self.last_usage_time_allocate()
if isinstance(e, Snapshot) and e.created < self.created:
return self.time - self.get_last_power_cycle(e)
delta_zero = timedelta(0)
diff_time = self.diff_time()
if diff_time is None:
return delta_zero
if diff_time < delta_zero:
return delta_zero
return diff_time
def sort_actions(self):
self.actions = copy.copy(self.device.actions)
self.actions.sort(key=lambda x: x.created)
self.actions.reverse()
def last_usage_time_allocate(self):
"""If we don't have self.usage_time_hdd then we need search the last
usage_time_allocate valid"""
for e in self.actions:
if isinstance(e, Live) and e.created < self.created: if isinstance(e, Live) and e.created < self.created:
return self.time - e.time if not e.usage_time_allocate:
continue
return e.usage_time_allocate
return timedelta(0)
def get_last_power_cycle(self, snapshot): def diff_time(self):
for e in self.actions:
if e.created > self.created:
continue
if isinstance(e, Snapshot):
last_time = self.get_last_lifetime(e)
if not last_time:
continue
return self.usage_time_hdd - last_time
if isinstance(e, Live):
if e.snapshot_uuid == self.snapshot_uuid:
continue
if not e.usage_time_hdd:
continue
return self.usage_time_hdd - e.usage_time_hdd
return None
def get_last_lifetime(self, snapshot):
for a in snapshot.actions: for a in snapshot.actions:
if a.type == 'TestDataStorage' and a.device.serial_number == self.serial_number: if a.type == 'TestDataStorage' and a.device.serial_number == self.serial_number:
return a.power_cycle_count return a.lifetime
return None
return 0
class Organize(JoinedTableMixin, ActionWithMultipleDevices): class Organize(JoinedTableMixin, ActionWithMultipleDevices):

View file

@ -414,8 +414,9 @@ class Live(ActionWithOneDevice):
__doc__ = m.Live.__doc__ __doc__ = m.Live.__doc__
final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True) final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True)
serial_number = SanitizedStr(data_key="serialNumber", dump_only=True) serial_number = SanitizedStr(data_key="serialNumber", dump_only=True)
time = Integer(dump_only=False) usage_time_hdd = TimeDelta(data_key="usageTimeHdd", precision=TimeDelta.HOURS, dump_only=True)
hours_of_use = Integer(dump_only=False) usage_time_allocate = TimeDelta(data_key="usageTimeAllocate",
precision=TimeDelta.HOURS, dump_only=True)
class Organize(ActionWithMultipleDevices): class Organize(ActionWithMultipleDevices):

View file

@ -3,7 +3,7 @@
import os import os
import json import json
import shutil import shutil
from datetime import datetime from datetime import datetime, timedelta
from distutils.version import StrictVersion from distutils.version import StrictVersion
from uuid import UUID from uuid import UUID
from flask.json import jsonify from flask.json import jsonify
@ -12,6 +12,7 @@ from flask import current_app as app, request, g, redirect
from sqlalchemy.util import OrderedSet from sqlalchemy.util import OrderedSet
from teal.marshmallow import ValidationError from teal.marshmallow import ValidationError
from teal.resource import View from teal.resource import View
from teal.db import ResourceNotFound
from ereuse_devicehub.db import db from ereuse_devicehub.db import db
from ereuse_devicehub.query import things_response from ereuse_devicehub.query import things_response
@ -211,38 +212,69 @@ class ActionView(View):
db.session.commit() db.session.commit()
return ret return ret
def get_hdd_details(self, snapshot, device):
"""We get the liftime and serial_number of the disk"""
usage_time_hdd = None
serial_number = None
for hd in snapshot['components']:
if not isinstance(hd, DataStorage):
continue
serial_number = hd.serial_number
for act in hd.actions:
if not act.type == "TestDataStorage":
continue
usage_time_hdd = act.lifetime
break
if usage_time_hdd:
break
if not serial_number:
"There aren't any disk"
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
return usage_time_hdd, serial_number
def live(self, snapshot): def live(self, snapshot):
"""If the device.allocated == True, then this snapshot create an action live.""" """If the device.allocated == True, then this snapshot create an action live."""
device = snapshot.get('device') # type: Computer device = snapshot.get('device') # type: Computer
# TODO @cayop dependency of pulls 85 and 83 # TODO @cayop dependency of pulls 85 and 83
# if the pr/85 and pr/83 is merged, then you need change this way for get the device # if the pr/85 and pr/83 is merged, then you need change this way for get the device
if not device.hid or not Device.query.filter(Device.hid==device.hid).count(): if not device.hid or not Device.query.filter(Device.hid==device.hid).count():
return return None
device = Device.query.filter(Device.hid==device.hid).one() device = Device.query.filter(Device.hid==device.hid).one()
if not device.allocated: if not device.allocated:
return return None
time = 0 usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
serial_number = ''
for hd in snapshot['components']:
if not isinstance(hd, DataStorage):
continue
for act in hd.actions:
if not act.type == "TestDataStorage":
continue
time = act.power_cycle_count
serial_number = hd.serial_number
if not serial_number: data_live = {'usage_time_hdd': usage_time_hdd,
return
data_live = {'time': time,
'serial_number': serial_number, 'serial_number': serial_number,
'snapshot_uuid': snapshot['uuid'],
'description': '',
'device': device} 'device': device}
return Live(**data_live) live = Live(**data_live)
if not usage_time_hdd:
warning = f"We don't found any TestDataStorage for disk sn: {serial_number}"
live.severity = Severity.Warning
live.description = warning
return live
live.sort_actions()
diff_time = live.diff_time()
if diff_time is None:
warning = "Don't exist one previus live or snapshot as reference"
live.description += warning
live.severity = Severity.Warning
elif diff_time < timedelta(0):
warning = "The difference with the last live/snapshot is negative"
live.description += warning
live.severity = Severity.Warning
return live
def transfer_ownership(self): def transfer_ownership(self):
"""Perform a InitTransfer action to change author_id of device""" """Perform a InitTransfer action to change author_id of device"""

View file

@ -154,6 +154,8 @@ class Sync:
if device.hid: if device.hid:
with suppress(ResourceNotFound): with suppress(ResourceNotFound):
db_device = Device.query.filter_by(hid=device.hid).one() db_device = Device.query.filter_by(hid=device.hid).one()
if db_device and db_device.allocated:
raise ResourceNotFound('device is actually allocated {}'.format(device))
try: try:
tags = {Tag.from_an_id(tag.id).one() for tag in device.tags} # type: Set[Tag] tags = {Tag.from_an_id(tag.id).one() for tag in device.tags} # type: Set[Tag]
except ResourceNotFound: except ResourceNotFound:

View file

@ -264,15 +264,193 @@ def test_live(user: UserClient, app: Devicehub):
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3" acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0] hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0] hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000 hdd_action['lifetime'] += 1000
snapshot, _ = user.post(acer, res=models.Snapshot) snapshot, _ = user.post(acer, res=models.Snapshot)
db_device = Device.query.filter_by(id=1).one() db_device = Device.query.filter_by(id=1).one()
action_live = [a for a in db_device.actions if a.type == 'Live'] action_live = [a for a in db_device.actions if a.type == 'Live']
assert len(action_live) == 1 assert len(action_live) == 1
assert action_live[0].time == 6293 assert action_live[0].usage_time_hdd == timedelta(hours=hdd_action['lifetime'])
assert action_live[0].hours_of_use == 1000 assert action_live[0].usage_time_allocate == timedelta(hours=1000)
assert action_live[0].final_user_code == post_request['finalUserCode'] assert action_live[0].final_user_code == post_request['finalUserCode']
assert action_live[0].serial_number == 'wd-wx11a80w7430' assert action_live[0].serial_number == 'wd-wx11a80w7430'
assert str(action_live[0].snapshot_uuid) == acer['uuid']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_TestDataStorage(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
If the live don't have a TestDataStorage, then save live and response None
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
acer['components'][7]['actions'] = actions
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "We don't found any TestDataStorage for disk sn: wd-wx11a80w7430"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert db_live.usage_time_hdd is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_1(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot have hdd but the live no, and response 404
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
response, _ = user.post(acer, res=models.Snapshot, status=404)
assert "The There aren't any disk in this device" in response['message']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_2(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot haven't hdd and the live neither, and response 404
"""
acer = file('acer.happy.battery.snapshot')
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
response, _ = user.post(acer, res=models.Snapshot, status=404)
assert "The There aren't any disk in this device" in response['message']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_3(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot haven't hdd and the live have, and save the live
with usage_time_allocate == 0
"""
acer = file('acer.happy.battery.snapshot')
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer = file('acer.happy.battery.snapshot')
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "Don't exist one previus live or snapshot as reference"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert str(db_live.usage_time_hdd) == '195 days, 12:00:00'
assert str(db_live.usage_time_allocate) == '0:00:00'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot hdd have a lifetime higher than lifetime of the live action
save the live with usage_time_allocate == 0
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer = file('acer.happy.battery.snapshot')
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
action = [a for a in acer['components'][7]['actions'] if a['type'] == 'TestDataStorage']
action[0]['lifetime'] -= 100
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "The difference with the last live/snapshot is negative"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert str(db_live.usage_time_hdd) == '191 days, 8:00:00'
assert str(db_live.usage_time_allocate) == '0:00:00'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_search_last_allocate(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['lifetime'] += 1000
live, _ = user.post(acer, res=models.Snapshot)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
acer['components'][7]['actions'] = actions
live, _ = user.post(acer, res=models.Snapshot)
assert live['usageTimeAllocate'] == 1000
@pytest.mark.mvp @pytest.mark.mvp