Merge branch 'testing'

This commit is contained in:
Santiago L 2022-07-12 10:15:50 +02:00
commit 6350bcd6a4
34 changed files with 5326 additions and 325 deletions

View file

@ -19,7 +19,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/setup-node@v1
with:
node-version: '16'

View file

@ -32,12 +32,12 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: [3.7]
python-version: [3.9]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'

78
.github/workflows/selenium.yml vendored Normal file
View file

@ -0,0 +1,78 @@
name: Selenium
on:
push:
branches: [master, testing]
pull_request:
branches: [master, testing]
jobs:
build:
runs-on: ubuntu-latest
# Service containers to run with `container-job`
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres:11
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
POSTGRES_DB: dh_test
POSTGRES_USER: dhub
POSTGRES_PASSWORD: ereuse
strategy:
max-parallel: 4
matrix:
python-version: [3.9]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install dependencies
run: |
sudo apt-get update -qy
sudo apt-get -y install postgresql-client --no-install-recommends
python -m pip install --upgrade pip
pip install flake8 pytest coverage
pip install -r requirements.txt
pip install -e .
mkdir bin
wget https://github.com/mozilla/geckodriver/releases/download/v0.30.0/geckodriver-v0.30.0-linux64.tar.gz
tar xf geckodriver-v0.30.0-linux64.tar.gz -C bin/
- name: Prepare database
env:
POSTGRES_DB: dh_test
POSTGRES_USER: dhub
POSTGRES_PASSWORD: ereuse
run: |
export PGPASSWORD=$POSTGRES_PASSWORD
psql -h "localhost" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -c "CREATE EXTENSION pgcrypto SCHEMA public;"
psql -h "localhost" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -c "CREATE EXTENSION ltree SCHEMA public;"
psql -h "localhost" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -c "CREATE EXTENSION citext SCHEMA public;"
psql -h "localhost" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -c "CREATE EXTENSION pg_trgm SCHEMA public;"
- name: Selenium tests
env:
SECRET_KEY: 'f00046306835001b55c230092e3a7990485beda0bc3bf732088d1ba1b5b74110e22e3f9ec3a24890272554b37d4'
DB_DATABASE: dh_test
FLASK_APP: examples/app.py
dhi: dbtest
run: |
alembic -x inventory=dbtest upgrade head
dh dummy --yes
flask run & pytest tests/test_selenium.py

View file

@ -5,15 +5,32 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.ht
ml).
## master
## testing
## [2.3.0] - 2022-07-12
- [added] #281 Add selenium test.
- [added] #305 Add button to download ISO Workbench.
- [added] #306 Add link to download JSON snapshot.
- [added] #308 Add sentry.
- [changed] #302 Add system uuid to check the identity of one device.
- [fixed] #309 Column lifecycle status is always empty.
**IMPORTANT**: PR #302 involves some changes in the deployment process:
```bash
# First, run script `extract_uuids.sh` before applying alembic migrations (e.g. with schema `dbtest`)
sh scripts/extract_uuids.sh
# Then, apply alembic migrations
alembic -x inventory=dbtest upgrade head
```
**NOTE**: If you forget (or don't need) to run this script before applying new migration it will work but any device will be updated.
## [2.2.0] - 2022-06-24
- [changed] #304 change anchor of link devices lots.
## [2.2.0 rc2] - 2022-06-22
- [added] #299 Multy select with Shift.
- [added] #299 Multiselect with Shift.
- [added] #300 Add Sid in label.
- [added] #301 Add logo in label.
- [added] #303 Add export Lots.

View file

@ -1 +1 @@
__version__ = "2.2.0"
__version__ = "2.3.0"

View file

@ -29,7 +29,6 @@ from wtforms.fields import FormField
from ereuse_devicehub.db import db
from ereuse_devicehub.inventory.models import DeliveryNote, ReceiverNote, Transfer
from ereuse_devicehub.parser.models import SnapshotsLog
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw
from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.models import Snapshot, Trade
@ -260,45 +259,35 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm):
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
for filename, snapshot_json in self.snapshots:
path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json.pop('debug', None)
version = snapshot_json.get('schema_api')
uuid = snapshot_json.get('uuid')
sid = snapshot_json.get('sid')
software_version = snapshot_json.get('version')
if self.is_wb_lite_snapshot(version):
debug = snapshot_json.pop('debug', None)
self.version = snapshot_json.get('schema_api')
self.uuid = snapshot_json.get('uuid')
self.sid = snapshot_json.get('sid')
if self.is_wb_lite_snapshot(self.version):
self.snapshot_json = schema_lite.load(snapshot_json)
snapshot_json = ParseSnapshotLsHw(self.snapshot_json).snapshot_json
else:
self.version = snapshot_json.get('version')
system_uuid = self.get_uuid(debug)
if system_uuid:
snapshot_json['device']['system_uuid'] = system_uuid
try:
snapshot_json = schema.load(snapshot_json)
except ValidationError as err:
txt = "{}".format(err)
error = SnapshotsLog(
description=txt,
snapshot_uuid=uuid,
severity=Severity.Error,
sid=sid,
version=software_version,
)
error.save(commit=True)
self.errors(txt=txt)
self.result[filename] = 'Error'
continue
response = self.build(snapshot_json)
db.session.add(response)
devices.append(response.device)
snap_log = SnapshotsLog(
description='Ok',
snapshot_uuid=uuid,
severity=Severity.Info,
sid=sid,
version=software_version,
snapshot=response,
)
snap_log.save()
if hasattr(response, 'type'):
self.result[filename] = 'Ok'
self.errors(txt="Ok", severity=Severity.Info, snapshot=response)
else:
self.result[filename] = 'Error'

View file

