Merge branch 'testing' into bugfix/89-snapshots-in-hdd

This commit is contained in:
Cayo Puigdefabregas 2020-12-09 10:11:45 +01:00
commit e2446c5001
18 changed files with 945 additions and 163 deletions

View file

@ -14,6 +14,7 @@ from ereuse_devicehub.resources.device import definitions
from ereuse_devicehub.resources.documents import documents
from ereuse_devicehub.resources.enums import PriceSoftware
from ereuse_devicehub.resources.versions import versions
from ereuse_devicehub.resources.metric import definitions as metric_def
class DevicehubConfig(Config):
@ -27,8 +28,9 @@ class DevicehubConfig(Config):
import_resource(proof),
import_resource(documents),
import_resource(inventory),
import_resource(versions)),
)
import_resource(versions),
import_resource(metric_def),
),)
PASSWORD_SCHEMES = {'pbkdf2_sha256'} # type: Set[str]
DB_USER = config('DB_USER', 'dhub')
DB_PASSWORD = config('DB_PASSWORD', 'ereuse')

View file

@ -0,0 +1,73 @@
"""Added Assigned action
Revision ID: e93aec8fc41f
Revises: b9b0ee7d9dca
Create Date: 2020-11-17 13:22:56.790956
"""
from alembic import op
import sqlalchemy as sa
from alembic import context
import sqlalchemy_utils
import citext
import teal
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'e93aec8fc41f'
down_revision = 'b9b0ee7d9dca'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
# Allocate action
op.drop_table('allocate', schema=f'{get_inv()}')
op.create_table('allocate',
sa.Column('final_user_code', citext.CIText(), default='', nullable=True,
comment = "This is a internal code for mainteing the secrets of the personal datas of the new holder"),
sa.Column('transaction', citext.CIText(), nullable=True, comment='The code used from the owner for relation with external tool.'),
sa.Column('end_users', sa.Numeric(precision=4), nullable=True),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
# Deallocate action
op.drop_table('deallocate', schema=f'{get_inv()}')
op.create_table('deallocate',
sa.Column('transaction', citext.CIText(), nullable=True, comment='The code used from the owner for relation with external tool.'),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
# Add allocate as a column in device
op.add_column('device', sa.Column('allocated', sa.Boolean(), nullable=True), schema=f'{get_inv()}')
# Receive action
op.drop_table('receive', schema=f'{get_inv()}')
# Live action
op.drop_table('live', schema=f'{get_inv()}')
op.create_table('live',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('serial_number', sa.Unicode(), nullable=True,
comment='The serial number of the Hard Disk in lower case.'),
sa.Column('usage_time_hdd', sa.Interval(), nullable=True),
sa.Column('snapshot_uuid', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
def downgrade():
op.drop_table('allocate', schema=f'{get_inv()}')

View file

@ -3,7 +3,7 @@ from typing import Callable, Iterable, Tuple
from teal.resource import Converters, Resource
from ereuse_devicehub.resources.action import schemas
from ereuse_devicehub.resources.action.views import ActionView
from ereuse_devicehub.resources.action.views import ActionView, AllocateView, DeallocateView
from ereuse_devicehub.resources.device.sync import Sync
@ -198,6 +198,16 @@ class ToPrepareDef(ActionDef):
SCHEMA = schemas.ToPrepare
class AllocateDef(ActionDef):
VIEW = AllocateView
SCHEMA = schemas.Allocate
class DeallocateDef(ActionDef):
VIEW = DeallocateView
SCHEMA = schemas.Deallocate
class PrepareDef(ActionDef):
VIEW = None
SCHEMA = schemas.Prepare
@ -253,11 +263,6 @@ class DisposeProductDef(ActionDef):
SCHEMA = schemas.DisposeProduct
class ReceiveDef(ActionDef):
VIEW = None
SCHEMA = schemas.Receive
class MigrateToDef(ActionDef):
VIEW = None
SCHEMA = schemas.MigrateTo

View file

@ -10,6 +10,7 @@ to a structure based on:
Within the above general classes are subclasses in A order.
"""
import copy
from collections import Iterable
from contextlib import suppress
from datetime import datetime, timedelta, timezone
@ -43,7 +44,7 @@ from ereuse_devicehub.resources.device.models import Component, Computer, DataSt
Device, Laptop, Server
from ereuse_devicehub.resources.enums import AppearanceRange, BatteryHealth, BiosAccessRange, \
ErasureStandards, FunctionalityRange, PhysicalErasureMethod, PriceSoftware, \
R_NEGATIVE, R_POSITIVE, RatingRange, ReceiverRole, Severity, SnapshotSoftware, \
R_NEGATIVE, R_POSITIVE, RatingRange, Severity, SnapshotSoftware, \
TestDataStorageLength
from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing
from ereuse_devicehub.resources.user.models import User
@ -91,7 +92,7 @@ class Action(Thing):
end_time = Column(db.TIMESTAMP(timezone=True))
end_time.comment = """When the action ends. For some actions like reservations
the time when they expire, for others like renting
the time the end rents. For punctual actions it is the time
the time the end rents. For punctual actions it is the time
they are performed; it differs with ``created`` in which
created is the where the system received the action.
"""
@ -115,7 +116,7 @@ class Action(Thing):
backref=backref('authored_actions', lazy=True, collection_class=set),
primaryjoin=author_id == User.id)
author_id.comment = """The user that recorded this action in the system.
This does not necessarily has to be the person that produced
the action in the real world. For that purpose see
``agent``.
@ -129,9 +130,8 @@ class Action(Thing):
agent = relationship(Agent,
backref=backref('actions_agent', lazy=True, **_sorted_actions),
primaryjoin=agent_id == Agent.id)
agent_id.comment = """The direct performer or driver of the action.
e.g. John wrote a book.
agent_id.comment = """The direct performer or driver of the action. e.g. John wrote a book.
It can differ with the user that registered the action in the
system, which can be in their behalf.
"""
@ -142,14 +142,14 @@ class Action(Thing):
order_by=lambda: Component.id,
collection_class=OrderedSet)
components.comment = """The components that are affected by the action.
When performing actions to parent devices their components are
affected too.
For example: an ``Allocate`` is performed to a Computer and this
relationship is filled with the components the computer had
at the time of the action.
For Add and Remove though, this has another meaning: the components
that are added or removed.
"""
@ -157,9 +157,9 @@ class Action(Thing):
parent = relationship(Computer,
backref=backref('actions_parent', lazy=True, **_sorted_actions),
primaryjoin=parent_id == Computer.id)
parent_id.comment = """For actions that are performed to components,
parent_id.comment = """For actions that are performed to components,
the device parent at that time.
For example: for a ``EraseBasic`` performed on a data storage, this
would point to the computer that contained this data storage, if any.
"""
@ -312,15 +312,21 @@ class Remove(ActionWithOneDevice):
class Allocate(JoinedTableMixin, ActionWithMultipleDevices):
to_id = Column(UUID, ForeignKey(User.id))
to = relationship(User, primaryjoin=User.id == to_id)
organization = Column(CIText())
"""The act of allocate one list of devices to one person
"""
final_user_code = Column(CIText(), default='', nullable=True)
final_user_code.comment = """This is a internal code for mainteing the secrets of the
personal datas of the new holder"""
transaction = Column(CIText(), default='', nullable=True)
transaction.comment = "The code used from the owner for relation with external tool."
end_users = Column(Numeric(precision=4), check_range('end_users', 0), nullable=True)
class Deallocate(JoinedTableMixin, ActionWithMultipleDevices):
from_id = Column(UUID, ForeignKey(User.id))
from_rel = relationship(User, primaryjoin=User.id == from_id)
organization = Column(CIText())
"""The act of deallocate one list of devices to one person of the system or not
"""
transaction= Column(CIText(), default='', nullable=True)
transaction.comment = "The code used from the owner for relation with external tool."
class EraseBasic(JoinedWithOneDeviceMixin, ActionWithOneDevice):
@ -533,7 +539,7 @@ class Snapshot(JoinedWithOneDeviceMixin, ActionWithOneDevice):
version = Column(StrictVersionType(STR_SM_SIZE), nullable=False)
software = Column(DBEnum(SnapshotSoftware), nullable=False)
elapsed = Column(Interval)
elapsed.comment = """For Snapshots made with Workbench, the total amount
elapsed.comment = """For Snapshots made with Workbench, the total amount
of time it took to complete.
"""
@ -680,11 +686,11 @@ class MeasureBattery(TestMixin, Test):
voltage = db.Column(db.Integer, nullable=False)
voltage.comment = """The actual voltage of the battery, in mV."""
cycle_count = db.Column(db.Integer)
cycle_count.comment = """The number of full charges discharges
cycle_count.comment = """The number of full charges discharges
cycles.
"""
health = db.Column(db.Enum(BatteryHealth))
health.comment = """The health of the Battery.
health.comment = """The health of the Battery.
Only reported in Android.
"""
@ -883,12 +889,12 @@ class TestBios(TestMixin, Test):
beeps_power_on = Column(Boolean)
beeps_power_on.comment = """Whether there are no beeps or error
codes when booting up.
Reference: R2 provision 6 page 23.
"""
access_range = Column(DBEnum(BiosAccessRange))
access_range.comment = """Difficulty to modify the boot menu.
This is used as an usability measure for accessing and modifying
a bios, specially as something as important as modifying the boot
menu.
@ -1294,25 +1300,77 @@ class Live(JoinedWithOneDeviceMixin, ActionWithOneDevice):
information about its state (in the form of a ``Snapshot`` action)
and usage statistics.
"""
ip = Column(IP, nullable=False,
comment='The IP where the live was triggered.')
subdivision_confidence = Column(SmallInteger,
check_range('subdivision_confidence', 0, 100),
nullable=False)
subdivision = Column(DBEnum(Subdivision), nullable=False)
city = Column(Unicode(STR_SM_SIZE), check_lower('city'), nullable=False)
city_confidence = Column(SmallInteger,
check_range('city_confidence', 0, 100),
nullable=False)
isp = Column(Unicode(STR_SM_SIZE), check_lower('isp'), nullable=False)
organization = Column(Unicode(STR_SM_SIZE), check_lower('organization'))
organization_type = Column(Unicode(STR_SM_SIZE), check_lower('organization_type'))
serial_number = Column(Unicode(), check_lower('serial_number'))
serial_number.comment = """The serial number of the Hard Disk in lower case."""
usage_time_hdd = Column(Interval, nullable=True)
snapshot_uuid = Column(UUID(as_uuid=True))
@property
def country(self) -> Country:
return self.subdivision.country
# todo relate to snapshot
# todo testing
def final_user_code(self):
""" show the final_user_code of the last action Allocate."""
actions = self.device.actions
actions.sort(key=lambda x: x.created)
for e in reversed(actions):
if isinstance(e, Allocate) and e.created < self.created:
return e.final_user_code
return ''
@property
def usage_time_allocate(self):
"""Show how many hours is used one device from the last check"""
self.sort_actions()
if self.usage_time_hdd is None:
return self.last_usage_time_allocate()
delta_zero = timedelta(0)
diff_time = self.diff_time()
if diff_time is None:
return delta_zero
if diff_time < delta_zero:
return delta_zero
return diff_time
def sort_actions(self):
self.actions = copy.copy(self.device.actions)
self.actions.sort(key=lambda x: x.created)
self.actions.reverse()
def last_usage_time_allocate(self):
"""If we don't have self.usage_time_hdd then we need search the last
action Live with usage_time_allocate valid"""
for e in self.actions:
if isinstance(e, Live) and e.created < self.created:
if not e.usage_time_allocate:
continue
return e.usage_time_allocate
return timedelta(0)
def diff_time(self):
for e in self.actions:
if e.created > self.created:
continue
if isinstance(e, Snapshot):
last_time = self.get_last_lifetime(e)
if not last_time:
continue
return self.usage_time_hdd - last_time
if isinstance(e, Live):
if e.snapshot_uuid == self.snapshot_uuid:
continue
if not e.usage_time_hdd:
continue
return self.usage_time_hdd - e.usage_time_hdd
return None
def get_last_lifetime(self, snapshot):
for a in snapshot.actions:
if a.type == 'TestDataStorage' and a.device.serial_number == self.serial_number:
return a.lifetime
return None
class Organize(JoinedTableMixin, ActionWithMultipleDevices):
@ -1348,7 +1406,7 @@ class Trade(JoinedTableMixin, ActionWithMultipleDevices):
extend `Schema's Trade <http://schema.org/TradeAction>`_.
"""
shipping_date = Column(db.TIMESTAMP(timezone=True))
shipping_date.comment = """When are the devices going to be ready
shipping_date.comment = """When are the devices going to be ready
for shipping?
"""
invoice_number = Column(CIText())
@ -1357,7 +1415,7 @@ class Trade(JoinedTableMixin, ActionWithMultipleDevices):
price = relationship(Price,
backref=backref('trade', lazy=True, uselist=False),
primaryjoin=price_id == Price.id)
price_id.comment = """The price set for this trade.
price_id.comment = """The price set for this trade.
If no price is set it is supposed that the trade was
not payed, usual in donations.
"""
@ -1371,8 +1429,7 @@ class Trade(JoinedTableMixin, ActionWithMultipleDevices):
confirms = relationship(Organize,
backref=backref('confirmation', lazy=True, uselist=False),
primaryjoin=confirms_id == Organize.id)
confirms_id.comment = """An organize action that this association confirms.
confirms_id.comment = """An organize action that this association confirms.
For example, a ``Sell`` or ``Rent``
can confirm a ``Reserve`` action.
"""
@ -1434,26 +1491,6 @@ class MakeAvailable(ActionWithMultipleDevices):
pass
class Receive(JoinedTableMixin, ActionWithMultipleDevices):
"""The act of physically taking delivery of a device.
The receiver confirms that the devices have arrived, and thus,
they are the
:attr:`ereuse_devicehub.resources.device.models.Device.physical_possessor`.
This differs from :class:`.Trade` in that trading changes the
political possession. As an example, a transporter can *receive*
a device but it is not it's owner. After the delivery, the
transporter performs another *receive* to the final owner.
The receiver can optionally take a
:class:`ereuse_devicehub.resources.enums.ReceiverRole`.
"""
role = Column(DBEnum(ReceiverRole),
nullable=False,
default=ReceiverRole.Intermediary)
class Migrate(JoinedTableMixin, ActionWithMultipleDevices):
"""Moves the devices to a new database/inventory. Devices cannot be
modified anymore at the previous database.

View file

@ -447,26 +447,8 @@ class Prepare(ActionWithMultipleDevices):
class Live(ActionWithOneDevice):
ip = ... # type: Column
subdivision_confidence = ... # type: Column
subdivision = ... # type: Column
city = ... # type: Column
city_confidence = ... # type: Column
isp = ... # type: Column
organization = ... # type: Column
organization_type = ... # type: Column
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.ip = ... # type: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
self.subdivision_confidence = ... # type: int
self.subdivision = ... # type: enums.Subdivision
self.city = ... # type: str
self.city_confidence = ... # type: int
self.isp = ... # type: str
self.organization = ... # type: str
self.organization_type = ... # type: str
self.country = ... # type: Country
serial_number = ... # type: Column
time = ... # type: Column
class Organize(ActionWithMultipleDevices):
@ -527,14 +509,15 @@ class DisposeProduct(Trade):
class TransferOwnershipBlockchain(Trade):
pass
class Allocate(ActionWithMultipleDevices):
code = ... # type: Column
end_users = ... # type: Column
class Receive(ActionWithMultipleDevices):
role = ... # type:Column
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.role = ... # type: ReceiverRole
class Deallocate(ActionWithMultipleDevices):
code = ... # type: Column
class Migrate(ActionWithMultipleDevices):

View file

@ -1,3 +1,5 @@
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from flask import current_app as app
from marshmallow import Schema as MarshmallowSchema, ValidationError, fields as f, validates_schema
from marshmallow.fields import Boolean, DateTime, Decimal, Float, Integer, Nested, String, \
@ -14,7 +16,7 @@ from ereuse_devicehub.resources.action import models as m
from ereuse_devicehub.resources.agent import schemas as s_agent
from ereuse_devicehub.resources.device import schemas as s_device
from ereuse_devicehub.resources.enums import AppearanceRange, BiosAccessRange, FunctionalityRange, \
PhysicalErasureMethod, R_POSITIVE, RatingRange, ReceiverRole, \
PhysicalErasureMethod, R_POSITIVE, RatingRange, \
Severity, SnapshotSoftware, TestDataStorageLength
from ereuse_devicehub.resources.models import STR_BIG_SIZE, STR_SIZE
from ereuse_devicehub.resources.schemas import Thing
@ -64,21 +66,62 @@ class Remove(ActionWithOneDevice):
class Allocate(ActionWithMultipleDevices):
__doc__ = m.Allocate.__doc__
to = NestedOn(s_user.User,
description='The user the devices are allocated to.')
organization = SanitizedStr(validate=Length(max=STR_SIZE),
description='The organization where the '
'user was when this happened.')
start_time = DateTime(data_key='startTime', required=True,
description=m.Action.start_time.comment)
end_time = DateTime(data_key='endTime', required=False,
description=m.Action.end_time.comment)
final_user_code = SanitizedStr(data_key="finalUserCode",
validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='This is a internal code for mainteing the secrets of the \
personal datas of the new holder')
transaction = SanitizedStr(validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='The code used from the owner for \
relation with external tool.')
end_users = Integer(data_key='endUsers', validate=[Range(min=1, error="Value must be greater than 0")])
@validates_schema
def validate_allocate(self, data: dict):
txt = "You need to allocate for a day before today"
delay = timedelta(days=1)
today = datetime.now().replace(tzinfo=tzutc()) + delay
start_time = data['start_time'].replace(tzinfo=tzutc())
if start_time > today:
raise ValidationError(txt)
txt = "You need deallocate before allocate this device again"
for device in data['devices']:
if device.allocated:
raise ValidationError(txt)
device.allocated = True
class Deallocate(ActionWithMultipleDevices):
__doc__ = m.Deallocate.__doc__
from_rel = Nested(s_user.User,
data_key='from',
description='The user where the devices are not allocated to anymore.')
organization = SanitizedStr(validate=Length(max=STR_SIZE),
description='The organization where the '
'user was when this happened.')
start_time = DateTime(data_key='startTime', required=True,
description=m.Action.start_time.comment)
transaction = SanitizedStr(validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='The code used from the owner for \
relation with external tool.')
@validates_schema
def validate_deallocate(self, data: dict):
txt = "You need to deallocate for a day before today"
delay = timedelta(days=1)
today = datetime.now().replace(tzinfo=tzutc()) + delay
start_time = data['start_time'].replace(tzinfo=tzutc())
if start_time > today:
raise ValidationError(txt)
txt = "Sorry some of this devices are actually deallocate"
for device in data['devices']:
if not device.allocated:
raise ValidationError(txt)
device.allocated = False
class EraseBasic(ActionWithOneDevice):
@ -369,15 +412,11 @@ class Prepare(ActionWithMultipleDevices):
class Live(ActionWithOneDevice):
__doc__ = m.Live.__doc__
ip = IP(dump_only=True)
subdivision_confidence = Integer(dump_only=True, data_key='subdivisionConfidence')
subdivision = EnumField(Subdivision, dump_only=True)
country = EnumField(Country, dump_only=True)
city = SanitizedStr(lower=True, dump_only=True)
city_confidence = Integer(dump_only=True, data_key='cityConfidence')
isp = SanitizedStr(lower=True, dump_only=True)
organization = SanitizedStr(lower=True, dump_only=True)
organization_type = SanitizedStr(lower=True, dump_only=True, data_key='organizationType')
final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True)
serial_number = SanitizedStr(data_key="serialNumber", dump_only=True)
usage_time_hdd = TimeDelta(data_key="usageTimeHdd", precision=TimeDelta.HOURS, dump_only=True)
usage_time_allocate = TimeDelta(data_key="usageTimeAllocate",
precision=TimeDelta.HOURS, dump_only=True)
class Organize(ActionWithMultipleDevices):
@ -437,11 +476,6 @@ class TransferOwnershipBlockchain(Trade):
__doc__ = m.TransferOwnershipBlockchain.__doc__
class Receive(ActionWithMultipleDevices):
__doc__ = m.Receive.__doc__
role = EnumField(ReceiverRole)
class Migrate(ActionWithMultipleDevices):
__doc__ = m.Migrate.__doc__
other = URL()

View file

@ -3,18 +3,22 @@
import os
import json
import shutil
from datetime import datetime
from datetime import datetime, timedelta
from distutils.version import StrictVersion
from uuid import UUID
from flask.json import jsonify
from flask import current_app as app, request, g
from flask import current_app as app, request, g, redirect
from sqlalchemy.util import OrderedSet
from teal.marshmallow import ValidationError
from teal.resource import View
from teal.db import ResourceNotFound
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import Action, RateComputer, Snapshot, VisualTest, \
InitTransfer
from ereuse_devicehub.query import things_response
from ereuse_devicehub.resources.action.models import (Action, RateComputer, Snapshot, VisualTest,
InitTransfer, Live, Allocate, Deallocate)
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate
from ereuse_devicehub.resources.enums import SnapshotSoftware, Severity
from ereuse_devicehub.resources.user.exceptions import InsufficientPermission
@ -61,6 +65,38 @@ def move_json(tmp_snapshots, path_name, user):
os.remove(path_name)
class AllocateMix():
model = None
def post(self):
""" Create one res_obj """
res_json = request.get_json()
res_obj = self.model(**res_json)
db.session.add(res_obj)
db.session().final_flush()
ret = self.schema.jsonify(res_obj)
ret.status_code = 201
db.session.commit()
return ret
def find(self, args: dict):
res_objs = self.model.query.filter_by(author=g.user) \
.order_by(self.model.created.desc()) \
.paginate(per_page=200)
return things_response(
self.schema.dump(res_objs.items, many=True, nested=0),
res_objs.page, res_objs.per_page, res_objs.total,
res_objs.prev_num, res_objs.next_num
)
class AllocateView(AllocateMix, View):
model = Allocate
class DeallocateView(AllocateMix, View):
model = Deallocate
class ActionView(View):
def post(self):
"""Posts an action."""
@ -107,6 +143,16 @@ class ActionView(View):
# model object, when we flush them to the db we will flush
# snapshot, and we want to wait to flush snapshot at the end
# If the device is allocated, then snapshot is a live
live = self.live(snapshot_json)
if live:
db.session.add(live)
db.session().final_flush()
ret = self.schema.jsonify(live) # transform it back
ret.status_code = 201
db.session.commit()
return ret
device = snapshot_json.pop('device') # type: Computer
components = None
if snapshot_json['software'] == (SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid):
@ -157,6 +203,7 @@ class ActionView(View):
# Check if HID is null and add Severity:Warning to Snapshot
if snapshot.device.hid is None:
snapshot.severity = Severity.Warning
db.session.add(snapshot)
db.session().final_flush()
ret = self.schema.jsonify(snapshot) # transform it back
@ -164,6 +211,70 @@ class ActionView(View):
db.session.commit()
return ret
def get_hdd_details(self, snapshot, device):
"""We get the liftime and serial_number of the disk"""
usage_time_hdd = None
serial_number = None
for hd in snapshot['components']:
if not isinstance(hd, DataStorage):
continue
serial_number = hd.serial_number
for act in hd.actions:
if not act.type == "TestDataStorage":
continue
usage_time_hdd = act.lifetime
break
if usage_time_hdd:
break
if not serial_number:
"There aren't any disk"
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
return usage_time_hdd, serial_number
def live(self, snapshot):
"""If the device.allocated == True, then this snapshot create an action live."""
device = snapshot.get('device') # type: Computer
# TODO @cayop dependency of pulls 85 and 83
# if the pr/85 and pr/83 is merged, then you need change this way for get the device
if not device.hid or not Device.query.filter(Device.hid==device.hid).count():
return None
device = Device.query.filter(Device.hid==device.hid).one()
if not device.allocated:
return None
usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
data_live = {'usage_time_hdd': usage_time_hdd,
'serial_number': serial_number,
'snapshot_uuid': snapshot['uuid'],
'description': '',
'device': device}
live = Live(**data_live)
if not usage_time_hdd:
warning = f"We don't found any TestDataStorage for disk sn: {serial_number}"
live.severity = Severity.Warning
live.description = warning
return live
live.sort_actions()
diff_time = live.diff_time()
if diff_time is None:
warning = "Don't exist one previous live or snapshot as reference"
live.description += warning
live.severity = Severity.Warning
elif diff_time < timedelta(0):
warning = "The difference with the last live/snapshot is negative"
live.description += warning
live.severity = Severity.Warning
return live
def transfer_ownership(self):
"""Perform a InitTransfer action to change author_id of device"""
pass

View file

@ -106,6 +106,9 @@ class Device(Thing):
image = db.Column(db.URL)
image.comment = "An image of the device."
allocated = db.Column(Boolean, default=False)
allocated.comment = "device is allocated or not."
_NON_PHYSICAL_PROPS = {
'id',
'type',
@ -125,7 +128,8 @@ class Device(Thing):
'variant',
'version',
'sku',
'image'
'image',
'allocated'
}
__table_args__ = (
@ -148,7 +152,7 @@ class Device(Thing):
Actions are returned by descending ``created`` time.
"""
return sorted(chain(self.actions_multiple, self.actions_one))
return sorted(chain(self.actions_multiple, self.actions_one), key=lambda x: x.created)
@property
def problems(self):
@ -221,6 +225,22 @@ class Device(Thing):
action = self.last_action_of(*states.Physical.actions())
return states.Physical(action.__class__)
@property
def traking(self):
"""The actual traking state, None otherwise."""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Traking.actions())
return states.Traking(action.__class__)
@property
def usage(self):
"""The actual usage state, None otherwise."""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Usage.actions())
return states.Usage(action.__class__)
@property
def physical_possessor(self):
"""The actual physical possessor or None.
@ -237,10 +257,12 @@ class Device(Thing):
and :class:`ereuse_devicehub.resources.action.models.Receive`
changes it.
"""
from ereuse_devicehub.resources.action.models import Receive
with suppress(LookupError):
action = self.last_action_of(Receive)
return action.agent
pass
# TODO @cayop uncomment this lines for link the possessor with the device
# from ereuse_devicehub.resources.action.models import Receive
# with suppress(LookupError):
# action = self.last_action_of(Receive)
# return action.agent_to
@property
def working(self):
@ -276,7 +298,9 @@ class Device(Thing):
"""
try:
# noinspection PyTypeHints
return next(e for e in reversed(self.actions) if isinstance(e, types))
actions = self.actions
actions.sort(key=lambda x: x.created)
return next(e for e in reversed(actions) if isinstance(e, types))
except StopIteration:
raise LookupError('{!r} does not contain actions of types {}.'.format(self, types))

View file

@ -52,6 +52,8 @@ class Device(Thing):
price = NestedOn('Price', dump_only=True, description=m.Device.price.__doc__)
trading = EnumField(states.Trading, dump_only=True, description=m.Device.trading.__doc__)
physical = EnumField(states.Physical, dump_only=True, description=m.Device.physical.__doc__)
traking= EnumField(states.Traking, dump_only=True, description=m.Device.physical.__doc__)
usage = EnumField(states.Usage, dump_only=True, description=m.Device.physical.__doc__)
physical_possessor = NestedOn('Agent', dump_only=True, data_key='physicalPossessor')
production_date = DateTime('iso',
description=m.Device.updated.comment,
@ -63,6 +65,7 @@ class Device(Thing):
variant = SanitizedStr(description=m.Device.variant.comment)
sku = SanitizedStr(description=m.Device.sku.comment)
image = URL(description=m.Device.image.comment)
allocated = Boolean(description=m.Device.allocated.comment)
@pre_load
def from_actions_to_actions_one(self, data: dict):

View file

@ -51,11 +51,30 @@ class Physical(State):
:cvar Preparing: The device is going to be or being prepared.
:cvar Prepared: The device has been prepared.
:cvar Ready: The device is in working conditions.
:cvar InUse: The device is being reported to be in active use.
"""
ToBeRepaired = e.ToRepair
Repaired = e.Repair
Preparing = e.ToPrepare
Prepared = e.Prepare
Ready = e.Ready
class Traking(State):
"""Traking states.
:cvar Receive: The device changes hands
"""
# Receive = e.Receive
pass
class Usage(State):
"""Usage states.
:cvar Allocate: The device is allocate in other Agent (organization, person ...)
:cvar Deallocate: The device is deallocate and return to the owner
:cvar InUse: The device is being reported to be in active use.
"""
Allocate = e.Allocate
Deallocate = e.Deallocate
InUse = e.Live

View file

@ -154,6 +154,8 @@ class Sync:
if device.hid:
with suppress(ResourceNotFound):
db_device = Device.query.filter_by(hid=device.hid).one()
if db_device and db_device.allocated:
raise ResourceNotFound('device is actually allocated {}'.format(device))
try:
tags = {Tag.from_an_id(tag.id).one() for tag in device.tags} # type: Set[Tag]
except ResourceNotFound:

View file

@ -0,0 +1,10 @@
from teal.resource import Resource
from ereuse_devicehub.resources.metric.schema import Metric
from ereuse_devicehub.resources.metric.views import MetricsView
class MetricDef(Resource):
__type__ = 'Metric'
VIEW = MetricsView
SCHEMA = Metric
AUTH = True

View file

@ -0,0 +1,11 @@
from teal.resource import Schema
from marshmallow.fields import DateTime
class Metric(Schema):
"""
This schema filter dates for search the metrics
"""
start_time = DateTime(data_key='start_time', required=True,
description="Start date for search metrics")
end_time = DateTime(data_key='end_time', required=True,
description="End date for search metrics")

View file

@ -0,0 +1,44 @@
from flask import request, g, jsonify
from contextlib import suppress
from teal.resource import View
from ereuse_devicehub.resources.action import schemas
from ereuse_devicehub.resources.action.models import Allocate, Live, Action, ToRepair, ToPrepare
from ereuse_devicehub.resources.device import models as m
from ereuse_devicehub.resources.metric.schema import Metric
class MetricsView(View):
def find(self, args: dict):
metrics = {
"allocateds": self.allocated(),
"live": self.live(),
}
return jsonify(metrics)
def allocated(self):
# TODO @cayop we need uncomment when the pr/83 is approved
# return m.Device.query.filter(m.Device.allocated==True, owner==g.user).count()
return m.Device.query.filter(m.Device.allocated==True).count()
def live(self):
# TODO @cayop we need uncomment when the pr/83 is approved
# devices = m.Device.query.filter(m.Device.allocated==True, owner==g.user)
devices = m.Device.query.filter(m.Device.allocated==True)
count = 0
for dev in devices:
live = allocate = None
with suppress(LookupError):
live = dev.last_action_of(Live)
with suppress(LookupError):
allocate = dev.last_action_of(Allocate)
if not live:
continue
if allocate and allocate.created > live.created:
continue
count += 1
return count

View file

@ -1,15 +1,18 @@
import ipaddress
from datetime import timedelta
import copy
import pytest
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Tuple, Type
import pytest
from flask import current_app as app, g
from sqlalchemy.util import OrderedSet
from teal.enums import Currency, Subdivision
from ereuse_devicehub.client import UserClient
from ereuse_devicehub.db import db
from ereuse_devicehub.client import UserClient
from ereuse_devicehub.devicehub import Devicehub
from ereuse_devicehub.resources import enums
from ereuse_devicehub.resources.action import models
from ereuse_devicehub.resources.device import states
@ -243,29 +246,329 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading]
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
def test_live():
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it."""
db_live = models.Live(ip=ipaddress.ip_address('79.147.10.10'),
subdivision_confidence=84,
subdivision=Subdivision['ES-CA'],
city='barcelona',
city_confidence=20,
isp='acme',
device=Desktop(serial_number='sn1', model='ml1', manufacturer='mr1',
chassis=ComputerChassis.Docking),
organization='acme1',
organization_type='acme1bis')
db.session.add(db_live)
db.session.commit()
client = UserClient(app, 'foo@foo.com', 'foo', response_wrapper=app.response_class)
client.login()
live, _ = client.get(res=models.Action, item=str(db_live.id))
assert live['ip'] == '79.147.10.10'
assert live['subdivision'] == 'ES-CA'
assert live['country'] == 'ES'
device, _ = client.get(res=Device, item=live['device']['id'])
assert device['physical'] == states.Physical.InUse.name
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['lifetime'] += 1000
snapshot, _ = user.post(acer, res=models.Snapshot)
db_device = Device.query.filter_by(id=1).one()
action_live = [a for a in db_device.actions if a.type == 'Live']
assert len(action_live) == 1
assert action_live[0].usage_time_hdd == timedelta(hours=hdd_action['lifetime'])
assert action_live[0].usage_time_allocate == timedelta(hours=1000)
assert action_live[0].final_user_code == post_request['finalUserCode']
assert action_live[0].serial_number == 'wd-wx11a80w7430'
assert str(action_live[0].snapshot_uuid) == acer['uuid']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_TestDataStorage(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
If the live don't have a TestDataStorage, then save live and response None
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
acer['components'][7]['actions'] = actions
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "We don't found any TestDataStorage for disk sn: wd-wx11a80w7430"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert db_live.usage_time_hdd is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_1(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot have hdd but the live no, and response 404
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
response, _ = user.post(acer, res=models.Snapshot, status=404)
assert "The There aren't any disk in this device" in response['message']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_2(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot haven't hdd and the live neither, and response 404
"""
acer = file('acer.happy.battery.snapshot')
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
response, _ = user.post(acer, res=models.Snapshot, status=404)
assert "The There aren't any disk in this device" in response['message']
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_without_hdd_3(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot haven't hdd and the live have, and save the live
with usage_time_allocate == 0
"""
acer = file('acer.happy.battery.snapshot')
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
components = [a for a in acer['components'] if a['type'] != 'HardDrive']
acer['components'] = components
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer = file('acer.happy.battery.snapshot')
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "Don't exist one previous live or snapshot as reference"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert str(db_live.usage_time_hdd) == '195 days, 12:00:00'
assert str(db_live.usage_time_allocate) == '0:00:00'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_with_hdd_with_old_time(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
The snapshot hdd have a lifetime higher than lifetime of the live action
save the live with usage_time_allocate == 0
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer = file('acer.happy.battery.snapshot')
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
action = [a for a in acer['components'][7]['actions'] if a['type'] == 'TestDataStorage']
action[0]['lifetime'] -= 100
live, _ = user.post(acer, res=models.Snapshot)
assert live['type'] == 'Live'
assert live['serialNumber'] == 'wd-wx11a80w7430'
assert live['severity'] == 'Warning'
description = "The difference with the last live/snapshot is negative"
assert live['description'] == description
db_live = models.Live.query.filter_by(id=live['id']).one()
assert str(db_live.usage_time_hdd) == '191 days, 8:00:00'
assert str(db_live.usage_time_allocate) == '0:00:00'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_search_last_allocate(user: UserClient, app: Devicehub):
"""Tests inserting a Live into the database and GETting it.
"""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=models.Snapshot)
device_id = snapshot['device']['id']
db_device = Device.query.filter_by(id=1).one()
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['lifetime'] += 1000
live, _ = user.post(acer, res=models.Snapshot)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
actions = [a for a in acer['components'][7]['actions'] if a['type'] != 'TestDataStorage']
acer['components'][7]['actions'] = actions
live, _ = user.post(acer, res=models.Snapshot)
assert live['usageTimeAllocate'] == 1000
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_allocate(user: UserClient):
""" Tests allocate """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
post_request = {"transaction": "ccc",
"finalUserCode": "aabbcc",
"name": "John",
"severity": "Info",
"endUsers": 1,
"devices": [device_id],
"description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00",
}
allocate, _ = user.post(res=models.Allocate, data=post_request)
# Normal allocate
device, _ = user.get(res=Device, item=device_id)
assert device['allocated'] == True
action = [a for a in device['actions'] if a['type'] == 'Allocate'][0]
assert action['transaction'] == allocate['transaction']
assert action['finalUserCode'] == allocate['finalUserCode']
assert action['created'] == allocate['created']
assert action['startTime'] == allocate['startTime']
assert action['endUsers'] == allocate['endUsers']
assert action['name'] == allocate['name']
post_bad_request1 = copy.copy(post_request)
post_bad_request1['endUsers'] = 2
post_bad_request2 = copy.copy(post_request)
post_bad_request2['startTime'] = "2020-11-01T02:00:00+00:01"
post_bad_request3 = copy.copy(post_request)
post_bad_request3['transaction'] = "aaa"
res1, _ = user.post(res=models.Allocate, data=post_bad_request1, status=422)
res2, _ = user.post(res=models.Allocate, data=post_bad_request2, status=422)
res3, _ = user.post(res=models.Allocate, data=post_bad_request3, status=422)
for r in (res1, res2, res3):
assert r['code'] == 422
assert r['type'] == 'ValidationError'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_allocate_bad_dates(user: UserClient):
""" Tests allocate """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
delta = timedelta(days=30)
future = datetime.now() + delta
post_request = {"transaction": "ccc",
"finalUserCode": "aabbcc",
"name": "John",
"severity": "Info",
"end_users": 1,
"devices": [device_id],
"description": "aaa",
"start_time": future,
}
res, _ = user.post(res=models.Allocate, data=post_request, status=422)
assert res['code'] == 422
assert res['type'] == 'ValidationError'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_deallocate(user: UserClient):
""" Tests deallocate """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
post_deallocate = {"startTime": "2020-11-01T02:00:00+00:00",
"transaction": "ccc",
"devices": [device_id]
}
res, _ = user.post(res=models.Deallocate, data=post_deallocate, status=422)
assert res['code'] == 422
assert res['type'] == 'ValidationError'
post_allocate = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_allocate)
device, _ = user.get(res=Device, item=device_id)
assert device['allocated'] == True
deallocate, _ = user.post(res=models.Deallocate, data=post_deallocate)
assert deallocate['startTime'] == post_deallocate['startTime']
assert deallocate['devices'][0]['id'] == device_id
assert deallocate['devices'][0]['allocated'] == False
res, _ = user.post(res=models.Deallocate, data=post_deallocate, status=422)
assert res['code'] == 422
assert res['type'] == 'ValidationError'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_deallocate_bad_dates(user: UserClient):
""" Tests deallocate with bad date of start_time """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
delta = timedelta(days=30)
future = datetime.now() + delta
post_deallocate = {"startTime": future,
"devices": [device_id]
}
post_allocate = {"devices": [device_id], "description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00"
}
user.post(res=models.Allocate, data=post_allocate)
res, _ = user.post(res=models.Deallocate, data=post_deallocate, status=422)
assert res['code'] == 422
assert res['type'] == 'ValidationError'
@pytest.mark.mvp

View file

@ -100,7 +100,10 @@ def test_api_docs(client: Client):
'/videoconferences/{dev1_id}/merge/{dev2_id}',
'/videos/{dev1_id}/merge/{dev2_id}',
'/wireless-access-points/{dev1_id}/merge/{dev2_id}',
'/versions/'
'/versions/',
'/allocates/',
'/deallocates/',
'/metrics/',
}
assert docs['info'] == {'title': 'Devicehub', 'version': '0.2'}
assert docs['components']['securitySchemes']['bearerAuth'] == {
@ -111,4 +114,4 @@ def test_api_docs(client: Client):
'scheme': 'basic',
'name': 'Authorization'
}
assert len(docs['definitions']) == 122
assert len(docs['definitions']) == 124

118
tests/test_metrics.py Normal file
View file

@ -0,0 +1,118 @@
import pytest
from ereuse_devicehub.client import UserClient
from ereuse_devicehub.resources.action import models as ma
from tests import conftest
from tests.conftest import file
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_simple_metrics(user: UserClient):
""" Checks one standard query of metrics """
# Insert computer
lenovo = file('desktop-9644w8n-lenovo-0169622.snapshot')
acer = file('acer.happy.battery.snapshot')
user.post(lenovo, res=ma.Snapshot)
snapshot, _ = user.post(acer, res=ma.Snapshot)
device_id = snapshot['device']['id']
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"finalUserCode": "abcdefjhi",
"devices": [device_id], "description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
# Create Allocate
user.post(res=ma.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000
user.post(acer, res=ma.Snapshot)
# Create a live
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000
user.post(acer, res=ma.Snapshot)
# Create an other live
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000
user.post(acer, res=ma.Snapshot)
# Check metrics
metrics = {'allocateds': 1, 'live': 1}
res, _ = user.get("/metrics/")
assert res == metrics
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_second_hdd_metrics(user: UserClient):
""" Checks one standard query of metrics """
# Insert computer
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=ma.Snapshot)
device_id = snapshot['device']['id']
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"finalUserCode": "abcdefjhi",
"devices": [device_id], "description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
# Create Allocate
user.post(res=ma.Allocate, data=post_request)
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec3"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000
user.post(acer, res=ma.Snapshot)
# Create a live
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec4"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['powerCycleCount'] += 1000
user.post(acer, res=ma.Snapshot)
# Create a second device
acer['uuid'] = "490fb8c0-81a1-42e9-95e0-5e7db7038ec5"
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd['serialNumber'] = 'WD-WX11A80W7440'
user.post(acer, res=ma.Snapshot)
# Check metrics if we change the hdd we need a result of one device
metrics = {'allocateds': 1, 'live': 1}
res, _ = user.get("/metrics/")
assert res == metrics
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_metrics_with_live_null(user: UserClient):
""" Checks one standard query of metrics """
# Insert computer
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=ma.Snapshot)
device_id = snapshot['device']['id']
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"finalUserCode": "abcdefjhi",
"devices": [device_id], "description": "aaa",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
# Create Allocate
user.post(res=ma.Allocate, data=post_request)
# Check metrics if we change the hdd we need a result of one device
metrics = {'allocateds': 1, 'live': 0}
res, _ = user.get("/metrics/")
assert res == metrics