This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/resources/device/models.py
Cayo Puigdefabregas 8e54f34519 fix set_hid
2023-05-07 19:43:30 +02:00

1899 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import hashlib
import json
import logging
import os
import pathlib
import uuid
from contextlib import suppress
from fractions import Fraction
from itertools import chain
from operator import attrgetter
from typing import Dict, List, Set
from boltons import urlutils
from citext import CIText
from flask import current_app as app
from flask import g, request
from more_itertools import unique_everseen
from sqlalchemy import BigInteger, Boolean, Column
from sqlalchemy import Enum as DBEnum
from sqlalchemy import (
Float,
ForeignKey,
Integer,
Sequence,
SmallInteger,
Unicode,
inspect,
text,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import ColumnProperty, backref, relationship, validates
from sqlalchemy.util import OrderedSet
from sqlalchemy_utils import ColorType
from stdnum import imei, meid
from ereuse_devicehub.db import db
from ereuse_devicehub.ereuse_utils.naming import HID_CONVERSION_DOC
from ereuse_devicehub.resources.device.metrics import Metrics
from ereuse_devicehub.resources.enums import (
BatteryTechnology,
CameraFacing,
ComputerChassis,
DataStorageInterface,
DisplayTech,
PrinterTechnology,
RamFormat,
RamInterface,
Severity,
TransferState,
)
from ereuse_devicehub.resources.models import (
STR_SM_SIZE,
Thing,
listener_reset_field_updated_in_actual_time,
)
from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.utils import hashcode
from ereuse_devicehub.teal.db import (
CASCADE_DEL,
POLYMORPHIC_ID,
POLYMORPHIC_ON,
URL,
IntEnum,
ResourceNotFound,
check_lower,
check_range,
)
from ereuse_devicehub.teal.enums import Layouts
from ereuse_devicehub.teal.marshmallow import ValidationError
from ereuse_devicehub.teal.resource import url_for_resource
logger = logging.getLogger(__name__)
def create_code(context):
_id = Device.query.order_by(Device.id.desc()).first() or 3
if not _id == 3:
_id = _id.id + 1
return hashcode.encode(_id)
def create_phid(context, count=1):
phid = str(Placeholder.query.filter(Placeholder.owner == g.user).count() + count)
if (
Placeholder.query.filter(Placeholder.owner == g.user)
.filter(Placeholder.phid == phid)
.count()
):
return create_phid(context, count=count + 1)
return phid
class Device(Thing):
"""Base class for any type of physical object that can be identified.
Device partly extends `Schema's IndividualProduct <https
://schema.org/IndividualProduct>`_, adapting it to our
use case.
A device requires an identification method, ideally a serial number,
although it can be identified only with tags too. More ideally
both methods are used.
Devices can contain ``Components``, which are just a type of device
(it is a recursive relationship).
"""
id = Column(BigInteger, Sequence('device_seq'), primary_key=True)
id.comment = """The identifier of the device for this database. Used only
internally for software; users should not use this.
"""
type = Column(Unicode(STR_SM_SIZE), nullable=False)
hid = Column(Unicode(), check_lower('hid'), unique=False)
hid.comment = (
"""The Hardware ID (HID) is the ID traceability
systems use to ID a device globally. This field is auto-generated
from Devicehub using literal identifiers from the device,
so it can re-generated *offline*.
"""
+ HID_CONVERSION_DOC
)
model = Column(Unicode(), check_lower('model'))
model.comment = """The model of the device in lower case.
The model is the unambiguous, as technical as possible, denomination
for the product. This field, among others, is used to identify
the product.
"""
manufacturer = Column(Unicode(), check_lower('manufacturer'))
manufacturer.comment = """The normalized name of the manufacturer,
in lower case.
Although as of now Devicehub does not enforce normalization,
users can choose a list of normalized manufacturer names
from the own ``/manufacturers`` REST endpoint.
"""
serial_number = Column(Unicode(), check_lower('serial_number'))
serial_number.comment = """The serial number of the device in lower case."""
part_number = Column(Unicode(), check_lower('part_number'))
part_number.comment = """The part number of the device in lower case."""
brand = db.Column(CIText())
brand.comment = """A naming for consumers. This field can represent
several models, so it can be ambiguous, and it is not used to
identify the product.
"""
generation = db.Column(db.SmallInteger, check_range('generation', 0))
generation.comment = """The generation of the device."""
version = db.Column(db.CIText())
version.comment = """The version code of this device, like v1 or A001."""
weight = Column(Float(decimal_return_scale=4), check_range('weight', 0.1, 5))
weight.comment = """The weight of the device in Kg."""
width = Column(Float(decimal_return_scale=4), check_range('width', 0.1, 5))
width.comment = """The width of the device in meters."""
height = Column(Float(decimal_return_scale=4), check_range('height', 0.1, 5))
height.comment = """The height of the device in meters."""
depth = Column(Float(decimal_return_scale=4), check_range('depth', 0.1, 5))
depth.comment = """The depth of the device in meters."""
color = Column(ColorType)
color.comment = """The predominant color of the device."""
production_date = Column(db.DateTime)
production_date.comment = """The date of production of the device.
This is timezone naive, as Workbench cannot report this data with timezone information.
"""
variant = Column(db.CIText())
variant.comment = """A variant or sub-model of the device."""
sku = db.Column(db.CIText())
sku.comment = """The Stock Keeping Unit (SKU), i.e. a
merchant-specific identifier for a product or service.
"""
image = db.Column(db.URL)
image.comment = "An image of the device."
owner_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id,
)
owner = db.relationship(User, primaryjoin=owner_id == User.id)
allocated = db.Column(Boolean, default=False)
allocated.comment = "device is allocated or not."
devicehub_id = db.Column(
db.CIText(), nullable=True, unique=True, default=create_code
)
devicehub_id.comment = "device have a unique code."
dhid_bk = db.Column(db.CIText(), nullable=True, unique=False)
phid_bk = db.Column(db.CIText(), nullable=True, unique=False)
active = db.Column(Boolean, default=True)
family = db.Column(db.CIText())
chid = db.Column(db.CIText())
_NON_PHYSICAL_PROPS = {
'id',
'type',
'created',
'updated',
'parent_id',
'owner_id',
'hid',
'production_date',
'color', # these are only user-input thus volatile
'width',
'height',
'depth',
'weight',
'brand',
'generation',
'production_date',
'variant',
'version',
'family',
'sku',
'image',
'allocated',
'devicehub_id',
'system_uuid',
'active',
'phid_bk',
'dhid_bk',
'chid',
'user_trusts',
'chassis',
'transfer_state',
'receiver_id',
}
__table_args__ = (
db.Index('device_id', id, postgresql_using='hash'),
db.Index('type_index', type, postgresql_using='hash'),
)
def __init__(self, **kw) -> None:
super().__init__(**kw)
self.set_hid()
@property
def reverse_actions(self) -> list:
return reversed(self.actions)
@property
def manual_actions(self) -> list:
mactions = [
'ActionDevice',
'Allocate',
'DataWipe',
'Deallocate',
'Management',
'Prepare',
'Ready',
'Recycling',
'Refurbish',
'ToPrepare',
'ToRepair',
'Use',
]
return [a for a in self.actions if a in mactions]
@property
def actions(self) -> list:
"""All the actions where the device participated, including:
1. Actions performed directly to the device.
2. Actions performed to a component.
3. Actions performed to a parent device.
Actions are returned by descending ``created`` time.
"""
actions_multiple = copy.copy(self.actions_multiple)
actions_one = copy.copy(self.actions_one)
actions = []
for ac in actions_multiple:
ac.real_created = ac.actions_device[0].created
actions.append(ac)
for ac in actions_one:
ac.real_created = ac.created
actions.append(ac)
return sorted(actions, key=lambda x: x.real_created)
@property
def problems(self):
"""Current actions with severity.Warning or higher.
There can be up to 3 actions: current Snapshot,
current Physical action, current Trading action.
"""
from ereuse_devicehub.resources.action.models import Snapshot
from ereuse_devicehub.resources.device import states
actions = set()
with suppress(LookupError, ValueError):
actions.add(self.last_action_of(Snapshot))
with suppress(LookupError, ValueError):
actions.add(self.last_action_of(*states.Physical.actions()))
with suppress(LookupError, ValueError):
actions.add(self.last_action_of(*states.Trading.actions()))
return self._warning_actions(actions)
@property
def physical_properties(self) -> Dict[str, object or None]:
"""Fields that describe the physical properties of a device.
:return A dictionary:
- Column.
- Actual value of the column or None.
"""
# todo ensure to remove materialized values when start using them
# todo or self.__table__.columns if inspect fails
return {
c.key: getattr(self, c.key, None)
for c in inspect(self.__class__).attrs
if isinstance(c, ColumnProperty)
and not getattr(c, 'foreign_keys', None)
and c.key not in self._NON_PHYSICAL_PROPS
}
@property
def public_properties(self) -> Dict[str, object or None]:
"""Fields that describe the properties of a device than next show
in the public page.
:return A dictionary:
- Column.
- Actual value of the column or None.
"""
non_public = ['amount', 'transfer_state', 'receiver_id']
hide_properties = list(self._NON_PHYSICAL_PROPS) + non_public
return {
c.key: getattr(self, c.key, None)
for c in inspect(self.__class__).attrs
if isinstance(c, ColumnProperty)
and not getattr(c, 'foreign_keys', None)
and c.key not in hide_properties
}
@property
def public_actions(self) -> List[object]:
"""Actions than we want show in public page as traceability log section
:return a list of actions:
"""
hide_actions = ['Price', 'EreusePrice']
actions = [ac for ac in self.actions if ac.t not in hide_actions]
actions.reverse()
return actions
@property
def public_link(self) -> str:
host_url = request.host_url.strip('/')
return "{}{}".format(host_url, self.url.to_text())
@property
def url(self) -> urlutils.URL:
"""The URL where to GET this device."""
return urlutils.URL(url_for_resource(Device, item_id=self.dhid))
@property
def rate(self):
"""The last Rate of the device."""
with suppress(LookupError, ValueError):
from ereuse_devicehub.resources.action.models import Rate
return self.last_action_of(Rate)
@property
def price(self):
"""The actual Price of the device, or None if no price has
ever been set."""
with suppress(LookupError, ValueError):
from ereuse_devicehub.resources.action.models import Price
return self.last_action_of(Price)
@property
def last_action_trading(self):
"""which is the last action trading"""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
return self.last_action_of(*states.Trading.actions())
@property
def allocated_status(self):
"""Show the actual status of device.
The status depend of one of this 3 actions:
- Allocate
- Deallocate
- InUse (Live register)
"""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
return self.last_action_of(*states.Usage.actions())
@property
def physical_status(self):
"""Show the actual status of device for this owner.
The status depend of one of this 4 actions:
- ToPrepare
- Prepare
- DataWipe
- ToRepair
- Ready
"""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
return self.last_action_of(*states.Physical.actions())
@property
def status(self):
"""Show the actual status of device for this owner.
The status depend of one of this 4 actions:
- Use
- Refurbish
- Recycling
- Management
"""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
return self.last_action_of(*states.Status.actions())
@property
def history_status(self):
"""Show the history of the status actions of the device.
The status depend of one of this 4 actions:
- Use
- Refurbish
- Recycling
- Management
"""
from ereuse_devicehub.resources.device import states
status_actions = [ac.t for ac in states.Status.actions()]
history = []
for ac in self.actions:
if ac.t not in status_actions:
continue
if not history:
history.append(ac)
continue
if ac.rol_user == history[-1].rol_user:
# get only the last action consecutive for the same user
history = history[:-1] + [ac]
continue
history.append(ac)
return history
@property
def sid(self):
actions = []
if self.placeholder and self.placeholder.binding:
actions = [
x
for x in self.placeholder.binding.actions
if x.t == 'Snapshot' and x.sid
]
else:
actions = [x for x in self.actions if x.t == 'Snapshot' and x.sid]
if actions:
return actions[0].sid
@property
def tradings(self):
return {str(x.id): self.trading(x.lot) for x in self.actions if x.t == 'Trade'}
def trading(self, lot, simple=None): # noqa: C901
"""The trading state, or None if no Trade action has
ever been performed to this device. This extract the posibilities for to do.
This method is performed for show in the web.
If you need to do one simple and generic response you can put simple=True for that."""
if not hasattr(lot, 'trade'):
return
Status = {
0: 'Trade',
1: 'Confirm',
2: 'NeedConfirmation',
3: 'TradeConfirmed',
4: 'Revoke',
5: 'NeedConfirmRevoke',
6: 'RevokeConfirmed',
}
trade = lot.trade
user_from = trade.user_from
user_to = trade.user_to
status = 0
last_user = None
if not hasattr(trade, 'acceptances'):
return Status[status]
for ac in self.actions:
if ac.t not in ['Confirm', 'Revoke']:
continue
if ac.user not in [user_from, user_to]:
continue
if ac.t == 'Confirm' and ac.action == trade:
if status in [0, 6]:
if simple:
status = 2
continue
status = 1
last_user = ac.user
if ac.user == user_from and user_to == g.user:
status = 2
if ac.user == user_to and user_from == g.user:
status = 2
continue
if status in [1, 2]:
if last_user != ac.user:
status = 3
last_user = ac.user
continue
if status in [4, 5]:
status = 3
last_user = ac.user
continue
if ac.t == 'Revoke' and ac.action == trade:
if status == 3:
if simple:
status = 5
continue
status = 4
last_user = ac.user
if ac.user == user_from and user_to == g.user:
status = 5
if ac.user == user_to and user_from == g.user:
status = 5
continue
if status in [4, 5]:
if last_user != ac.user:
status = 6
last_user = ac.user
continue
if status in [1, 2]:
status = 6
last_user = ac.user
continue
return Status[status]
@property
def revoke(self):
"""If the actual trading state is an revoke action, this property show
the id of that revoke"""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Trading.actions())
if action.type == 'Revoke':
return action.id
@property
def physical(self):
"""The actual physical state, None otherwise."""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Physical.actions())
return states.Physical(action.__class__)
@property
def traking(self):
"""The actual traking state, None otherwise."""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Traking.actions())
return states.Traking(action.__class__)
@property
def usage(self):
"""The actual usage state, None otherwise."""
from ereuse_devicehub.resources.device import states
with suppress(LookupError, ValueError):
action = self.last_action_of(*states.Usage.actions())
return states.Usage(action.__class__)
@property
def physical_possessor(self):
"""The actual physical possessor or None.
The physical possessor is the Agent that has physically
the device. It differs from legal owners, usufructuarees
or reserves in that the physical possessor does not have
a legal relation per se with the device, but it is the one
that has it physically. As an example, a transporter could
be a physical possessor of a device although it does not
own it legally.
Note that there can only be one physical possessor per device,
and :class:`ereuse_devicehub.resources.action.models.Receive`
changes it.
"""
pass
# TODO @cayop uncomment this lines for link the possessor with the device
# from ereuse_devicehub.resources.action.models import Receive
# with suppress(LookupError):
# action = self.last_action_of(Receive)
# return action.agent_to
@property
def working(self):
"""A list of the current tests with warning or errors. A
device is working if the list is empty.
This property returns, for the last test performed of each type,
the one with the worst ``severity`` of them, or ``None`` if no
test has been executed.
"""
from ereuse_devicehub.resources.action.models import Test
current_tests = unique_everseen(
(e for e in reversed(self.actions) if isinstance(e, Test)),
key=attrgetter('type'),
) # last test of each type
return self._warning_actions(current_tests)
@property
def verbose_name(self):
type = self.type or ''
manufacturer = self.manufacturer or ''
model = self.model or ''
return f'{type} {manufacturer} {model}'
@property
def dhid(self):
if self.placeholder:
return self.placeholder.device.devicehub_id
if self.binding:
return self.binding.device.devicehub_id
return self.devicehub_id
@property
def my_partner(self):
if self.placeholder and self.placeholder.binding:
return self.placeholder.binding
if self.binding:
return self.binding.device
return self
@property
def get_updated(self):
if self.placeholder and self.placeholder.binding:
return max([self.updated, self.placeholder.binding.updated])
if self.binding:
return max([self.updated, self.binding.device.updated])
return self.updated
@declared_attr
def __mapper_args__(cls):
"""Defines inheritance.
From `the guide <http://docs.sqlalchemy.org/en/latest/orm/
extensions/declarative/api.html
#sqlalchemy.ext.declarative.declared_attr>`_
"""
args = {POLYMORPHIC_ID: cls.t}
if cls.t == 'Device':
args[POLYMORPHIC_ON] = cls.type
return args
def get_lots_for_template(self):
if self.binding:
return self.binding.device.get_lots_for_template()
if not self.lots and hasattr(self, 'parent') and self.parent:
return self.parent.get_lots_for_template()
lots = []
for lot in self.lots:
if lot.is_incoming:
name = "IN - " + lot.name
lots.append(name)
if lot.is_outgoing:
name = "OUT - " + lot.name
lots.append(name)
if lot.is_temporary:
name = "TEMP - " + lot.name
lots.append(name)
lots.sort()
return lots
def phid(self):
if self.placeholder:
return self.placeholder.phid
if self.binding:
return self.binding.phid
return ''
def list_tags(self):
return ', '.join([t.id for t in self.tags])
def appearance(self):
actions = copy.copy(self.actions)
actions.sort(key=lambda x: x.created)
with suppress(LookupError, ValueError, StopIteration):
action = next(e for e in reversed(actions) if e.type == 'VisualTest')
return action.appearance_range
def functionality(self):
actions = copy.copy(self.actions)
actions.sort(key=lambda x: x.created)
with suppress(LookupError, ValueError, StopIteration):
action = next(e for e in reversed(actions) if e.type == 'VisualTest')
return action.functionality_range
def set_appearance(self, value):
actions = copy.copy(self.actions)
actions.sort(key=lambda x: x.created)
with suppress(LookupError, ValueError, StopIteration):
action = next(e for e in reversed(actions) if e.type == 'VisualTest')
action.appearance_range = value
def set_functionality(self, value):
actions = copy.copy(self.actions)
actions.sort(key=lambda x: x.created)
with suppress(LookupError, ValueError, StopIteration):
action = next(e for e in reversed(actions) if e.type == 'VisualTest')
action.functionality_range = value
def is_abstract(self):
if self.placeholder:
if self.placeholder.is_abstract:
return 'Snapshot'
if self.placeholder.binding:
return 'Twin'
return 'Placeholder'
if self.binding:
if self.binding.is_abstract:
return 'Snapshot'
return 'Twin'
return ''
def get_lots_from_type(self, lot_type):
lots_type = {
'temporary': lambda x: x.is_temporary,
'incoming': lambda x: x.is_incoming,
'outgoing': lambda x: x.is_outgoing,
}
if lot_type not in lots_type:
return ''
get_lots_type = lots_type[lot_type]
lots = self.lots
if not lots and self.binding:
lots = self.binding.device.lots
if lots:
lots = [lot.name for lot in lots if get_lots_type(lot)]
return ", ".join(sorted(lots))
return ''
def is_status(self, action):
from ereuse_devicehub.resources.device import states
if action.type in states.Usage.__members__:
return "Allocate State: "
if action.type in states.Status.__members__:
return "Lifecycle State: "
if action.type in states.Physical.__members__:
return "Physical State: "
return ""
def get_exist_untrusted_device(self):
if isinstance(self, Computer):
if not self.system_uuid:
return True
return (
Computer.query.filter_by(
hid=self.hid,
user_trusts=False,
owner_id=g.user.id,
active=True,
placeholder=None,
).first()
or False
)
return False
def get_from_db(self):
if 'property_hid' in app.blueprints.keys():
try:
from ereuse_devicehub.modules.device.utils import get_from_db
return get_from_db(self)
except Exception:
pass
if not self.hid:
return
return Device.query.filter_by(
hid=self.hid,
owner_id=g.user.id,
active=True,
placeholder=None,
).first()
def set_hid(self):
if 'property_hid' in app.blueprints.keys():
try:
from ereuse_devicehub.modules.device.utils import set_hid
self.hid = set_hid(self)
self.set_chid()
return
except Exception as err:
logger.error(err)
self.hid = "{}-{}-{}-{}".format(
self._clean_string(self.type),
self._clean_string(self.manufacturer),
self._clean_string(self.model),
self._clean_string(self.serial_number),
).lower()
self.set_chid()
def _clean_string(self, s):
if not s:
return ''
return s.replace(' ', '_')
def set_chid(self):
if self.hid:
self.chid = hashlib.sha3_256(self.hid.encode()).hexdigest()
def last_action_of(self, *types):
"""Gets the last action of the given types.
:raise LookupError: Device has not an action of the given type.
"""
try:
# noinspection PyTypeHints
actions = copy.copy(self.actions)
actions.sort(key=lambda x: x.created)
return next(e for e in reversed(actions) if isinstance(e, types))
except StopIteration:
raise LookupError(
'{!r} does not contain actions of types {}.'.format(self, types)
)
def which_user_put_this_device_in_trace(self):
"""which is the user than put this device in this trade"""
actions = copy.copy(self.actions)
actions.reverse()
# search the automatic Confirm
for ac in actions:
if ac.type == 'Trade':
action_device = [x for x in ac.actions_device if x.device == self][0]
if action_device.author:
return action_device.author
return ac.author
def change_owner(self, new_user):
"""util for change the owner one device"""
if not new_user:
return
self.owner = new_user
if hasattr(self, 'components'):
for c in self.components:
c.owner = new_user
def reset_owner(self):
"""Change the owner with the user put the device into the trade"""
user = self.which_user_put_this_device_in_trace()
self.change_owner(user)
def _warning_actions(self, actions):
return sorted(ev for ev in actions if ev.severity >= Severity.Warning)
def get_metrics(self):
"""
This method get a list of values for calculate a metrics from a spreadsheet
"""
metrics = Metrics(device=self)
return metrics.get_metrics()
def get_type_logo(self):
# This is used for see one logo of type of device in the frontend
types = {
"Desktop": "bi bi-file-post-fill",
"Laptop": "bi bi-laptop",
"Server": "bi bi-server",
"Processor": "bi bi-cpu",
"RamModule": "bi bi-list",
"Motherboard": "bi bi-cpu-fill",
"NetworkAdapter": "bi bi-hdd-network",
"GraphicCard": "bi bi-brush",
"SoundCard": "bi bi-volume-up-fill",
"Monitor": "bi bi-display",
"Display": "bi bi-display",
"ComputerMonitor": "bi bi-display",
"TelevisionSet": "bi bi-easel",
"TV": "bi bi-easel",
"Projector": "bi bi-camera-video",
"Tablet": "bi bi-tablet-landscape",
"Smartphone": "bi bi-phone",
"Cellphone": "bi bi-telephone",
"HardDrive": "bi bi-hdd-stack",
"SolidStateDrive": "bi bi-hdd",
}
return types.get(self.type, '')
def unreliable(self):
self.user_trusts = False
i = 0
snapshot1 = None
snapshots = {}
for ac in self.actions:
if ac.type == 'Snapshot':
if i == 0:
snapshot1 = ac
if i > 0:
snapshots[ac] = self.get_snapshot_file(ac)
i += 1
if not snapshot1:
return
self.create_new_device(snapshots.values(), user_trusts=self.user_trusts)
self.remove_snapshot(snapshots.keys())
return
def get_snapshot_file(self, action):
uuid = action.uuid
user = g.user.email
name_file = f"*_{user}_{uuid}.json"
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user)
for _file in pathlib.Path(path_dir_base).glob(name_file):
with open(_file) as file_snapshot:
snapshot = file_snapshot.read()
return json.loads(snapshot)
def create_new_device(self, snapshots, user_trusts=True):
from ereuse_devicehub.inventory.forms import UploadSnapshotForm
new_snapshots = []
for snapshot in snapshots:
snapshot['uuid'] = str(uuid.uuid4())
filename = "{}.json".format(snapshot['uuid'])
new_snapshots.append((filename, snapshot))
form = UploadSnapshotForm()
form.result = {}
form.snapshots = new_snapshots
form.create_new_devices = True
form.save(commit=False, user_trusts=user_trusts)
def remove_snapshot(self, snapshots):
from ereuse_devicehub.parser.models import SnapshotsLog
for ac in snapshots:
for slog in SnapshotsLog.query.filter_by(snapshot=ac):
slog.snapshot_id = None
slog.snapshot_uuid = None
db.session.delete(ac)
def remove_devices(self, devices):
from ereuse_devicehub.parser.models import SnapshotsLog
for dev in devices:
for ac in dev.actions:
if ac.type != 'Snapshot':
continue
for slog in SnapshotsLog.query.filter_by(snapshot=ac):
slog.snapshot_id = None
slog.snapshot_uuid = None
for c in dev.components:
c.parent_id = None
for tag in dev.tags:
tag.device_id = None
placeholder = dev.binding or dev.placeholder
if placeholder:
db.session.delete(placeholder.binding)
db.session.delete(placeholder.device)
db.session.delete(placeholder)
def reliable(self):
computers = Computer.query.filter_by(
hid=self.hid,
owner_id=g.user.id,
active=True,
placeholder=None,
).order_by(Device.created.asc())
i = 0
computer1 = None
computers_to_remove = []
for d in computers:
if i == 0:
d.user_trusts = True
computer1 = d
i += 1
continue
computers_to_remove.append(d)
self.remove_devices(computers_to_remove)
if not computer1:
return
snapshot1 = None
for ac in computer1.actions_one:
if ac.type == 'Snapshot':
snapshot1 = ac
break
if not snapshot1:
return
return
def get_last_incoming_lot(self):
lots = list(self.lots)
if hasattr(self, "orphan") and self.orphan:
lots = list(self.lots)
if self.binding:
lots = list(self.binding.device.lots)
elif hasattr(self, "parent") and self.parent:
lots = list(self.parent.lots)
if self.parent.binding:
lots = list(self.parent.binding.device.lots)
lots = sorted(lots, key=lambda x: x.created)
lots.reverse()
for lot in lots:
if lot.is_incoming:
return lot
return None
def __lt__(self, other):
return self.id < other.id
def __str__(self) -> str:
return '{0.t} {0.id}: model {0.model}, S/N {0.serial_number}'.format(self)
def __format__(self, format_spec):
if not format_spec:
return super().__format__(format_spec)
v = ''
if 't' in format_spec:
v += '{0.t} {0.model}'.format(self)
if 's' in format_spec:
superclass = self.__class__.mro()[1]
if not isinstance(self, Device) and superclass != Device:
assert issubclass(superclass, Thing)
v += superclass.__name__ + ' '
v += '{0.manufacturer}'.format(self)
if self.serial_number:
v += ' ' + self.serial_number.upper()
return v
class DisplayMixin:
"""Base class for the Display Component and the Monitor Device."""
size = Column(
Float(decimal_return_scale=1), check_range('size', 2, 150), nullable=True
)
size.comment = """The size of the monitor in inches."""
technology = Column(DBEnum(DisplayTech))
technology.comment = """The technology the monitor uses to display
the image.
"""
resolution_width = Column(
SmallInteger, check_range('resolution_width', 10, 20000), nullable=True
)
resolution_width.comment = """The maximum horizontal resolution the
monitor can natively support in pixels.
"""
resolution_height = Column(
SmallInteger, check_range('resolution_height', 10, 20000), nullable=True
)
resolution_height.comment = """The maximum vertical resolution the
monitor can natively support in pixels.
"""
refresh_rate = Column(SmallInteger, check_range('refresh_rate', 10, 1000))
contrast_ratio = Column(SmallInteger, check_range('contrast_ratio', 100, 100000))
touchable = Column(Boolean)
touchable.comment = """Whether it is a touchscreen."""
@hybrid_property
def aspect_ratio(self):
"""The aspect ratio of the display, as a fraction: ``X/Y``.
Regular values are ``4/3``, ``5/4``, ``16/9``, ``21/9``,
``14/10``, ``19/10``, ``16/10``.
"""
if self.resolution_height and self.resolution_width:
return Fraction(self.resolution_width, self.resolution_height)
return 0
# noinspection PyUnresolvedReferences
@aspect_ratio.expression
def aspect_ratio(cls):
# The aspect ratio to use as SQL in the DB
# This allows comparing resolutions
return db.func.round(cls.resolution_width / cls.resolution_height, 2)
@hybrid_property
def widescreen(self):
"""Whether the monitor is considered to be widescreen.
Widescreen monitors are those having a higher aspect ratio
greater than 4/3.
"""
# We add a tiny extra to 4/3 to avoid precision errors
return self.aspect_ratio > 4.001 / 3
def __str__(self) -> str:
if self.size:
return '{0.t} {0.serial_number} {0.size}in ({0.aspect_ratio}) {0.technology}'.format(
self
)
return '{0.t} {0.serial_number} 0in ({0.aspect_ratio}) {0.technology}'.format(
self
)
def __format__(self, format_spec: str) -> str:
v = ''
if 't' in format_spec:
v += '{0.t} {0.model}'.format(self)
if 's' in format_spec:
v += '({0.manufacturer}) S/N {0.serial_number}'.format(self)
if self.size:
v += ' {0.size}in ({0.aspect_ratio}) {0.technology}'.format(self)
else:
v += ' 0in ({0.aspect_ratio}) {0.technology}'.format(self)
return v
class Placeholder(Thing):
id = Column(BigInteger, Sequence('placeholder_seq'), primary_key=True)
phid = Column(Unicode(), nullable=False, default=create_phid)
pallet = Column(Unicode(), nullable=True)
pallet.comment = "used for identification where from where is this placeholders"
info = db.Column(CIText())
components = Column(CIText())
info.comment = "more info of placeholders"
is_abstract = db.Column(Boolean, default=False)
id_device_supplier = db.Column(CIText())
id_device_supplier.comment = (
"Identification used for one supplier of one placeholders"
)
id_device_internal = db.Column(CIText())
id_device_internal.comment = "Identification used internaly for the user"
kangaroo = db.Column(Boolean, default=False, nullable=True)
device_id = db.Column(
BigInteger,
db.ForeignKey(Device.id),
nullable=False,
)
device = db.relationship(
Device,
backref=backref(
'placeholder', lazy=True, cascade="all, delete-orphan", uselist=False
),
primaryjoin=device_id == Device.id,
)
device_id.comment = "datas of the placeholder"
binding_id = db.Column(
BigInteger,
db.ForeignKey(Device.id),
nullable=True,
)
binding = db.relationship(
Device,
backref=backref('binding', lazy=True, uselist=False),
primaryjoin=binding_id == Device.id,
)
binding_id.comment = "binding placeholder with workbench device"
owner_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id,
)
owner = db.relationship(User, primaryjoin=owner_id == User.id)
@property
def actions(self):
actions = list(self.device.actions) or []
if self.binding:
actions.extend(list(self.binding.actions))
actions = sorted(actions, key=lambda x: x.created)
actions.reverse()
return actions
@property
def status(self):
if self.is_abstract:
return 'Snapshot'
if self.binding:
return 'Twin'
return 'Placeholder'
@property
def documents(self):
docs = self.device.documents
if self.binding:
return docs.union(self.binding.documents)
return docs
class Computer(Device):
"""A chassis with components inside that can be processed
automatically with Workbench Computer.
Computer is broadly extended by ``Desktop``, ``Laptop``, and
``Server``. The property ``chassis`` defines it more granularly.
"""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
chassis = Column(DBEnum(ComputerChassis), nullable=True)
chassis.comment = """The physical form of the computer.
It is a subset of the Linux definition of DMI / DMI decode.
"""
amount = Column(Integer, check_range('amount', min=0, max=100), default=0)
owner_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id,
)
# author = db.relationship(User, primaryjoin=owner_id == User.id)
transfer_state = db.Column(
IntEnum(TransferState), default=TransferState.Initial, nullable=False
)
transfer_state.comment = TransferState.__doc__
receiver_id = db.Column(UUID(as_uuid=True), db.ForeignKey(User.id), nullable=True)
receiver = db.relationship(User, primaryjoin=receiver_id == User.id)
system_uuid = db.Column(UUID(as_uuid=True), nullable=True)
user_trusts = db.Column(Boolean(), default=True)
def __init__(self, *args, **kwargs) -> None:
if args:
chassis = ComputerChassis(args[0])
super().__init__(chassis=chassis, **kwargs)
else:
super().__init__(*args, **kwargs)
@property
def actions(self) -> list:
actions = copy.copy(super().actions)
actions_parent = copy.copy(self.actions_parent)
for ac in actions_parent:
ac.real_created = ac.created
return sorted(chain(actions, actions_parent), key=lambda x: x.real_created)
# return sorted(chain(super().actions, self.actions_parent))
@property
def ram_size(self) -> int:
"""The total of RAM memory the computer has."""
components = self.components
if self.placeholder and self.placeholder.binding:
components = self.placeholder.binding.components
return sum(ram.size or 0 for ram in components if isinstance(ram, RamModule))
@property
def data_storage_size(self) -> int:
"""The total of data storage the computer has."""
components = self.components
if self.placeholder and self.placeholder.binding:
components = self.placeholder.binding.components
return sum(ds.size or 0 for ds in components if isinstance(ds, DataStorage))
@property
def processor_model(self) -> str:
"""The model of one of the processors of the computer."""
return next(
(p.model for p in self.components if isinstance(p, Processor)), None
)
@property
def graphic_card_model(self) -> str:
"""The model of one of the graphic cards of the computer."""
return next(
(p.model for p in self.components if isinstance(p, GraphicCard)), None
)
@property
def network_speeds(self) -> List[int]:
"""Returns two values representing the speeds of the network
adapters of the device.
1. The max Ethernet speed of the computer, 0 if ethernet
adaptor exists but its speed is unknown, None if no eth
adaptor exists.
2. The max WiFi speed of the computer, 0 if computer has
WiFi but its speed is unknown, None if no WiFi adaptor
exists.
"""
speeds = [None, None]
for net in (c for c in self.components if isinstance(c, NetworkAdapter)):
speeds[net.wireless] = max(net.speed or 0, speeds[net.wireless] or 0)
return speeds
@property
def privacy(self):
"""Returns the privacy of all ``DataStorage`` components when
it is not None.
"""
components = self.components
if self.placeholder and self.placeholder.binding:
components = self.placeholder.binding.components
return set(
privacy
for privacy in (
hdd.privacy for hdd in components if isinstance(hdd, DataStorage)
)
if privacy
)
@property
def external_document_erasure(self):
"""Returns the external ``DataStorage`` proof of erasure."""
from ereuse_devicehub.resources.action.models import DataWipe
urls = set()
try:
ev = self.last_action_of(DataWipe)
urls.add(ev.document.url.to_text())
except LookupError:
pass
for comp in self.components:
if isinstance(comp, DataStorage):
doc = comp.external_document_erasure
if doc:
urls.add(doc)
return urls
def add_mac_to_hid(self, components_snap=None):
"""Returns the Naming.hid with the first mac of network adapter,
following an alphabetical order.
"""
self.set_hid()
if not self.hid:
return
components = self.components if components_snap is None else components_snap
macs_network = [
c.serial_number
for c in components
if c.type == 'NetworkAdapter' and c.serial_number is not None
]
macs_network.sort()
mac = macs_network[0] if macs_network else ''
if not mac or mac in self.hid:
return
mac = f"-{mac}"
self.hid += mac
def __format__(self, format_spec):
if not format_spec:
return super().__format__(format_spec)
v = ''
if 't' in format_spec:
v += '{0.chassis} {0.model}'.format(self)
elif 's' in format_spec:
v += '({0.manufacturer})'.format(self)
if self.serial_number:
v += ' S/N ' + self.serial_number.upper()
return v
class Desktop(Computer):
pass
class Laptop(Computer):
layout = Column(DBEnum(Layouts))
layout.comment = """Layout of a built-in keyboard of the computer,
if any.
"""
class Server(Computer):
pass
class Monitor(DisplayMixin, Device):
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
class ComputerMonitor(Monitor):
pass
class TelevisionSet(Monitor):
pass
class Projector(Monitor):
pass
class Mobile(Device):
"""A mobile device consisting of smartphones, tablets, and cellphones."""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
imei = Column(BigInteger)
imei.comment = """The International Mobile Equipment Identity of
the smartphone as an integer.
"""
meid = Column(Unicode)
meid.comment = """The Mobile Equipment Identifier as a hexadecimal
string.
"""
ram_size = db.Column(db.Integer, check_range('ram_size', min=128, max=36000))
ram_size.comment = """The total of RAM of the device in MB."""
data_storage_size = db.Column(
db.Integer, check_range('data_storage_size', 0, 10**8)
)
data_storage_size.comment = """The total of data storage of the device in MB"""
display_size = db.Column(
db.Float(decimal_return_scale=1), check_range('display_size', min=0.1, max=30.0)
)
display_size.comment = """The total size of the device screen"""
@validates('imei')
def validate_imei(self, _, value: int):
if value and not imei.is_valid(str(value)):
raise ValidationError('{} is not a valid imei.'.format(value))
return value
@validates('meid')
def validate_meid(self, _, value: str):
if value and not meid.is_valid(value):
raise ValidationError('{} is not a valid meid.'.format(value))
return value
class Smartphone(Mobile):
pass
class Tablet(Mobile):
pass
class Cellphone(Mobile):
pass
class Component(Device):
"""A device that can be inside another device."""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
parent_id = Column(BigInteger, ForeignKey(Computer.id))
parent = relationship(
Computer,
backref=backref(
'components',
lazy=True,
cascade=CASCADE_DEL,
order_by=lambda: Component.id,
collection_class=OrderedSet,
),
primaryjoin=parent_id == Computer.id,
)
__table_args__ = (db.Index('parent_index', parent_id, postgresql_using='hash'),)
def similar_one(self, parent: Computer, blacklist: Set[int]) -> 'Component':
"""Gets a component that:
* has the same parent.
* Doesn't generate HID.
* Has same physical properties.
:param parent:
:param blacklist: A set of components to not to consider
when looking for similar ones.
"""
assert self.hid is None, 'Don\'t use this method with a component that has HID'
component = (
self.__class__.query.filter_by(
parent=parent,
hid=None,
owner_id=self.owner_id,
**self.physical_properties,
)
.filter(~Component.id.in_(blacklist))
.first()
)
if not component:
raise ResourceNotFound(self.type)
return component
@property
def actions(self) -> list:
return sorted(chain(super().actions, self.actions_components))
class JoinedComponentTableMixin:
@declared_attr
def id(cls):
return Column(BigInteger, ForeignKey(Component.id), primary_key=True)
class GraphicCard(JoinedComponentTableMixin, Component):
memory = Column(SmallInteger, check_range('memory', min=1, max=10000))
memory.comment = """The amount of memory of the Graphic Card in MB."""
class DataStorage(JoinedComponentTableMixin, Component):
"""A device that stores information."""
size = Column(Integer, check_range('size', min=1, max=10**8))
size.comment = """The size of the data-storage in MB."""
interface = Column(DBEnum(DataStorageInterface))
@property
def privacy(self):
"""Returns the privacy compliance state of the data storage.
This is, the last erasure performed to the data storage.
"""
from ereuse_devicehub.resources.action.models import EraseBasic
try:
ev = self.last_action_of(EraseBasic)
except LookupError:
ev = None
return ev
def __format__(self, format_spec):
v = super().__format__(format_spec)
if 's' in format_spec:
v += ' {} GB'.format(self.size // 1000 if self.size else '?')
return v
@property
def external_document_erasure(self):
"""Returns the external ``DataStorage`` proof of erasure."""
from ereuse_devicehub.resources.action.models import DataWipe
try:
ev = self.last_action_of(DataWipe)
return ev.document.url.to_text()
except LookupError:
return None
@property
def orphan(self):
if not self.parent:
return True
if self.parent.placeholder and self.parent.placeholder.kangaroo:
return True
if self.parent.binding and self.parent.binding.kangaroo:
return True
return False
class HardDrive(DataStorage):
pass
class SolidStateDrive(DataStorage):
pass
class Motherboard(JoinedComponentTableMixin, Component):
slots = Column(SmallInteger, check_range('slots', min=0))
slots.comment = """PCI slots the motherboard has."""
usb = Column(SmallInteger, check_range('usb', min=0))
firewire = Column(SmallInteger, check_range('firewire', min=0))
serial = Column(SmallInteger, check_range('serial', min=0))
pcmcia = Column(SmallInteger, check_range('pcmcia', min=0))
bios_date = Column(db.Date)
bios_date.comment = """The date of the BIOS version."""
ram_slots = Column(db.SmallInteger, check_range('ram_slots'))
ram_max_size = Column(db.Integer, check_range('ram_max_size'))
class NetworkMixin:
speed = Column(SmallInteger, check_range('speed', min=10, max=10000))
speed.comment = """The maximum speed this network adapter can handle,
in mbps.
"""
wireless = Column(Boolean, nullable=False, default=False)
wireless.comment = """Whether it is a wireless interface."""
def __format__(self, format_spec):
v = super().__format__(format_spec)
if 's' in format_spec:
v += ' {} Mbps'.format(self.speed)
return v
class NetworkAdapter(JoinedComponentTableMixin, NetworkMixin, Component):
pass
class Processor(JoinedComponentTableMixin, Component):
"""The CPU."""
speed = Column(Float, check_range('speed', 0.1, 15))
speed.comment = """The regular CPU speed."""
cores = Column(SmallInteger, check_range('cores', 1, 10))
cores.comment = """The number of regular cores."""
threads = Column(SmallInteger, check_range('threads', 1, 20))
threads.comment = """The number of threads per core."""
address = Column(SmallInteger, check_range('address', 8, 256))
address.comment = """The address of the CPU: 8, 16, 32, 64, 128 or 256 bits."""
abi = Column(Unicode, check_lower('abi'))
abi.comment = """The Application Binary Interface of the processor."""
class RamModule(JoinedComponentTableMixin, Component):
"""A stick of RAM."""
size = Column(SmallInteger, check_range('size', min=128, max=17000))
size.comment = """The capacity of the RAM stick."""
speed = Column(SmallInteger, check_range('speed', min=100, max=10000))
interface = Column(DBEnum(RamInterface))
format = Column(DBEnum(RamFormat))
class SoundCard(JoinedComponentTableMixin, Component):
pass
class Display(JoinedComponentTableMixin, DisplayMixin, Component):
"""The display of a device. This is used in all devices that have
displays but that it is not their main part, like laptops,
mobiles, smart-watches, and so on; excluding ``ComputerMonitor``
and ``TelevisionSet``.
"""
pass
class Battery(JoinedComponentTableMixin, Component):
wireless = db.Column(db.Boolean)
wireless.comment = """If the battery can be charged wirelessly."""
technology = db.Column(db.Enum(BatteryTechnology))
size = db.Column(db.Integer, nullable=False)
size.comment = """Maximum battery capacity by design, in mAh.
Use BatteryTest's "size" to get the actual size of the battery.
"""
@property
def capacity(self) -> float:
"""The quantity of"""
from ereuse_devicehub.resources.action.models import MeasureBattery
real_size = self.last_action_of(MeasureBattery).size
return real_size / self.size if real_size and self.size else None
class Camera(Component):
"""The camera of a device."""
focal_length = db.Column(db.SmallInteger)
video_height = db.Column(db.SmallInteger)
video_width = db.Column(db.Integer)
horizontal_view_angle = db.Column(db.Integer)
facing = db.Column(db.Enum(CameraFacing))
vertical_view_angle = db.Column(db.SmallInteger)
video_stabilization = db.Column(db.Boolean)
flash = db.Column(db.Boolean)
class ComputerAccessory(Device):
"""Computer peripherals and similar accessories."""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
pass
class SAI(ComputerAccessory):
pass
class Keyboard(ComputerAccessory):
layout = Column(DBEnum(Layouts)) # If we want to do it not null
class Mouse(ComputerAccessory):
pass
class MemoryCardReader(ComputerAccessory):
pass
class Networking(NetworkMixin, Device):
"""Routers, switches, hubs..."""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
class Router(Networking):
pass
class Switch(Networking):
pass
class Hub(Networking):
pass
class WirelessAccessPoint(Networking):
pass
class Printer(Device):
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)
wireless = Column(Boolean, nullable=False, default=False)
wireless.comment = """Whether it is a wireless printer."""
scanning = Column(Boolean, nullable=False, default=False)
scanning.comment = """Whether the printer has scanning capabilities."""
technology = Column(DBEnum(PrinterTechnology))
technology.comment = """Technology used to print."""
monochrome = Column(Boolean, nullable=False, default=True)
monochrome.comment = """Whether the printer is only monochrome."""
class LabelPrinter(Printer):
pass
class Sound(Device):
pass
class Microphone(Sound):
pass
class Video(Device):
"""Devices related to video treatment."""
pass
class VideoScaler(Video):
pass
class Videoconference(Video):
pass
class Cooking(Device):
"""Cooking devices."""
pass
class Mixer(Cooking):
pass
class DIYAndGardening(Device):
pass
class Drill(DIYAndGardening):
max_drill_bit_size = db.Column(db.SmallInteger)
class PackOfScrewdrivers(Device):
pass
class Home(Device):
pass
class Dehumidifier(Home):
size = db.Column(db.SmallInteger)
size.comment = """The capacity in Liters."""
class Stairs(Home):
max_allowed_weight = db.Column(db.Integer)
class Recreation(Device):
pass
class Bike(Recreation):
wheel_size = db.Column(db.SmallInteger)
gears = db.Column(db.SmallInteger)
class Racket(Recreation):
pass
class Manufacturer(db.Model):
"""The normalized information about a manufacturer.
Ideally users should use the names from this list when submitting
devices.
"""
name = db.Column(CIText(), primary_key=True)
name.comment = """The normalized name of the manufacturer."""
url = db.Column(URL(), unique=True)
url.comment = """An URL to a page describing the manufacturer."""
logo = db.Column(URL())
logo.comment = """An URL pointing to the logo of the manufacturer."""
__table_args__ = (
# from https://niallburkley.com/blog/index-columns-for-like-in-postgres/
db.Index('name_index', text('name gin_trgm_ops'), postgresql_using='gin'),
{'schema': 'common'},
)
@classmethod
def add_all_to_session(cls, session: db.Session):
"""Adds all manufacturers to session."""
cursor = session.connection().connection.cursor()
#: Dialect used to write the CSV
with pathlib.Path(__file__).parent.joinpath('manufacturers.csv').open() as f:
cursor.copy_expert('COPY common.manufacturer FROM STDIN (FORMAT csv)', f)
listener_reset_field_updated_in_actual_time(Device)
def create_code_tag(mapper, connection, device):
"""
This function create a new tag every time than one device is create.
this tag is the same of devicehub_id.
"""
from ereuse_devicehub.resources.tag.model import Tag
if isinstance(device, Computer) and not device.placeholder:
tag = Tag(device_id=device.id, id=device.devicehub_id)
db.session.add(tag)
# from flask_sqlalchemy import event
# event.listen(Device, 'after_insert', create_code_tag, propagate=True)
class Other(Device):
"""
Used for put in there all devices than not have actualy a class
"""
id = Column(BigInteger, ForeignKey(Device.id), primary_key=True)