@ -1,11 +1,15 @@
import csv
import logging
import os
from distutils.util import strtobool
from io import StringIO
from pathlib import Path
import flask
import flask_weasyprint
from flask import Blueprint, g, make_response, request, url_for
from flask import Blueprint
from flask import current_app as app
from flask import g, make_response, request, url_for
from flask.views import View
from flask_login import current_user, login_required
from werkzeug.exceptions import NotFound
@ -479,6 +483,7 @@ class ExportsView(View):
'certificates': self.erasure,
'lots': self.lots_export,
'devices_lots': self.devices_lots_export,
'snapshot': self.snapshot,
}
if export_id not in export_ids:
@ -685,6 +690,33 @@ class ExportsView(View):
data, "Devices_Incoming_and_Outgoing_Lots_Spreadsheet.csv"
)
def snapshot(self):
uuid = request.args.get('id')
if not uuid:
messages.error('Snapshot not exist!')
return flask.redirect(request.referrer)
user = g.user.email
name_file = f"*_{user}_{uuid}.json"
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user)
for _file in Path(path_dir_base).glob(name_file):
with open(_file) as file_snapshot:
snapshot = file_snapshot.read()
data = StringIO()
data.write(snapshot)
bfile = data.getvalue().encode('utf-8')
output = make_response(bfile)
output.headers['Content-Disposition'] = 'attachment; filename={}'.format(
name_file
)
output.headers['Content-type'] = 'text/json'
return output
messages.error('Snapshot not exist!')
return flask.redirect(request.referrer)
class SnapshotListView(GenericMixin):
template_name = 'inventory/snapshots_list.html'
@ -702,12 +734,18 @@ class SnapshotListView(GenericMixin):
).order_by(SnapshotsLog.created.desc())
logs = {}
for snap in snapshots_log:
try:
system_uuid = snap.snapshot.device.system_uuid or ''
except AttributeError:
system_uuid = ''
if snap.snapshot_uuid not in logs:
logs[snap.snapshot_uuid] = {
'sid': snap.sid,
'snapshot_uuid': snap.snapshot_uuid,
'version': snap.version,
'device': snap.get_device(),
'system_uuid': system_uuid,
'status': snap.get_status(),
'severity': snap.severity,
'created': snap.created,

View file

@ -0,0 +1,33 @@
"""system_uuid instead of uuid
Revision ID: 73348969a583
Revises: dac62da1621a
Create Date: 2022-06-15 12:27:23.170313
"""
from alembic import context, op
# revision identifiers, used by Alembic.
revision = '73348969a583'
down_revision = 'dac62da1621a'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.alter_column(
'computer', 'uuid', new_column_name="system_uuid", schema=f'{get_inv()}'
)
def downgrade():
op.alter_column(
'computer', 'system_uuid', new_column_name="uuid", schema=f'{get_inv()}'
)

View file

@ -0,0 +1,67 @@
"""add system uuid to old registers
Revision ID: 8d4fe4b497b3
Revises: 73348969a583
Create Date: 2022-06-15 15:52:39.205192
"""
import os
from uuid import UUID
from alembic import context, op
# revision identifiers, used by Alembic.
revision = '8d4fe4b497b3'
down_revision = '73348969a583'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def update_db(con, system_uuid, snapshot_uuid):
sql_snapshot = f'select id from {get_inv()}.snapshot where uuid=\'{snapshot_uuid}\''
sql_device_id = f'select device_id from {get_inv()}.action_with_one_device where id in ({sql_snapshot})'
sql = f'select id, system_uuid from {get_inv()}.computer where id in ({sql_device_id})'
for device_id, db_system_uuid in con.execute(sql):
if db_system_uuid:
return
sql = f'update {get_inv()}.computer set system_uuid=\'{system_uuid}\' where id=\'{device_id}\''
con.execute(sql)
def update_to_little_endian(uuid):
uuid = UUID(uuid)
return UUID(bytes_le=uuid.bytes)
def upgrade():
uuids = []
system_uuids_file = 'system_uuids.csv'
if os.path.exists(system_uuids_file):
with open(system_uuids_file) as f:
for x in f.read().split('\n'):
z = x.split(';')
if len(z) != 2:
continue
x, y = z
uuids.append([x.strip(), y.strip()])
con = op.get_bind()
for u in uuids[1:]:
if u[0] == '':
continue
u[0] = update_to_little_endian(u[0])
update_db(con, u[0], u[1])
def downgrade():
pass

View file

@ -94,7 +94,7 @@ class Processor(Component):
assert not hasattr(self, 'cores') or 1 <= self.cores <= 16
@staticmethod # noqa: C901
def processor_brand_generation(model: str):
def processor_brand_generation(model: str): # noqa: C901
"""Generates the ``brand`` and ``generation`` fields for the given model.
This returns a tuple with:

View file

@ -52,7 +52,7 @@ class ParseSnapshot:
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['uuid'] = self.get_uuid()
self.device['system_uuid'] = self.get_uuid()
def set_components(self):
self.get_cpu()
@ -379,7 +379,7 @@ class ParseSnapshotLsHw:
raise ValidationError(txt)
self.device = pc
self.device['uuid'] = self.get_uuid()
self.device['system_uuid'] = self.get_uuid()
def set_components(self):
memory = None

View file

@ -4,10 +4,10 @@ import json
import os
import shutil
from datetime import datetime
from uuid import UUID
from flask import current_app as app
from flask import g
from marshmallow import ValidationError
from sqlalchemy.util import OrderedSet
from ereuse_devicehub.db import db
@ -117,6 +117,49 @@ class SnapshotMixin:
return snapshot
def get_old_smbios_version(self, debug):
capabilities = debug.get('lshw', {}).get('capabilities', {})
for x in capabilities.values():
if "SMBIOS version" in x:
e = x.split("SMBIOS version ")[1].split(".")
if int(e[0]) < 3 and int(e[1]) < 6:
self.errors(txt=x)
return True
return False
def get_uuid(self, debug):
if not debug or not isinstance(debug, dict):
self.errors(txt="There is not uuid")
return
if self.get_old_smbios_version(debug):
return
hw_uuid = debug.get('lshw', {}).get('configuration', {}).get('uuid')
if not hw_uuid:
self.errors(txt="There is not uuid")
return
uuid = UUID(hw_uuid)
return UUID(bytes_le=uuid.bytes)
def errors(self, txt=None, severity=Severity.Error, snapshot=None, commit=False):
if not txt:
return
from ereuse_devicehub.parser.models import SnapshotsLog
error = SnapshotsLog(
description=txt,
snapshot_uuid=self.uuid,
severity=severity,
sid=self.sid,
version=self.version,
snapshot=snapshot,
)
error.save(commit=commit)
class SnapshotView(SnapshotMixin):
"""Performs a Snapshot.
@ -129,38 +172,29 @@ class SnapshotView(SnapshotMixin):
# snapshot, and we want to wait to flush snapshot at the end
def __init__(self, snapshot_json: dict, resource_def, schema):
from ereuse_devicehub.parser.models import SnapshotsLog
self.schema = schema
self.resource_def = resource_def
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
self.path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json.pop('debug', None)
self.version = snapshot_json.get('version')
self.uuid = snapshot_json.get('uuid')
self.sid = None
system_uuid = self.get_uuid(snapshot_json.pop('debug', None))
if system_uuid:
snapshot_json['device']['system_uuid'] = system_uuid
try:
self.snapshot_json = resource_def.schema.load(snapshot_json)
snapshot = self.build()
except Exception as err:
txt = "{}".format(err)
uuid = snapshot_json.get('uuid')
version = snapshot_json.get('version')
error = SnapshotsLog(
description=txt,
snapshot_uuid=uuid,
severity=Severity.Error,
version=str(version),
)
error.save(commit=True)
self.errors(txt=txt, commit=True)
raise err
db.session.add(snapshot)
snap_log = SnapshotsLog(
description='Ok',
snapshot_uuid=snapshot.uuid,
severity=Severity.Info,
version=str(snapshot.version),
snapshot=snapshot,
)
snap_log.save()
self.errors(txt="Ok", severity=Severity.Info, snapshot=snapshot, commit=False)
db.session().final_flush()
self.response = self.schema.jsonify(snapshot) # transform it back
self.response.status_code = 201

View file

@ -191,6 +191,7 @@ class Device(Thing):
'image',
'allocated',
'devicehub_id',
'system_uuid',
'active',
}
@ -818,7 +819,7 @@ class Computer(Device):
transfer_state.comment = TransferState.__doc__
receiver_id = db.Column(UUID(as_uuid=True), db.ForeignKey(User.id), nullable=True)
receiver = db.relationship(User, primaryjoin=receiver_id == User.id)
uuid = db.Column(UUID(as_uuid=True), nullable=True)
system_uuid = db.Column(UUID(as_uuid=True), nullable=True)
def __init__(self, *args, **kwargs) -> None:
if args:

View file

@ -1,17 +1,30 @@
import datetime
from marshmallow import post_load, pre_load, fields as f
from marshmallow.fields import Boolean, Date, DateTime, Float, Integer, List, Str, String, UUID, Dict
from marshmallow import fields as f
from marshmallow import post_load, pre_load
from marshmallow.fields import (
UUID,
Boolean,
Date,
DateTime,
Dict,
Float,
Integer,
List,
Str,
String,
)
from marshmallow.validate import Length, OneOf, Range
from sqlalchemy.util import OrderedSet
from stdnum import imei, meid
from teal.enums import Layouts
from teal.marshmallow import EnumField, SanitizedStr, URL, ValidationError
from teal.marshmallow import URL, EnumField, SanitizedStr, ValidationError
from teal.resource import Schema
from ereuse_devicehub.marshmallow import NestedOn
from ereuse_devicehub.resources import enums
from ereuse_devicehub.resources.device import models as m, states
from ereuse_devicehub.resources.device import models as m
from ereuse_devicehub.resources.device import states
from ereuse_devicehub.resources.models import STR_BIG_SIZE, STR_SIZE
from ereuse_devicehub.resources.schemas import Thing, UnitCodes
@ -20,58 +33,90 @@ class Device(Thing):
__doc__ = m.Device.__doc__
id = Integer(description=m.Device.id.comment, dump_only=True)
hid = SanitizedStr(lower=True, description=m.Device.hid.comment)
tags = NestedOn('Tag',
many=True,
collection_class=OrderedSet,
description='A set of tags that identify the device.')
model = SanitizedStr(lower=True,
validate=Length(max=STR_BIG_SIZE),
description=m.Device.model.comment)
manufacturer = SanitizedStr(lower=True,
validate=Length(max=STR_SIZE),
description=m.Device.manufacturer.comment)
serial_number = SanitizedStr(lower=True,
validate=Length(max=STR_BIG_SIZE),
data_key='serialNumber')
brand = SanitizedStr(validate=Length(max=STR_BIG_SIZE), description=m.Device.brand.comment)
generation = Integer(validate=Range(1, 100), description=m.Device.generation.comment)
tags = NestedOn(
'Tag',
many=True,
collection_class=OrderedSet,
description='A set of tags that identify the device.',
)
model = SanitizedStr(
lower=True,
validate=Length(max=STR_BIG_SIZE),
description=m.Device.model.comment,
)
manufacturer = SanitizedStr(
lower=True,
validate=Length(max=STR_SIZE),
description=m.Device.manufacturer.comment,
)
serial_number = SanitizedStr(
lower=True, validate=Length(max=STR_BIG_SIZE), data_key='serialNumber'
)
brand = SanitizedStr(
validate=Length(max=STR_BIG_SIZE), description=m.Device.brand.comment
)
generation = Integer(
validate=Range(1, 100), description=m.Device.generation.comment
)
version = SanitizedStr(description=m.Device.version)
weight = Float(validate=Range(0.1, 5), unit=UnitCodes.kgm, description=m.Device.weight.comment)
width = Float(validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.width.comment)
height = Float(validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.height.comment)
depth = Float(validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.depth.comment)
weight = Float(
validate=Range(0.1, 5), unit=UnitCodes.kgm, description=m.Device.weight.comment
)
width = Float(
validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.width.comment
)
height = Float(
validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.height.comment
)
depth = Float(
validate=Range(0.1, 5), unit=UnitCodes.m, description=m.Device.depth.comment
)
# TODO TimeOut 2. Comment actions and lots if there are time out.
actions = NestedOn('Action', many=True, dump_only=True, description=m.Device.actions.__doc__)
actions = NestedOn(
'Action', many=True, dump_only=True, description=m.Device.actions.__doc__
)
# TODO TimeOut 2. Comment actions_one and lots if there are time out.
actions_one = NestedOn('Action', many=True, load_only=True, collection_class=OrderedSet)
problems = NestedOn('Action', many=True, dump_only=True, description=m.Device.problems.__doc__)
actions_one = NestedOn(
'Action', many=True, load_only=True, collection_class=OrderedSet
)
problems = NestedOn(
'Action', many=True, dump_only=True, description=m.Device.problems.__doc__
)
url = URL(dump_only=True, description=m.Device.url.__doc__)
# TODO TimeOut 2. Comment actions and lots if there are time out.
lots = NestedOn('Lot',
many=True,
dump_only=True,
description='The lots where this device is directly under.')
lots = NestedOn(
'Lot',
many=True,
dump_only=True,
description='The lots where this device is directly under.',
)
rate = NestedOn('Rate', dump_only=True, description=m.Device.rate.__doc__)
price = NestedOn('Price', dump_only=True, description=m.Device.price.__doc__)
tradings = Dict(dump_only=True, description='')
physical = EnumField(states.Physical, dump_only=True, description=m.Device.physical.__doc__)
traking = EnumField(states.Traking, dump_only=True, description=m.Device.physical.__doc__)
usage = EnumField(states.Usage, dump_only=True, description=m.Device.physical.__doc__)
physical = EnumField(
states.Physical, dump_only=True, description=m.Device.physical.__doc__
)
traking = EnumField(
states.Traking, dump_only=True, description=m.Device.physical.__doc__
)
usage = EnumField(
states.Usage, dump_only=True, description=m.Device.physical.__doc__
)
revoke = UUID(dump_only=True)
physical_possessor = NestedOn('Agent', dump_only=True, data_key='physicalPossessor')
production_date = DateTime('iso',
description=m.Device.updated.comment,
data_key='productionDate')
working = NestedOn('Action',
many=True,
dump_only=True,
description=m.Device.working.__doc__)
production_date = DateTime(
'iso', description=m.Device.updated.comment, data_key='productionDate'
)
working = NestedOn(
'Action', many=True, dump_only=True, description=m.Device.working.__doc__
)
variant = SanitizedStr(description=m.Device.variant.comment)
sku = SanitizedStr(description=m.Device.sku.comment)
image = URL(description=m.Device.image.comment)
allocated = Boolean(description=m.Device.allocated.comment)
devicehub_id = SanitizedStr(data_key='devicehubID',
description=m.Device.devicehub_id.comment)
devicehub_id = SanitizedStr(
data_key='devicehubID', description=m.Device.devicehub_id.comment
)
@pre_load
def from_actions_to_actions_one(self, data: dict):
@ -91,52 +136,73 @@ class Device(Thing):
@post_load
def validate_snapshot_actions(self, data):
"""Validates that only snapshot-related actions can be uploaded."""
from ereuse_devicehub.resources.action.models import EraseBasic, Test, Rate, Install, \
Benchmark
from ereuse_devicehub.resources.action.models import (
Benchmark,
EraseBasic,
Install,
Rate,
Test,
)
for action in data['actions_one']:
if not isinstance(action, (Install, EraseBasic, Rate, Test, Benchmark)):
raise ValidationError('You cannot upload {}'.format(action),
field_names=['actions'])
raise ValidationError(
'You cannot upload {}'.format(action), field_names=['actions']
)
class Computer(Device):
__doc__ = m.Computer.__doc__
# TODO TimeOut 1. Comment components if there are time out.
components = NestedOn('Component',
many=True,
dump_only=True,
collection_class=OrderedSet,
description='The components that are inside this computer.')
chassis = EnumField(enums.ComputerChassis,
description=m.Computer.chassis.comment)
ram_size = Integer(dump_only=True,
data_key='ramSize',
description=m.Computer.ram_size.__doc__)
data_storage_size = Integer(dump_only=True,
data_key='dataStorageSize',
description=m.Computer.data_storage_size.__doc__)
processor_model = Str(dump_only=True,
data_key='processorModel',
description=m.Computer.processor_model.__doc__)
graphic_card_model = Str(dump_only=True,
data_key='graphicCardModel',
description=m.Computer.graphic_card_model.__doc__)
network_speeds = List(Integer(dump_only=True),
dump_only=True,
data_key='networkSpeeds',
description=m.Computer.network_speeds.__doc__)
privacy = NestedOn('Action',
many=True,
dump_only=True,
collection_class=set,
description=m.Computer.privacy.__doc__)
amount = Integer(validate=f.validate.Range(min=0, max=100),
description=m.Computer.amount.__doc__)
components = NestedOn(
'Component',
many=True,
dump_only=True,
collection_class=OrderedSet,
description='The components that are inside this computer.',
)
chassis = EnumField(enums.ComputerChassis, description=m.Computer.chassis.comment)
ram_size = Integer(
dump_only=True, data_key='ramSize', description=m.Computer.ram_size.__doc__
)
data_storage_size = Integer(
dump_only=True,
data_key='dataStorageSize',
description=m.Computer.data_storage_size.__doc__,
)
processor_model = Str(
dump_only=True,
data_key='processorModel',
description=m.Computer.processor_model.__doc__,
)
graphic_card_model = Str(
dump_only=True,
data_key='graphicCardModel',
description=m.Computer.graphic_card_model.__doc__,
)
network_speeds = List(
Integer(dump_only=True),
dump_only=True,
data_key='networkSpeeds',
description=m.Computer.network_speeds.__doc__,
)
privacy = NestedOn(
'Action',
many=True,
dump_only=True,
collection_class=set,
description=m.Computer.privacy.__doc__,
)
amount = Integer(
validate=f.validate.Range(min=0, max=100), description=m.Computer.amount.__doc__
)
# author_id = NestedOn(s_user.User, only_query='author_id')
owner_id = UUID(data_key='ownerID')
transfer_state = EnumField(enums.TransferState, description=m.Computer.transfer_state.comment)
transfer_state = EnumField(
enums.TransferState, description=m.Computer.transfer_state.comment
)
receiver_id = UUID(data_key='receiverID')
uuid = UUID(required=False)
system_uuid = UUID(required=False)
class Desktop(Computer):
@ -155,29 +221,36 @@ class Server(Computer):
class DisplayMixin:
__doc__ = m.DisplayMixin.__doc__
size = Float(description=m.DisplayMixin.size.comment, validate=Range(2, 150))
technology = EnumField(enums.DisplayTech,
description=m.DisplayMixin.technology.comment)
resolution_width = Integer(data_key='resolutionWidth',
validate=Range(10, 20000),
description=m.DisplayMixin.resolution_width.comment,
)
resolution_height = Integer(data_key='resolutionHeight',
validate=Range(10, 20000),
description=m.DisplayMixin.resolution_height.comment,
)
technology = EnumField(
enums.DisplayTech, description=m.DisplayMixin.technology.comment
)
resolution_width = Integer(
data_key='resolutionWidth',
validate=Range(10, 20000),
description=m.DisplayMixin.resolution_width.comment,
)
resolution_height = Integer(
data_key='resolutionHeight',
validate=Range(10, 20000),
description=m.DisplayMixin.resolution_height.comment,
)
refresh_rate = Integer(data_key='refreshRate', validate=Range(10, 1000))
contrast_ratio = Integer(data_key='contrastRatio', validate=Range(100, 100000))
touchable = Boolean(description=m.DisplayMixin.touchable.comment)
aspect_ratio = String(dump_only=True, description=m.DisplayMixin.aspect_ratio.__doc__)
aspect_ratio = String(
dump_only=True, description=m.DisplayMixin.aspect_ratio.__doc__
)
widescreen = Boolean(dump_only=True, description=m.DisplayMixin.widescreen.__doc__)
class NetworkMixin:
__doc__ = m.NetworkMixin.__doc__
speed = Integer(validate=Range(min=10, max=10000),
unit=UnitCodes.mbps,
description=m.NetworkAdapter.speed.comment)
speed = Integer(
validate=Range(min=10, max=10000),
unit=UnitCodes.mbps,
description=m.NetworkAdapter.speed.comment,
)
wireless = Boolean(required=True)
@ -198,16 +271,22 @@ class Mobile(Device):
imei = Integer(description=m.Mobile.imei.comment)
meid = Str(description=m.Mobile.meid.comment)
ram_size = Integer(validate=Range(min=128, max=36000),
data_key='ramSize',
unit=UnitCodes.mbyte,
description=m.Mobile.ram_size.comment)
data_storage_size = Integer(validate=Range(0, 10 ** 8),
data_key='dataStorageSize',
description=m.Mobile.data_storage_size)
display_size = Float(validate=Range(min=0.1, max=30.0),
data_key='displaySize',
description=m.Mobile.display_size.comment)
ram_size = Integer(
validate=Range(min=128, max=36000),
data_key='ramSize',
unit=UnitCodes.mbyte,
description=m.Mobile.ram_size.comment,
)
data_storage_size = Integer(
validate=Range(0, 10**8),
data_key='dataStorageSize',
description=m.Mobile.data_storage_size,
)
display_size = Float(
validate=Range(min=0.1, max=30.0),
data_key='displaySize',
description=m.Mobile.display_size.comment,
)
@pre_load
def convert_check_imei(self, data):
@ -243,17 +322,21 @@ class Component(Device):
class GraphicCard(Component):
__doc__ = m.GraphicCard.__doc__
memory = Integer(validate=Range(0, 10000),
unit=UnitCodes.mbyte,
description=m.GraphicCard.memory.comment)
memory = Integer(
validate=Range(0, 10000),
unit=UnitCodes.mbyte,
description=m.GraphicCard.memory.comment,
)
class DataStorage(Component):
__doc__ = m.DataStorage.__doc__
size = Integer(validate=Range(0, 10 ** 8),
unit=UnitCodes.mbyte,
description=m.DataStorage.size.comment)
size = Integer(
validate=Range(0, 10**8),
unit=UnitCodes.mbyte,
description=m.DataStorage.size.comment,
)
interface = EnumField(enums.DataStorageInterface)
privacy = NestedOn('Action', dump_only=True)
@ -269,16 +352,21 @@ class SolidStateDrive(DataStorage):
class Motherboard(Component):
__doc__ = m.Motherboard.__doc__
slots = Integer(validate=Range(0, 20),
description=m.Motherboard.slots.comment)
slots = Integer(validate=Range(0, 20), description=m.Motherboard.slots.comment)
usb = Integer(validate=Range(0, 20), description=m.Motherboard.usb.comment)
firewire = Integer(validate=Range(0, 20), description=m.Motherboard.firewire.comment)
firewire = Integer(
validate=Range(0, 20), description=m.Motherboard.firewire.comment
)
serial = Integer(validate=Range(0, 20), description=m.Motherboard.serial.comment)
pcmcia = Integer(validate=Range(0, 20), description=m.Motherboard.pcmcia.comment)
bios_date = Date(validate=Range(datetime.date(year=1980, month=1, day=1),
datetime.date(year=2030, month=1, day=1)),
data_key='biosDate',
description=m.Motherboard.bios_date)
bios_date = Date(
validate=Range(
datetime.date(year=1980, month=1, day=1),
datetime.date(year=2030, month=1, day=1),
),
data_key='biosDate',
description=m.Motherboard.bios_date,
)
ram_slots = Integer(validate=Range(1), data_key='ramSlots')
ram_max_size = Integer(validate=Range(1), data_key='ramMaxSize')
@ -290,22 +378,32 @@ class NetworkAdapter(NetworkMixin, Component):
class Processor(Component):
__doc__ = m.Processor.__doc__
speed = Float(validate=Range(min=0.1, max=15),
unit=UnitCodes.ghz,
description=m.Processor.speed.comment)
cores = Integer(validate=Range(min=1, max=10), description=m.Processor.cores.comment)
threads = Integer(validate=Range(min=1, max=20), description=m.Processor.threads.comment)
address = Integer(validate=OneOf({8, 16, 32, 64, 128, 256}),
description=m.Processor.address.comment)
speed = Float(
validate=Range(min=0.1, max=15),
unit=UnitCodes.ghz,
description=m.Processor.speed.comment,
)
cores = Integer(
validate=Range(min=1, max=10), description=m.Processor.cores.comment
)
threads = Integer(
validate=Range(min=1, max=20), description=m.Processor.threads.comment
)
address = Integer(
validate=OneOf({8, 16, 32, 64, 128, 256}),
description=m.Processor.address.comment,
)
abi = SanitizedStr(lower=True, description=m.Processor.abi.comment)
class RamModule(Component):
__doc__ = m.RamModule.__doc__
size = Integer(validate=Range(min=128, max=17000),
unit=UnitCodes.mbyte,
description=m.RamModule.size.comment)
size = Integer(
validate=Range(min=128, max=17000),
unit=UnitCodes.mbyte,
description=m.RamModule.size.comment,
)
speed = Integer(validate=Range(min=100, max=10000), unit=UnitCodes.mhz)
interface = EnumField(enums.RamInterface)
format = EnumField(enums.RamFormat)
@ -323,7 +421,9 @@ class Battery(Component):
__doc__ = m.Battery.__doc__
wireless = Boolean(description=m.Battery.wireless.comment)
technology = EnumField(enums.BatteryTechnology, description=m.Battery.technology.comment)
technology = EnumField(
enums.BatteryTechnology, description=m.Battery.technology.comment
)
size = Integer(required=True, description=m.Battery.size.comment)
@ -393,12 +493,18 @@ class WirelessAccessPoint(Networking):
class Printer(Device):
__doc__ = m.Printer.__doc__
wireless = Boolean(required=True, missing=False, description=m.Printer.wireless.comment)
scanning = Boolean(required=True, missing=False, description=m.Printer.scanning.comment)
technology = EnumField(enums.PrinterTechnology,
required=True,
description=m.Printer.technology.comment)
monochrome = Boolean(required=True, missing=True, description=m.Printer.monochrome.comment)
wireless = Boolean(
required=True, missing=False, description=m.Printer.wireless.comment
)
scanning = Boolean(
required=True, missing=False, description=m.Printer.scanning.comment
)
technology = EnumField(
enums.PrinterTechnology, required=True, description=m.Printer.technology.comment
)
monochrome = Boolean(
required=True, missing=True, description=m.Printer.monochrome.comment
)
class LabelPrinter(Printer):

View file

@ -13,10 +13,14 @@ from teal.marshmallow import ValidationError
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import Remove
from ereuse_devicehub.resources.device.models import Component, Computer, Device, DataStorage
from ereuse_devicehub.resources.device.models import (
Component,
Computer,
DataStorage,
Device,
)
from ereuse_devicehub.resources.tag.model import Tag
DEVICES_ALLOW_DUPLICITY = [
'RamModule',
'Display',
@ -26,12 +30,13 @@ DEVICES_ALLOW_DUPLICITY = [
'GraphicCard',
]
class Sync:
"""Synchronizes the device and components with the database."""
def run(self,
device: Device,
components: Iterable[Component] or None) -> (Device, OrderedSet):
def run(
self, device: Device, components: Iterable[Component] or None
) -> (Device, OrderedSet):
"""Synchronizes the device and components with the database.
Identifies if the device and components exist in the database
@ -72,9 +77,9 @@ class Sync:
blacklist = set() # type: Set[int]
not_new_components = set()
for component in components:
db_component, is_new = self.execute_register_component(component,
blacklist,
parent=db_device)
db_component, is_new = self.execute_register_component(
component, blacklist, parent=db_device
)
db_components.add(db_component)
if not is_new:
not_new_components.add(db_component)
@ -83,10 +88,9 @@ class Sync:
db_device.components = db_components
return db_device, actions
def execute_register_component(self,
component: Component,
blacklist: Set[int],
parent: Computer):
def execute_register_component(
self, component: Component, blacklist: Set[int], parent: Computer
):
"""Synchronizes one component to the DB.
This method is a specialization of :meth:`.execute_register`
@ -118,9 +122,12 @@ class Sync:
# if not, then continue with the traditional behaviour
try:
if component.hid:
db_component = Device.query.filter_by(hid=component.hid, owner_id=g.user.id).one()
assert isinstance(db_component, Device), \
'{} must be a component'.format(db_component)
db_component = Device.query.filter_by(
hid=component.hid, owner_id=g.user.id
).one()
assert isinstance(
db_component, Device
), '{} must be a component'.format(db_component)
else:
# Is there a component similar to ours?
db_component = component.similar_one(parent, blacklist)
@ -166,15 +173,35 @@ class Sync:
:return: The synced device from the db with the tags linked.
"""
assert inspect(device).transient, 'Device cannot be already synced from DB'
assert all(inspect(tag).transient for tag in device.tags), 'Tags cannot be synced from DB'
assert all(
inspect(tag).transient for tag in device.tags
), 'Tags cannot be synced from DB'
db_device = None
if device.hid:
if isinstance(device, Computer):
# first search by uuid
if device.system_uuid:
with suppress(ResourceNotFound):
db_device = Computer.query.filter_by(
system_uuid=device.system_uuid, owner_id=g.user.id, active=True
).one()
# if no there are any Computer by uuid search by hid
if not db_device and device.hid:
with suppress(ResourceNotFound):
db_device = Device.query.filter_by(
hid=device.hid, owner_id=g.user.id, active=True
).one()
elif device.hid:
with suppress(ResourceNotFound):
db_device = Device.query.filter_by(hid=device.hid, owner_id=g.user.id, active=True).one()
db_device = Device.query.filter_by(
hid=device.hid, owner_id=g.user.id, active=True
).one()
if db_device and db_device.allocated:
raise ResourceNotFound('device is actually allocated {}'.format(device))
try:
tags = {Tag.from_an_id(tag.id).one() for tag in device.tags} # type: Set[Tag]
tags = {
Tag.from_an_id(tag.id).one() for tag in device.tags
} # type: Set[Tag]
except ResourceNotFound:
raise ResourceNotFound('tag you are linking to device {}'.format(device))
linked_tags = {tag for tag in tags if tag.device_id} # type: Set[Tag]
@ -182,16 +209,22 @@ class Sync:
sample_tag = next(iter(linked_tags))
for tag in linked_tags:
if tag.device_id != sample_tag.device_id:
raise MismatchBetweenTags(tag, sample_tag) # Tags linked to different devices
raise MismatchBetweenTags(
tag, sample_tag
) # Tags linked to different devices
if db_device: # Device from hid
if sample_tag.device_id != db_device.id: # Device from hid != device from tags
if (
sample_tag.device_id != db_device.id
): # Device from hid != device from tags
raise MismatchBetweenTagsAndHid(db_device.id, db_device.hid)
else: # There was no device from hid
if sample_tag.device.physical_properties != device.physical_properties:
# Incoming physical props of device != props from tag's device
# which means that the devices are not the same
raise MismatchBetweenProperties(sample_tag.device.physical_properties,
device.physical_properties)
raise MismatchBetweenProperties(
sample_tag.device.physical_properties,
device.physical_properties,
)
db_device = sample_tag.device
if db_device: # Device from hid or tags
self.merge(device, db_device)
@ -199,17 +232,21 @@ class Sync:
device.tags.clear() # We don't want to add the transient dummy tags
db.session.add(device)
db_device = device
db_device.tags |= tags # Union of tags the device had plus the (potentially) new ones
db_device.tags |= (
tags # Union of tags the device had plus the (potentially) new ones
)
try:
db.session.flush()
except IntegrityError as e:
# Manage 'one tag per organization' unique constraint
if 'One tag per organization' in e.args[0]:
# todo test for this
id = int(e.args[0][135:e.args[0].index(',', 135)])
raise ValidationError('The device is already linked to tag {} '
'from the same organization.'.format(id),
field_names=['device.tags'])
id = int(e.args[0][135 : e.args[0].index(',', 135)])
raise ValidationError(
'The device is already linked to tag {} '
'from the same organization.'.format(id),
field_names=['device.tags'],
)
else:
raise
assert db_device is not None
@ -228,9 +265,15 @@ class Sync:
if value is not None:
setattr(db_device, field_name, value)
# if device.system_uuid and db_device.system_uuid and device.system_uuid != db_device.system_uuid:
# TODO @cayop send error to sentry.io
# there are 2 computers duplicate get db_device for hid
if hasattr(device, 'system_uuid') and device.system_uuid:
db_device.system_uuid = device.system_uuid
@staticmethod
def add_remove(device: Computer,
components: Set[Component]) -> OrderedSet:
def add_remove(device: Computer, components: Set[Component]) -> OrderedSet:
"""Generates the Add and Remove actions (but doesn't add them to
session).
@ -251,9 +294,13 @@ class Sync:
if adding:
# For the components we are adding, let's remove them from their old parents
def g_parent(component: Component) -> Device:
return component.parent or Device(id=0) # Computer with id 0 is our Identity
return component.parent or Device(
id=0
) # Computer with id 0 is our Identity
for parent, _components in groupby(sorted(adding, key=g_parent), key=g_parent):
for parent, _components in groupby(
sorted(adding, key=g_parent), key=g_parent
):
set_components = OrderedSet(_components)
check_owners = (x.owner_id == g.user.id for x in set_components)
# Is not Computer Identity and all components have the correct owner
@ -263,21 +310,18 @@ class Sync:
class MismatchBetweenTags(ValidationError):
def __init__(self,
tag: Tag,
other_tag: Tag,
field_names={'device.tags'}):
message = '{!r} and {!r} are linked to different devices.'.format(tag, other_tag)
def __init__(self, tag: Tag, other_tag: Tag, field_names={'device.tags'}):
message = '{!r} and {!r} are linked to different devices.'.format(
tag, other_tag
)
super().__init__(message, field_names)
class MismatchBetweenTagsAndHid(ValidationError):
def __init__(self,
device_id: int,
hid: str,
field_names={'device.hid'}):
message = 'Tags are linked to device {} but hid refers to device {}.'.format(device_id,
hid)
def __init__(self, device_id: int, hid: str, field_names={'device.hid'}):
message = 'Tags are linked to device {} but hid refers to device {}.'.format(
device_id, hid
)
super().__init__(message, field_names)
@ -285,6 +329,8 @@ class MismatchBetweenProperties(ValidationError):
def __init__(self, props1, props2, field_names={'device'}):
message = 'The device from the tag and the passed-in differ the following way:'
message += '\n'.join(
difflib.ndiff(yaml.dump(props1).splitlines(), yaml.dump(props2).splitlines())
difflib.ndiff(
yaml.dump(props1).splitlines(), yaml.dump(props2).splitlines()
)
)
super().__init__(message, field_names)

View file

@ -1,13 +1,18 @@
""" This file frame a correct row for csv report """
from collections import OrderedDict
from flask import url_for
from ereuse_devicehub.resources.enums import Severity
from ereuse_devicehub.resources.device import models as d, states
from ereuse_devicehub.resources.action import models as da
from ereuse_devicehub.resources.action.models import (BenchmarkDataStorage, RateComputer,
TestDataStorage)
from ereuse_devicehub.resources.action.models import (
BenchmarkDataStorage,
RateComputer,
TestDataStorage,
)
from ereuse_devicehub.resources.device import models as d
from ereuse_devicehub.resources.device import states
from ereuse_devicehub.resources.enums import Severity
class DeviceRow(OrderedDict):
@ -40,20 +45,20 @@ class DeviceRow(OrderedDict):
software = ''
if snapshot:
software = "{software} {version}".format(
software=snapshot.software.name, version=snapshot.version)
software=snapshot.software.name, version=snapshot.version
)
# General information about device
self['DHID'] = device.devicehub_id
self['DocumentID'] = self.document_id
self['Public Link'] = '{url}{id}'.format(
url=url_for('Device.main', _external=True),
id=device.devicehub_id)
url=url_for('Device.main', _external=True), id=device.devicehub_id
)
self['Lots'] = ', '.join([x.name for x in self.device.lots])
self['Tag 1 Type'] = self['Tag 1 ID'] = self['Tag 1 Organization'] = ''
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
for i, tag in zip(range(1, 3), device.tags):
self['Tag {} Type'.format(
i)] = 'unamed' if tag.provider else 'named'
self['Tag {} Type'.format(i)] = 'unamed' if tag.provider else 'named'
self['Tag {} ID'.format(i)] = tag.id
self['Tag {} Organization'.format(i)] = tag.org.name
@ -79,8 +84,7 @@ class DeviceRow(OrderedDict):
self['Allocate state'] = device.allocated_status.type
try:
self['Lifecycle state'] = device.last_action_of(
*states.Trading.actions()).t
self['Lifecycle state'] = device.last_action_of(*states.Status.actions()).t
except LookupError:
self['Lifecycle state'] = ''
if isinstance(device, d.Computer):
@ -155,10 +159,12 @@ class DeviceRow(OrderedDict):
self['{} {} Serial Number'.format(ctype, i)] = ''
else:
self['{} {} Manufacturer'.format(ctype, i)] = none2str(
component.manufacturer)
component.manufacturer
)
self['{} {} Model'.format(ctype, i)] = none2str(component.model)
self['{} {} Serial Number'.format(ctype, i)] = none2str(
component.serial_number)
component.serial_number
)
if ctype == d.Processor.t:
self.get_processor(ctype, i, component)
@ -178,12 +184,10 @@ class DeviceRow(OrderedDict):
self['{} {} Number of cores'.format(ctype, i)] = ''
self['{} {} Speed (GHz)'.format(ctype, i)] = ''
self['Benchmark {} {} (points)'.format(ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
return
self['{} {} Number of cores'.format(
ctype, i)] = none2str(component.cores)
self['{} {} Number of cores'.format(ctype, i)] = none2str(component.cores)
self['{} {} Speed (GHz)'.format(ctype, i)] = none2str(component.speed)
benchmark = get_action(component, 'BenchmarkProcessor')
@ -194,11 +198,11 @@ class DeviceRow(OrderedDict):
sysbench = get_action(component, 'BenchmarkProcessorSysbench')
if not sysbench:
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
return
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = sysbench.rate
self[
'Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)
] = sysbench.rate
def get_ram(self, ctype, i, component):
"""Particular fields for component Ram Module."""
@ -212,7 +216,7 @@ class DeviceRow(OrderedDict):
def get_datastorage(self, ctype, i, component):
"""Particular fields for component DataStorage.
A DataStorage can be HardDrive or SolidStateDrive.
A DataStorage can be HardDrive or SolidStateDrive.
"""
if component is None:
@ -244,21 +248,23 @@ class DeviceRow(OrderedDict):
software = ''
if snapshot:
software = "{software} {version}".format(
software=snapshot.software.name, version=snapshot.version)
software=snapshot.software.name, version=snapshot.version
)
self['{} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
component_actions = sorted(component.actions, key=lambda x: x.created)
erasures = [a for a in component_actions if a.type in [
'EraseBasic', 'EraseSectors', 'DataWipe']]
erasures = [
a
for a in component_actions
if a.type in ['EraseBasic', 'EraseSectors', 'DataWipe']
]
erasure = erasures[-1] if erasures else None
if not erasure:
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(ctype, i)] = ''
self['Erasure {} {} Result'.format(ctype, i)] = ''
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
@ -272,15 +278,13 @@ class DeviceRow(OrderedDict):
elif hasattr(erasure, 'type') and erasure.type == 'DataWipe':
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(
ctype, i)] = erasure.document.software
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(ctype, i)] = erasure.document.software
self['Erasure {} {} Result'.format(ctype, i)] = get_result(erasure)
self['Erasure {} {} Certificate URL'.format(
ctype, i)] = erasure.document.url and erasure.document.url.to_text() or ''
self['Erasure {} {} Certificate URL'.format(ctype, i)] = (
erasure.document.url and erasure.document.url.to_text() or ''
)
self['Erasure {} {} Type'.format(ctype, i)] = ''
self['Erasure {} {} Method'.format(ctype, i)] = ''
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
@ -291,10 +295,8 @@ class DeviceRow(OrderedDict):
else:
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(ctype, i)] = software
result = get_result(erasure)
@ -302,20 +304,16 @@ class DeviceRow(OrderedDict):
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
self['Erasure {} {} Type'.format(ctype, i)] = erasure.type
self['Erasure {} {} Method'.format(ctype, i)] = erasure.method
self['Erasure {} {} Elapsed (hours)'.format(
ctype, i)] = format(erasure.elapsed)
self['Erasure {} {} Date'.format(
ctype, i)] = format(erasure.created)
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = format(
erasure.elapsed
)
self['Erasure {} {} Date'.format(ctype, i)] = format(erasure.created)
steps = ','.join((format(x) for x in erasure.steps))
self['Erasure {} {} Steps'.format(ctype, i)] = steps
steps_start_time = ','.join(
(format(x.start_time) for x in erasure.steps))
self['Erasure {} {} Steps Start Time'.format(
ctype, i)] = steps_start_time
steps_end_time = ','.join((format(x.end_time)
for x in erasure.steps))
self['Erasure {} {} Steps End Time'.format(
ctype, i)] = steps_end_time
steps_start_time = ','.join((format(x.start_time) for x in erasure.steps))
self['Erasure {} {} Steps Start Time'.format(ctype, i)] = steps_start_time
steps_end_time = ','.join((format(x.end_time) for x in erasure.steps))
self['Erasure {} {} Steps End Time'.format(ctype, i)] = steps_end_time
benchmark = get_action(component, 'BenchmarkDataStorage')
if not benchmark:
@ -323,9 +321,11 @@ class DeviceRow(OrderedDict):
self['Benchmark {} {} Writing speed (MB/s)'.format(ctype, i)] = ''
else:
self['Benchmark {} {} Read Speed (MB/s)'.format(ctype, i)] = none2str(
benchmark.read_speed)
benchmark.read_speed
)
self['Benchmark {} {} Writing speed (MB/s)'.format(ctype, i)] = none2str(
benchmark.write_speed)
benchmark.write_speed
)
test_storage = get_action(component, 'TestDataStorage')
if not test_storage:
@ -339,14 +339,16 @@ class DeviceRow(OrderedDict):
self['Test {} {} Software'.format(ctype, i)] = software
self['Test {} {} Type'.format(ctype, i)] = test_storage.length.value
self['Test {} {} Result'.format(ctype, i)] = get_result(
test_storage)
self['Test {} {} Result'.format(ctype, i)] = get_result(test_storage)
self['Test {} {} Power cycle count'.format(ctype, i)] = none2str(
test_storage.power_cycle_count)
test_storage.power_cycle_count
)
self['Test {} {} Lifetime (days)'.format(ctype, i)] = none2str(
test_storage.lifetime)
test_storage.lifetime
)
self['Test {} {} Power on hours'.format(ctype, i)] = none2str(
test_storage.power_on_hours)
test_storage.power_on_hours
)
def get_graphic_card(self, ctype, i, component):
"""Particular fields for component GraphicCard."""
@ -400,38 +402,35 @@ class StockRow(OrderedDict):
def get_result(erasure):
""" For the csv is necessary simplify the message of results """
"""For the csv is necessary simplify the message of results"""
if hasattr(erasure, 'type') and erasure.type == 'DataWipe':
if erasure.document.success:
return 'Success'
return 'Failure'
type_of_results = {
Severity.Error: 'Failure',
Severity.Warning: 'Success with Warnings',
Severity.Notice: 'Success',
Severity.Info: 'Success'
}
Severity.Info: 'Success',
}
return type_of_results[erasure.severity]
def none2str(string):
""" convert none to empty str """
"""convert none to empty str"""
if string is None:
return ''
return format(string)
def get_action(component, action):
""" Filter one action from a component or return None """
"""Filter one action from a component or return None"""
result = [a for a in component.actions if a.type == action]
return result[-1] if result else None
class ActionRow(OrderedDict):
def __init__(self, allocate):
super().__init__()
# General information about allocates, deallocate and lives
@ -459,7 +458,6 @@ class ActionRow(OrderedDict):
class InternalStatsRow(OrderedDict):
def __init__(self, user, create, actions):
super().__init__()
# General information about all internal stats
@ -488,13 +486,7 @@ class InternalStatsRow(OrderedDict):
def count_actions(self):
for ac in self.actions:
self.is_snapshot(
self.is_deallocate(
self.is_live(
self.is_allocate(ac)
)
)
)
self.is_snapshot(self.is_deallocate(self.is_live(self.is_allocate(ac))))
def is_allocate(self, ac):
if ac.type == 'Allocate':
@ -531,9 +523,18 @@ class InternalStatsRow(OrderedDict):
self['Snapshot (Registers)'] += 1
def quarter(self, month):
q = {1: 'Q1', 2: 'Q1', 3: 'Q1',
4: 'Q2', 5: 'Q2', 6: 'Q2',
7: 'Q3', 8: 'Q3', 9: 'Q3',
10: 'Q4', 11: 'Q4', 12: 'Q4',
}
q = {
1: 'Q1',
2: 'Q1',
3: 'Q1',
4: 'Q2',
5: 'Q2',
6: 'Q2',
7: 'Q3',
8: 'Q3',
9: 'Q3',
10: 'Q4',
11: 'Q4',
12: 'Q4',
}
return q[int(month)]

View file

@ -0,0 +1 @@
868e59911be73a941938644143d81f21a2fdbe82ea0841493c2d9fc04701e058334af5fecd69c1a1525ebd5c8c17ac3f49d8ecc53bbfc8a018f169be48fe79d6 USODY_2022.5.2-beta.iso

View file

@ -95,7 +95,7 @@
<li>
<a class="dropdown-item d-flex align-items-center" href="{{ url_for('workbench.settings') }}">
<i class="bi bi-tools"></i>
<span>Workbench Settings</span>
<span>Workbench</span>
</a>
</li>
<li>

View file

@ -29,8 +29,10 @@
<th scope="col">Snapshot id</th>
<th scope="col">Version</th>
<th scope="col">DHID</th>
<th scope="col">System UUID</th>
<th scope="col">Status</th>
<th scope="col" data-type="date" data-format="DD-MM-YYYY">Time</th>
<th scope="col"></th>
</tr>
</thead>
<tbody>
@ -60,10 +62,20 @@
</a>
{% endif %}
</td>
<td>
{{ snap.system_uuid }}
</td>
<td>
{{ snap.status }}
</td>
<td>{{ snap.created.strftime('%H:%M %d-%m-%Y') }}</td>
<td>
{% if snap.snapshot_uuid %}
<a href="{{ url_for('inventory.export', export_id='snapshot') }}?id={{ snap.snapshot_uuid }}" target="_blank">
<i class="bi bi-box-arrow-up-right"></i>
</a>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>

View file

@ -28,6 +28,20 @@
<p class="small">Download the settings only for register devices.</p>
</div>
</div>
{% if iso %}
<div class="row pt-3">
<div class="col-5">
<a href="/static/iso/{{ iso }}" class="btn btn-primary">{{ iso }}</a>
</div>
<div class="col">
<p class="small">Download ISO workbench and burn it one one usb stick.</p>
<p class="small">
Download Checksum: <a style="color: #993365;" href="/static/iso/SHA512SUMS">SHA512SUMS</a> |
<a style="color: #993365;" href="/static/iso/SHA512SUMS.sign">Signature</a>
</p>
</div>
</div>
{% endif %}
</div>
</div>

View file

@ -1,4 +1,6 @@
import os
import time
from pathlib import Path
import flask
from flask import Blueprint
@ -22,6 +24,7 @@ class SettingsView(GenericMixin):
def dispatch_request(self):
self.get_context()
self.get_iso()
self.context.update(
{
'page_title': self.page_title,
@ -34,6 +37,19 @@ class SettingsView(GenericMixin):
return flask.render_template(self.template_name, **self.context)
def get_iso(self):
path = Path(__file__).parent.parent
files = [
f for f in os.listdir(f'{path}/static/iso/') if f[-3:].lower() == 'iso'
]
self.context['iso'] = ''
self.context['iso_sha'] = ''
if files:
self.context['iso'] = files[0]
self.context['iso_sha'] = 'aaa'
def download(self):
url = "https://{}/api/inventory/".format(app.config['HOST'])
self.wbContext = {

View file

@ -42,6 +42,7 @@ sortedcontainers==2.1.0
tqdm==4.32.2
python-decouple==3.3
python-dotenv==0.14.0
selenium==4.1.5
pyjwt==2.4.0
pint==0.9
py-dmidecode==0.1.0

36
scripts/extract_uuid.py Normal file
View file

@ -0,0 +1,36 @@
import json
import sys
def get_old_smbios_version(snapshot):
capabilities = snapshot.get('debug', {}).get('lshw', {}).get('capabilities', {})
for x in capabilities.values():
if "SMBIOS version" in x:
e = x.split("SMBIOS version ")[1].split(".")
if int(e[0]) < 3 and int(e[1]) < 6:
return True
return False
def get_uuid(snapshot):
return (
snapshot.get('debug', {}).get('lshw', {}).get('configuration', {}).get('uuid')
)
def main():
_file = sys.argv[1]
with open(_file) as file_snapshot:
snapshot = json.loads(file_snapshot.read())
if get_old_smbios_version(snapshot):
return
system_uuid = get_uuid(snapshot)
if system_uuid:
print("{};{}".format(system_uuid, snapshot['uuid']))
if __name__ == '__main__':
main()

1
scripts/extract_uuids.sh Normal file
View file

@ -0,0 +1 @@
for i in `ls ../snapshots/*/*.json`; do python examples/extract_uuid.py $i; done > system_uuids.csv

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,172 @@
{
"components": [
{
"size": 10.030411318500475,
"technology": "LCD",
"resolutionWidth": 1024,
"model": "AUO LCD Monitor",
"actions": [],
"type": "Display",
"refreshRate": 60,
"productionDate": "2009-01-04T00:00:00",
"manufacturer": "AUO \"AUO\"",
"serialNumber": null,
"resolutionHeight": 600
},
{
"generation": null,
"actions": [
{
"rate": 164.4981,
"type": "BenchmarkProcessorSysbench",
"elapsed": 165
},
{
"rate": 6650.48,
"type": "BenchmarkProcessor",
"elapsed": 0
}
],
"speed": 1.0,
"cores": 1,
"model": "Intel Atom CPU N450 @ 1.66GHz",
"address": 64,
"type": "Processor",
"threads": 2,
"manufacturer": "Intel Corp.",
"serialNumber": null,
"brand": "Atom"
},
{
"memory": null,
"model": "Atom Processor D4xx/D5xx/N4xx/N5xx Integrated Graphics Controller",
"actions": [],
"type": "GraphicCard",
"manufacturer": "Intel Corporation",
"serialNumber": null
},
{
"type": "SoundCard",
"actions": [],
"manufacturer": "Intel Corporation",
"serialNumber": null,
"model": "NM10/ICH7 Family High Definition Audio Controller"
},
{
"type": "SoundCard",
"actions": [],
"manufacturer": "XPA970VW0",
"serialNumber": null,
"model": "1.3M WebCam"
},
{
"size": 1024.0,
"actions": [],
"format": "SODIMM",
"model": "48594D503131325336344350362D53362020",
"interface": "DDR2",
"type": "RamModule",
"manufacturer": "Hynix Semiconductor",
"serialNumber": "4F43487B",
"speed": 667.0
},
{
"size": 160041.88569599998,
"variant": "1A01",
"actions": [
{
"type": "EraseBasic",
"steps": [
{
"type": "StepRandom",
"endTime": "2019-10-23T08:35:31.400587+00:00",
"severity": "Info",
"startTime": "2019-10-23T07:49:54.410830+00:00"
}
],
"endTime": "2019-10-23T08:35:31.400988+00:00",
"severity": "Info",
"startTime": "2019-10-23T07:49:54.410193+00:00"
},
{
"elapsed": 22,
"writeSpeed": 17.3,
"readSpeed": 41.6,
"type": "BenchmarkDataStorage"
},
{
"status": "Completed without error",
"reallocatedSectorCount": 0,
"currentPendingSectorCount": 0,
"assessment": true,
"severity": "Info",
"offlineUncorrectable": 0,
"lifetime": 4692,
"type": "TestDataStorage",
"length": "Short",
"elapsed": 118,
"powerCycleCount": 5293
}
],
"model": "WDC WD1600BEVT-2",
"interface": "ATA",
"type": "HardDrive",
"manufacturer": "Western Digital",
"serialNumber": "WD-WX11A80W7430"
},
{
"variant": "c1",
"actions": [],
"speed": 100.0,
"model": "AR8152 v1.1 Fast Ethernet",
"wireless": false,
"type": "NetworkAdapter",
"serialNumber": "88:ae:1d:a6:f3:d0",
"manufacturer": "Qualcomm Atheros"
},
{
"ramMaxSize": 4,
"slots": 1,
"model": "AOHAPPY",
"pcmcia": 0,
"type": "Motherboard",
"version": "V3.05(DDR2)",
"ramSlots": 2,
"serialNumber": "Base Board Serial Number",
"manufacturer": "Acer",
"serial": 1,
"actions": [],
"biosDate": "2010-08-12T00:00:00",
"firewire": 0,
"usb": 5
}
],
"software": "Workbench",
"device": {
"sku": null,
"chassis": "Netbook",
"actions": [
{
"type": "StressTest",
"elapsed": 60,
"severity": "Info"
},
{
"rate": 19.2726,
"type": "BenchmarkRamSysbench",
"elapsed": 19
}
],
"model": "AOHAPPY",
"type": "Laptop",
"version": "V3.05",
"manufacturer": "Acer",
"serialNumber": "LUSEA0D010038879A01601"
},
"uuid": "0973fda0-589a-11eb-ae93-0242ac130002",
"type": "Snapshot",
"version": "11.0b9",
"endTime": "2019-10-23T07:43:13.625104+00:00",
"elapsed": 3138,
"closed": true
}

View file

@ -0,0 +1,189 @@
{
"components": [
{
"size": 10.030411318500475,
"technology": "LCD",
"resolutionWidth": 1024,
"model": "AUO LCD Monitor",
"actions": [],
"type": "Display",
"refreshRate": 60,
"productionDate": "2009-01-04T00:00:00",
"manufacturer": "AUO \"AUO\"",
"serialNumber": null,
"resolutionHeight": 600
},
{
"generation": null,
"actions": [
{
"rate": 164.4981,
"type": "BenchmarkProcessorSysbench",
"elapsed": 165
},
{
"rate": 6650.48,
"type": "BenchmarkProcessor",
"elapsed": 0
}
],
"speed": 1.0,
"cores": 1,
"model": "Intel Atom CPU N450 @ 1.66GHz",
"address": 64,
"type": "Processor",
"threads": 2,
"manufacturer": "Intel Corp.",
"serialNumber": null,
"brand": "Atom"
},
{
"memory": null,
"model": "Atom Processor D4xx/D5xx/N4xx/N5xx Integrated Graphics Controller",
"actions": [],
"type": "GraphicCard",
"manufacturer": "Intel Corporation",
"serialNumber": null
},
{
"type": "SoundCard",
"actions": [],
"manufacturer": "Intel Corporation",
"serialNumber": null,
"model": "NM10/ICH7 Family High Definition Audio Controller"
},
{
"type": "SoundCard",
"actions": [],
"manufacturer": "XPA970VW0",
"serialNumber": null,
"model": "1.3M WebCam"
},
{
"size": 1024.0,
"actions": [],
"format": "SODIMM",
"model": "48594D503131325336344350362D53362020",
"interface": "DDR2",
"type": "RamModule",
"manufacturer": "Hynix Semiconductor",
"serialNumber": "4F43487B",
"speed": 667.0
},
{
"size": 160041.88569599998,
"variant": "1A01",
"actions": [
{
"type": "EraseBasic",
"steps": [
{
"type": "StepRandom",
"endTime": "2019-10-23T08:35:31.400587+00:00",
"severity": "Info",
"startTime": "2019-10-23T07:49:54.410830+00:00"
}
],
"endTime": "2019-10-23T08:35:31.400988+00:00",
"severity": "Info",
"startTime": "2019-10-23T07:49:54.410193+00:00"
},
{
"elapsed": 22,
"writeSpeed": 17.3,
"readSpeed": 41.6,
"type": "BenchmarkDataStorage"
},
{
"status": "Completed without error",
"reallocatedSectorCount": 0,
"currentPendingSectorCount": 0,
"assessment": true,
"severity": "Info",
"offlineUncorrectable": 0,
"lifetime": 4692,
"type": "TestDataStorage",
"length": "Short",
"elapsed": 118,
"powerCycleCount": 5293
}
],
"model": "WDC WD1600BEVT-2",
"interface": "ATA",
"type": "HardDrive",
"manufacturer": "Western Digital",
"serialNumber": "WD-WX11A80W7430"
},
{
"variant": "c1",
"actions": [],
"speed": 100.0,
"model": "AR8152 v1.1 Fast Ethernet",
"wireless": false,
"type": "NetworkAdapter",
"serialNumber": "88:ae:1d:a6:f3:d0",
"manufacturer": "Qualcomm Atheros"
},
{
"ramMaxSize": 4,
"slots": 1,
"model": "AOHAPPY",
"pcmcia": 0,
"type": "Motherboard",
"version": "V3.05(DDR2)",
"ramSlots": 2,
"serialNumber": "Base Board Serial Number",
"manufacturer": "Acer",
"serial": 1,
"actions": [],
"biosDate": "2010-08-12T00:00:00",
"firewire": 0,
"usb": 5
}
],
"software": "Workbench",
"device": {
"sku": null,
"chassis": "Netbook",
"actions": [
{
"type": "StressTest",
"elapsed": 60,
"severity": "Info"
},
{
"rate": 19.2726,
"type": "BenchmarkRamSysbench",
"elapsed": 19
}
],
"model": "AOHAPPY",
"type": "Laptop",
"version": "V3.05",
"manufacturer": "Acer",
"serialNumber": "LUSEA0D010038879A01601"
},
"debug": {
"lshw": {
"capabilities": {
"dmi-2.5": "DMI version 2.5",
"smbios-2.5": "SMBIOS version 2.5",
"smp": "Symmetric Multi-Processing",
"vsyscall32": "32-bit processes"
},
"configuration": {
"boot": "normal",
"chassis": "notebook",
"family": "Intel_Mobile",
"sku": "NetTopSku",
"uuid": "364ee69c-9c82-9cb1-2111-88ae1da6f3d0"
}
}
},
"uuid": "0973fda0-589a-11eb-ae93-0242ac130002",
"type": "Snapshot",
"version": "11.0b9",
"endTime": "2019-10-23T07:43:13.625104+00:00",
"elapsed": 3138,
"closed": true
}

View file

@ -130,7 +130,6 @@ def test_physical_properties():
'model': 'foo',
'receiver_id': None,
'serial_number': 'foo-bar',
'uuid': None,
'transfer_state': TransferState.Initial
}

View file

@ -1385,3 +1385,18 @@ def test_export_lots(user3: UserClientFlask):
assert fixture_csv[0] == export_csv[0], 'Headers are not equal'
assert fixture_csv[1][1:] == export_csv[1][1:], 'Computer information are not equal'
UUID(export_csv[1][0])
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_export_snapshot_json(user3: UserClientFlask):
file_name = 'real-eee-1001pxd.snapshot.12.json'
snap = create_device(user3, file_name)
snapshot = conftest.yaml2json(file_name.split(".json")[0])
snapshot = json.dumps(snapshot)
uri = "/inventory/export/snapshot/?id={}".format(snap.uuid)
body, status = user3.get(uri)
assert status == '200 OK'
assert body == snapshot

104
tests/test_selenium.py Normal file
View file

@ -0,0 +1,104 @@
# Generated by Selenium IDE
import time
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
class TestSelenium:
def setup_method(self, method):
options = Options()
options.add_argument("--headless")
self.driver = webdriver.Firefox(
options=options, executable_path=r'./bin/geckodriver'
)
self.vars = {}
def teardown_method(self, method):
self.driver.quit()
def test_selenium(self):
# setup
self.driver.get("http://localhost:5000/login/")
self.driver.set_window_size(1920, 1063)
# login
self.driver.find_element(By.ID, "yourEmail").click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.ID, "yourPassword").send_keys("1234")
self.driver.find_element(By.ID, "yourEmail").send_keys("user@dhub.com")
self.driver.find_element(By.CSS_SELECTOR, ".btn").click()
self.driver.implicitly_wait(3)
# select the first lot and get the ID of it
self.driver.find_element(By.LINK_TEXT, "Temporary Lots").click()
self.driver.implicitly_wait(3)
self.driver.find_element(
By.CSS_SELECTOR, "#temporal-lots-nav > li:nth-child(2) span"
).click()
self.driver.implicitly_wait(3)
lot_id = self.driver.current_url.split("/")[5]
# go to unassigned
self.driver.find_element(By.CSS_SELECTOR, ".nav-item:nth-child(5) span").click()
self.driver.implicitly_wait(3)
# select the first device
self.driver.find_element(
By.CSS_SELECTOR, "tr:nth-child(1) .deviceSelect"
).click()
self.driver.implicitly_wait(3)
# add to new selenium_lot
self.driver.find_element(By.ID, "btnLots").click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.ID, lot_id).click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.ID, "ApplyDeviceLots").click()
time.sleep(3)
element = self.driver.find_element(By.ID, "ApplyDeviceLots")
time.sleep(3)
actions = ActionChains(self.driver)
time.sleep(3)
actions.move_to_element(element).perform()
time.sleep(3)
element = self.driver.find_element(By.CSS_SELECTOR, "body")
time.sleep(3)
actions = ActionChains(self.driver)
time.sleep(3)
# actions.move_to_element(element, 0, 0).perform()
actions.move_to_element(element).perform()
time.sleep(3)
self.driver.find_element(By.ID, "SaveAllActions").click()
time.sleep(3)
# go to selenium lot
self.driver.find_element(By.LINK_TEXT, "Temporary Lots").click()
self.driver.implicitly_wait(3)
self.driver.find_element(
By.CSS_SELECTOR, "#temporal-lots-nav > li:nth-child(2) span"
).click()
self.driver.implicitly_wait(3)
# select the first device
self.driver.find_element(By.CSS_SELECTOR, ".deviceSelect").click()
# remove to new selenium_lot
self.driver.find_element(By.ID, "btnLots").click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.ID, lot_id).click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.ID, "ApplyDeviceLots").click()
time.sleep(3)
self.driver.find_element(By.ID, "SaveAllActions").click()
time.sleep(3)
self.driver.find_element(By.CSS_SELECTOR, ".nav-item:nth-child(5) span").click()
self.driver.implicitly_wait(3)
# logout
self.driver.find_element(By.CSS_SELECTOR, ".d-md-block:nth-child(2)").click()
self.driver.implicitly_wait(3)
self.driver.find_element(By.LINK_TEXT, "Sign Out").click()

View file

@ -190,10 +190,12 @@ def test_snapshot_power_on_hours(user: UserClient):
)
errors = SnapshotsLog.query.filter().all()
snap_log = errors[0]
assert str(snap_log.snapshot.uuid) == snap['uuid']
assert len(errors) == 1
assert errors[0].description == 'Ok'
snap_log = errors[1]
assert len(errors) == 2
assert str(errors[0].snapshot_uuid) == snap['uuid']
assert str(errors[1].snapshot.uuid) == snap['uuid']
assert errors[0].description == 'There is not uuid'
assert errors[1].description == 'Ok'
@pytest.mark.mvp
@ -784,11 +786,13 @@ def test_backup_snapshot_with_errors(app: Devicehub, user: UserClient):
response = user.post(res=Snapshot, data=json_encode(snapshot_no_hid))
errors = SnapshotsLog.query.filter().all()
snap_log = errors[0]
snap_log = errors[1]
assert snap_log.description == "'BenchmarkProcessorr'"
assert errors[0].description == 'There is not uuid'
assert snap_log.version == "11.0b9"
assert str(snap_log.snapshot_uuid) == '9a3e7485-fdd0-47ce-bcc7-65c55226b598'
assert len(errors) == 1
assert str(errors[0].snapshot_uuid) == '9a3e7485-fdd0-47ce-bcc7-65c55226b598'
assert len(errors) == 2
files = [x for x in os.listdir(path_dir_base) if uuid in x]
if files:

610
tests/test_system_uuid.py Normal file
View file

@ -0,0 +1,610 @@
import json
from io import BytesIO
import pytest
from flask_wtf.csrf import generate_csrf
from ereuse_devicehub.client import UserClient, UserClientFlask
from ereuse_devicehub.resources.action.models import Snapshot
from ereuse_devicehub.resources.device.models import Computer
from tests import conftest
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
file_name = 'system_uuid1.json'
user3.get(uri)
snapshot = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert device.hid == 'laptop-toshiba-satellite_l655-2b335208q-00:26:6c:ae:ee:78'
assert str(device.system_uuid) == 'f0dc6a7f-c23f-e011-b5d0-00266caeee78'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_api(user: UserClient):
file_name = 'system_uuid1.json'
snapshot_11 = conftest.file_json(file_name)
user.post(snapshot_11, res=Snapshot)
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert device.hid == 'laptop-toshiba-satellite_l655-2b335208q-00:26:6c:ae:ee:78'
assert str(device.system_uuid) == 'f0dc6a7f-c23f-e011-b5d0-00266caeee78'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wbLite_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
file_name = 'system_uuid2.json'
user3.get(uri)
snapshot = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wbLite_api(user: UserClient):
snapshot_lite = conftest.file_json('system_uuid2.json')
user.post(snapshot_lite, uri="/api/inventory/")
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wb11_with_uuid_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid3.json')
user.post(snapshot_11, res=Snapshot)
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert Computer.query.count() == 1
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
user.post(snapshot_11, res=Snapshot)
assert (
snapshot_11['debug']['lshw']['configuration']['uuid']
== '364ee69c-9c82-9cb1-2111-88ae1da6f3d0'
)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_with_uuid_to_wb11_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid3.json')
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot_11 = conftest.file_json('system_uuid3.json')
assert 'debug' not in snapshot_11
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_with_uuid_to_wb11_without_hid_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid3.json')
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot_11 = conftest.file_json('system_uuid3.json')
components = [x for x in snapshot_11['components'] if x['type'] != 'NetworkAdapter']
snapshot_11['components'] = components
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wb11_with_uuid_form(user3: UserClientFlask):
# insert computer with wb11 with hid and without uuid, (old version)
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid3.json'
snapshot = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
db_snapthot = Snapshot.query.one()
device = db_snapthot.device
assert Computer.query.count() == 1
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_with_uuid_to_wb11_form(user3: UserClientFlask):
# insert computer with wb11 with hid and without uuid, (old version)
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid3.json'
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot = conftest.file_json(file_name)
snapshot['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot = conftest.file_json('system_uuid3.json')
assert 'debug' not in snapshot
b_snapshot = bytes(json.dumps(snapshot), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_with_uuid_to_wb11_without_hid_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_lite = conftest.file_json('system_uuid2.json')
file_name = 'system_uuid3.json'
snapshot_11 = conftest.file_json(file_name)
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
# insert the same computer with wb11 with hid and with uuid, (new version)
snapshot_11 = conftest.file_json('system_uuid3.json')
components = [x for x in snapshot_11['components'] if x['type'] != 'NetworkAdapter']
snapshot_11['components'] = components
snapshot_11['debug'] = {'lshw': snapshot_lite['data']['lshw']}
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wblite_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid3.json')
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
snapshot_lite = conftest.file_json('system_uuid2.json')
user.post(snapshot_lite, uri="/api/inventory/")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_to_wb11_api(user: UserClient):
snapshot_lite = conftest.file_json('system_uuid2.json')
user.post(snapshot_lite, uri="/api/inventory/")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
snapshot_11 = conftest.file_json('system_uuid3.json')
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wblite_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid3.json'
snapshot_11 = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
file_name = 'system_uuid2.json'
snapshot_lite = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_lite), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_to_wb11_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid2.json'
snapshot_lite = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_lite), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
file_name = 'system_uuid3.json'
snapshot_11 = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_to_wblite_api(user: UserClient):
snapshot_lite = conftest.file_json('system_uuid2.json')
user.post(snapshot_lite, uri="/api/inventory/")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot_lite['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
user.post(snapshot_lite, uri="/api/inventory/")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_to_wblite_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid2.json'
snapshot_lite = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_lite), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
file_name = 'system_uuid2.json'
snapshot_lite = conftest.file_json(file_name)
snapshot_lite['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
b_snapshot = bytes(json.dumps(snapshot_lite), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wb11_duplicity_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid3.json')
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
snapshot_11 = conftest.file_json('system_uuid3.json')
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
components = [x for x in snapshot_11['components'] if x['type'] != 'NetworkAdapter']
snapshot_11['components'] = components
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 2
for c in Computer.query.all():
assert 'laptop-acer-aohappy-lusea0d010038879a01601' in c.hid
assert c.system_uuid is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_to_wb11_duplicity_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid3.json'
snapshot_11 = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
snapshot_11 = conftest.file_json('system_uuid3.json')
snapshot_11['uuid'] = '0973fda0-589a-11eb-ae93-0242ac130003'
components = [x for x in snapshot_11['components'] if x['type'] != 'NetworkAdapter']
snapshot_11['components'] = components
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 2
for c in Computer.query.all():
assert 'laptop-acer-aohappy-lusea0d010038879a01601' in c.hid
assert c.system_uuid is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_smbios_2_5_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_11 = conftest.file_json('system_uuid4.json')
user.post(snapshot_11, res=Snapshot)
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wb11_smbios_2_5_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid4.json'
snapshot_11 = conftest.file_json(file_name)
b_snapshot = bytes(json.dumps(snapshot_11), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert device.system_uuid is None
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_smbios_2_5_api(user: UserClient):
# insert computer with wb11 with hid and without uuid, (old version)
snapshot_lite = conftest.file_json('system_uuid2.json')
snapshot_lite['data']['lshw']['capabilities']['smbios-3.0'] = 'SMBIOS version 2.5'
user.post(snapshot_lite, uri="/api/inventory/")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_wblite_smbios_2_5_form(user3: UserClientFlask):
uri = '/inventory/upload-snapshot/'
user3.get(uri)
file_name = 'system_uuid2.json'
snapshot_lite = conftest.file_json(file_name)
snapshot_lite['data']['lshw']['capabilities']['smbios-3.0'] = 'SMBIOS version 2.5'
b_snapshot = bytes(json.dumps(snapshot_lite), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
assert Computer.query.count() == 1
device = Computer.query.one()
assert device.hid == 'laptop-acer-aohappy-lusea0d010038879a01601-88:ae:1d:a6:f3:d0'
assert str(device.system_uuid) == '9ce64e36-829c-b19c-2111-88ae1da6f3d0'