resolve conflicts

This commit is contained in:
Cayo Puigdefabregas 2021-04-16 18:16:07 +02:00
commit 149255bcfe
23 changed files with 158 additions and 91 deletions

View File

@ -14,6 +14,7 @@ ml).
## [1.0.5-beta]
- [addend] #124 adding endpoint for extract the internal stats of use
- [addend] #122 system for verify all documents that it's produced from devicehub
- [addend] #131 add one code for every device
## [1.0.4-beta]
- [addend] #95 adding endpoint for check the hash of one report

View File

@ -97,6 +97,7 @@ class Dummy:
s, _ = user1.post(res=m.Snapshot, data=snapshot)
if s.get('uuid', None) == 'ec23c11b-80b6-42cd-ac5c-73ba7acddbc4':
sample_pc = s['device']['id']
sample_pc_devicehub_id = s['device']['devicehubID']
else:
pcs.add(s['device']['id'])
if s.get('uuid', None) == 'de4f495e-c58b-40e1-a33e-46ab5e84767e': # oreo
@ -182,9 +183,9 @@ class Dummy:
# res=m.Action)
# todo Receive
user1.get(res=Device, item=sample_pc) # Test
user1.get(res=Device, item=sample_pc_devicehub_id) # Test
anonymous = self.app.test_client()
html, _ = anonymous.get(res=Device, item=sample_pc, accept=ANY)
html, _ = anonymous.get(res=Device, item=sample_pc_devicehub_id, accept=ANY)
assert 'intel core2 duo cpu' in html
# For netbook: to preapre -> torepair -> to dispose -> disposed

View File

@ -0,0 +1,49 @@
"""add code to device
Revision ID: 8cb91ad1cc40
Revises: 6a2a939d5668
Create Date: 2021-03-03 10:39:19.331027
"""
import citext
import sqlalchemy as sa
from alembic import op
from alembic import context
from ereuse_devicehub.resources.utils import hashcode
# revision identifiers, used by Alembic.
revision = '8cb91ad1cc40'
down_revision = '6a2a939d5668'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade_data():
con = op.get_bind()
devices = con.execute(f"select id from {get_inv()}.device")
for d in devices:
id_dev = d.id
code = hashcode.encode(d.id)
sql = f"update {get_inv()}.device set devicehub_id='{code}' where id={id_dev};"
con.execute(sql)
def upgrade():
op.add_column('device', sa.Column('devicehub_id', citext.CIText(),
unique=True,
nullable=True), schema=f'{get_inv()}')
upgrade_data()
def downgrade():
op.drop_column('device', 'devicehub_id', schema=f'{get_inv()}')

View File

@ -10,7 +10,7 @@ from ereuse_devicehub.resources.device.views import DeviceView, DeviceMergeView,
class DeviceDef(Resource):
SCHEMA = schemas.Device
VIEW = DeviceView
ID_CONVERTER = Converters.int
ID_CONVERTER = Converters.string
AUTH = False # We manage this at each view
def __init__(self, app,

View File

@ -8,6 +8,7 @@ from typing import Dict, List, Set
from boltons import urlutils
from citext import CIText
from flask_sqlalchemy import event
from ereuse_utils.naming import HID_CONVERSION_DOC, Naming
from flask import g
from more_itertools import unique_everseen
@ -27,12 +28,19 @@ from teal.marshmallow import ValidationError
from teal.resource import url_for_resource
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.utils import hashcode
from ereuse_devicehub.resources.enums import BatteryTechnology, CameraFacing, ComputerChassis, \
DataStorageInterface, DisplayTech, PrinterTechnology, RamFormat, RamInterface, Severity, TransferState
from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing, listener_reset_field_updated_in_actual_time
from ereuse_devicehub.resources.user.models import User
def create_code(context):
_id = Device.query.order_by(Device.id.desc()).first() or 1
if not _id == 1:
_id = _id.id + 1
return hashcode.encode(_id)
class Device(Thing):
"""Base class for any type of physical object that can be identified.
@ -115,6 +123,8 @@ class Device(Thing):
owner = db.relationship(User, primaryjoin=owner_id == User.id)
allocated = db.Column(Boolean, default=False)
allocated.comment = "device is allocated or not."
devicehub_id = db.Column(db.CIText(), nullable=True, unique=True, default=create_code)
devicehub_id.comment = "device have a unique code."
_NON_PHYSICAL_PROPS = {
'id',
@ -137,7 +147,8 @@ class Device(Thing):
'version',
'sku',
'image',
'allocated'
'allocated',
'devicehub_id'
}
__table_args__ = (
@ -225,7 +236,7 @@ class Device(Thing):
@property
def url(self) -> urlutils.URL:
"""The URL where to GET this device."""
return urlutils.URL(url_for_resource(Device, item_id=self.id))
return urlutils.URL(url_for_resource(Device, item_id=self.devicehub_id))
@property
def rate(self):
@ -363,7 +374,7 @@ class Device(Thing):
if act.type == 'Allocate':
allo = {'type': 'Allocate',
'systemId': self.id,
'devicehubID': self.devicehub_id,
'finalUserCode': act.final_user_code,
'numEndUsers': act.end_users,
'hid': self.hid,
@ -384,7 +395,7 @@ class Device(Thing):
if act.type == 'Deallocate':
deallo = {'type': 'Deallocate',
'systemId': self.id,
'devicehubID': self.devicehub_id,
'finalUserCode': '',
'numEndUsers': '',
'hid': self.hid,
@ -1023,3 +1034,4 @@ class Manufacturer(db.Model):
listener_reset_field_updated_in_actual_time(Device)

View File

@ -66,6 +66,8 @@ class Device(Thing):
sku = SanitizedStr(description=m.Device.sku.comment)
image = URL(description=m.Device.image.comment)
allocated = Boolean(description=m.Device.allocated.comment)
devicehub_id = SanitizedStr(data_key='devicehubID',
description=m.Device.devicehub_id.comment)
@pre_load
def from_actions_to_actions_one(self, data: dict):

View File

@ -118,7 +118,7 @@ class DeviceView(View):
return Response(status=204)
raise ValueError('Cannot patch a non computer')
def one(self, id: int):
def one(self, id: str):
"""Gets one device."""
if not request.authorization:
return self.one_public(id)
@ -126,12 +126,12 @@ class DeviceView(View):
return self.one_private(id)
def one_public(self, id: int):
device = Device.query.filter_by(id=id).one()
device = Device.query.filter_by(devicehub_id=id).one()
return render_template('devices/layout.html', device=device, states=states)
@auth.Auth.requires_auth
def one_private(self, id: int):
device = Device.query.filter_by(id=id, owner_id=g.user.id).first()
def one_private(self, id: str):
device = Device.query.filter_by(devicehub_id=id, owner_id=g.user.id).first()
if not device:
return self.one_public(id)
return self.schema.jsonify(device)

View File

@ -42,10 +42,10 @@ class DeviceRow(OrderedDict):
software = "{software} {version}".format(
software=snapshot.software.name, version=snapshot.version)
# General information about device
self['System ID'] = device.id
self['DevicehubID'] = device.devicehub_id
self['DocumentID'] = self.document_id
self['Public Link'] = '{url}{id}'.format(url=url_for('Device.main', _external=True),
id=device.id)
id=device.devicehub_id)
self['Tag 1 Type'] = self['Tag 1 ID'] = self['Tag 1 Organization'] = ''
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
@ -369,7 +369,7 @@ class ActionRow(OrderedDict):
def __init__(self, allocate):
super().__init__()
# General information about allocates, deallocate and lives
self['SystemId'] = allocate['systemId']
self['DevicehubID'] = allocate['devicehubID']
self['Hid'] = allocate['hid']
self['Start'] = allocate['start']
self['FinalUserCode'] = allocate['finalUserCode']

View File

@ -105,13 +105,10 @@ class DocumentView(DeviceView):
url_pdf = boltons.urlutils.URL(flask.request.url)
url_pdf.query_params['format'] = 'PDF'
url_web = boltons.urlutils.URL(flask.request.url)
url_web.query_params['format'] = 'HTML'
params = {
'title': 'Erasure Certificate',
'erasures': tuple(erasures()),
'url_pdf': url_pdf.to_text(),
'url_web': url_web.to_text()
'url_pdf': url_pdf.to_text()
}
return flask.render_template('documents/erasure.html', **params)

View File

@ -42,13 +42,13 @@
<dt>Computer where was erase:</dt>
<dd>Title: {{ erasure.parent.__format__('ts') }}</dd>
<dd>SystemId: {{ erasure.parent.id }}</dd>
<dd>DevicehubID: {{ erasure.parent.devicehub_id }}</dd>
<dd>Hid: {{ erasure.parent.hid }}</dd>
<dd>Tags: {{ erasure.parent.tags }}</dd>
<dt>Computer where it resides:</dt>
<dd>Title: {{ erasure.device.parent.__format__('ts') }}</dd>
<dd>SystemId: {{ erasure.device.parent.id }}</dd>
<dd>DevicehubID: {{ erasure.device.parent.devicehub_id }}</dd>
<dd>Hid: {{ erasure.device.parent.hid }}</dd>
<dd>Tags: {{ erasure.device.parent.tags }}</dd>
@ -87,6 +87,6 @@
<a href="{{ url_pdf }}">Click here to download the PDF.</a>
</div>
<div class="print-only">
<a href="{{ url_web }}">Verify on-line the integrity of this document</a>
<a href="{{ url_for('Document.StampsView', _external=True) }}">Verify on-line the integrity of this document</a>
</div>
{% endblock %}

View File

@ -35,7 +35,6 @@ class TagDef(Resource):
)
super().__init__(app, import_name, static_folder, static_url_path, template_folder,
url_prefix, subdomain, url_defaults, root_path, cli_commands)
_get_device_from_tag = app.auth.requires_auth(get_device_from_tag)
# DeviceTagView URLs
device_view = TagDeviceView.as_view('tag-device-view', definition=self, auth=app.auth)

View File

@ -18,7 +18,7 @@ class TagView(View):
tag = Tag.query.filter_by(internal_id=internal_id).one() # type: Tag
if not tag.device:
raise TagNotLinked(tag.id)
return redirect(location=url_for_resource(Device, tag.device.id))
return redirect(location=url_for_resource(Device, tag.device.devicehub_id))
@auth.Auth.requires_auth
def post(self):
@ -82,7 +82,9 @@ class TagDeviceView(View):
tag = Tag.from_an_id(id).one() # type: Tag
if not tag.device:
raise TagNotLinked(tag.id)
return redirect(location=url_for_resource(Device, tag.device.id))
if not request.authorization:
return redirect(location=url_for_resource(Device, tag.device.devicehub_id))
return app.resources[Device.t].schema.jsonify(tag.device.devicehub_id)
@auth.Auth.requires_auth
def one_authorization(self, id):
@ -91,11 +93,10 @@ class TagDeviceView(View):
raise TagNotLinked(tag.id)
return app.resources[Device.t].schema.jsonify(tag.device)
# noinspection PyMethodOverriding
@auth.Auth.requires_auth
def put(self, tag_id: str, device_id: str):
def put(self, tag_id: str, device_id: int):
"""Links an existing tag with a device."""
# tag = Tag.from_an_id(tag_id).one() # type: Tag
device_id = int(device_id)
tag = Tag.from_an_id(tag_id).filter_by(owner=g.user).one() # type: Tag
if tag.device_id:
if tag.device_id == device_id:
@ -137,7 +138,7 @@ def get_device_from_tag(id: str):
# todo this could be more efficient by Device.query... join with tag
device = Tag.query.filter_by(id=id).one().device
if not request.authorization:
return redirect(location=url_for_resource(Device, device.id))
return redirect(location=url_for_resource(Device, device.devicehub_id))
if device is None:
raise TagNotLinked(id)
return app.resources[Device.t].schema.jsonify(device)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -242,7 +242,7 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading]
action = {'type': action_model.t, 'devices': [snapshot['device']['id']]}
action, _ = user.post(action, res=models.Action)
assert action['devices'][0]['id'] == snapshot['device']['id']
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
assert device['actions'][-1]['id'] == action['id']
assert device['physical'] == state.name
# Check if the update of device is changed
@ -625,6 +625,7 @@ def test_allocate(user: UserClient):
""" Tests allocate """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
devicehub_id = snapshot['device']['devicehubID']
post_request = {"transaction": "ccc",
"finalUserCode": "aabbcc",
"name": "John",
@ -638,7 +639,7 @@ def test_allocate(user: UserClient):
allocate, _ = user.post(res=models.Allocate, data=post_request)
# Normal allocate
device, _ = user.get(res=Device, item=device_id)
device, _ = user.get(res=Device, item=devicehub_id)
assert device['allocated'] == True
action = [a for a in device['actions'] if a['type'] == 'Allocate'][0]
assert action['transaction'] == allocate['transaction']
@ -691,6 +692,7 @@ def test_deallocate(user: UserClient):
""" Tests deallocate """
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
device_id = snapshot['device']['id']
devicehub_id = snapshot['device']['devicehubID']
post_deallocate = {"startTime": "2020-11-01T02:00:00+00:00",
"transaction": "ccc",
"devices": [device_id]
@ -705,7 +707,7 @@ def test_deallocate(user: UserClient):
}
user.post(res=models.Allocate, data=post_allocate)
device, _ = user.get(res=Device, item=device_id)
device, _ = user.get(res=Device, item=devicehub_id)
assert device['allocated'] == True
deallocate, _ = user.post(res=models.Deallocate, data=post_deallocate)
assert deallocate['startTime'] == post_deallocate['startTime']
@ -763,7 +765,7 @@ def test_trade(action_model_state: Tuple[Type[models.Action], states.Trading], u
action['invoiceNumber'] = 'ABC'
action, _ = user.post(action, res=models.Action)
assert action['devices'][0]['id'] == snapshot['device']['id']
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
assert device['actions'][-1]['id'] == action['id']
assert device['trading'] == state.name
@ -786,7 +788,7 @@ def test_price_custom():
assert p['price'] == 25.25
assert p['currency'] == Currency.EUR.name == 'EUR'
c, _ = client.get(res=Device, item=computer.id)
c, _ = client.get(res=Device, item=computer.devicehub_id)
assert c['price']['id'] == p['id']
@ -804,7 +806,7 @@ def test_price_custom_client(user: UserClient):
assert 25 == price['price']
assert Currency.EUR.name == price['currency']
device, _ = user.get(res=Device, item=price['device']['id'])
device, _ = user.get(res=Device, item=price['device']['devicehubID'])
assert 25 == device['price']['price']

View File

@ -416,7 +416,7 @@ def test_get_device(user: UserClient):
agent=Person(name='Timmy'),
author=User(email='bar@bar.com')))
db.session.commit()
pc_api, _ = user.get(res=d.Device, item=pc.id)
pc_api, _ = user.get(res=d.Device, item=pc.devicehub_id)
assert len(pc_api['actions']) == 1
assert pc_api['actions'][0]['type'] == 'TestConnectivity'
assert pc_api['actions'][0]['device'] == pc.id
@ -474,14 +474,14 @@ def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserCli
"""Checks GETting a d.Desktop with its components."""
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
pc, res = user.get(res=d.Device, item=s['device']['id'])
pc, res = user.get(res=d.Device, item=s['device']['devicehubID'])
assert res.status_code == 200
assert len(pc['actions']) == 9
html, _ = client.get(res=d.Device, item=s['device']['id'], accept=ANY)
html, _ = client.get(res=d.Device, item=s['device']['devicehubID'], accept=ANY)
assert 'intel atom cpu n270 @ 1.60ghz' in html
assert '00:24:8C:7F:CF:2D 100 Mbps' in html
pc2, res2 = user2.get(res=d.Device, item=s['device']['id'], accept=ANY)
pc2, res2 = user2.get(res=d.Device, item=s['device']['devicehubID'], accept=ANY)
assert res2.status_code == 200
assert pc2 == html
@ -556,7 +556,7 @@ def test_device_properties_format(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_device_public(user: UserClient, client: Client):
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
html, _ = client.get(res=d.Device, item=s['device']['id'], accept=ANY)
html, _ = client.get(res=d.Device, item=s['device']['devicehubID'], accept=ANY)
assert 'intel atom cpu n270 @ 1.60ghz' in html
assert '00:24:8C:7F:CF:2D 100 Mbps' in html
@ -615,8 +615,8 @@ def test_cooking_mixer_api(user: UserClient):
def test_hid_with_mac(app: Devicehub, user: UserClient):
"""Checks hid with mac."""
snapshot = file('asus-eee-1000h.snapshot.11')
user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=3)
snap, _ = user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=snap['device']['devicehubID'])
assert pc['hid'] == 'laptop-asustek_computer_inc-1000h-94oaaq021116-00:24:8c:7f:cf:2d'
@ -626,7 +626,7 @@ def test_hid_without_mac(app: Devicehub, user: UserClient):
snapshot = file('asus-eee-1000h.snapshot.11')
snapshot['components'] = [c for c in snapshot['components'] if c['type'] != 'NetworkAdapter']
snap, _ = user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=snap['device']['id'])
pc, _ = user.get(res=d.Device, item=snap['device']['devicehubID'])
assert pc['hid'] == 'laptop-asustek_computer_inc-1000h-94oaaq021116'
@ -637,7 +637,7 @@ def test_hid_with_mac_none(app: Devicehub, user: UserClient):
network = [c for c in snapshot['components'] if c['type'] == 'NetworkAdapter'][0]
network['serialNumber'] = None
snap, _ = user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=snap['device']['id'])
pc, _ = user.get(res=d.Device, item=snap['device']['devicehubID'])
assert pc['hid'] == 'laptop-asustek_computer_inc-1000h-94oaaq021116'
@ -666,7 +666,7 @@ def test_hid_with_2network_and_drop_no_mac_in_hid(app: Devicehub, user: UserClie
snapshot['components'].append(network2)
network['serialNumber'] = 'a0:24:8c:7f:cf:2d'
snap, _ = user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=snap['device']['id'])
pc, _ = user.get(res=d.Device, item=snap['device']['devicehubID'])
assert pc['hid'] == 'laptop-asustek_computer_inc-1000h-94oaaq021116-00:24:8c:7f:cf:2d'
snapshot['uuid'] = 'd1b70cb8-8929-4f36-99b7-fe052cec0abb'
@ -689,7 +689,7 @@ def test_hid_with_2network_and_drop_mac_in_hid(app: Devicehub, user: UserClient)
snapshot['components'].append(network2)
network['serialNumber'] = 'a0:24:8c:7f:cf:2d'
snap, _ = user.post(snapshot, res=m.Snapshot)
pc, _ = user.get(res=d.Device, item=snap['device']['id'])
pc, _ = user.get(res=d.Device, item=snap['device']['devicehubID'])
assert pc['hid'] == 'laptop-asustek_computer_inc-1000h-94oaaq021116-00:24:8c:7f:cf:2d'
# we drop the network card then is used for to build the hid

View File

@ -179,7 +179,7 @@ def test_device_query(user: UserClient):
snapshot, _ = user.post(conftest.file('basic.snapshot'), res=Snapshot)
i, _ = user.get(res=Device)
assert i['url'] == '/devices/'
assert i['items'][0]['url'] == '/devices/{}'.format(snapshot['device']['id'])
assert i['items'][0]['url'] == '/devices/%s' % snapshot['device']['devicehubID']
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
assert len(pc['actions']) == 4
assert len(pc['components']) == 3

View File

@ -177,7 +177,7 @@ def test_live_export_csv2(user: UserClient, client: Client, app: Devicehub):
assert "4692" in csv_user
assert "8692" in csv_user
assert "SystemId" in csv_user
assert "DevicehubID" in csv_user
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)

View File

@ -365,13 +365,14 @@ def test_lot_post_add_remove_device_view(app: Devicehub, user: UserClient):
db.session.add(device)
db.session.commit()
device_id = device.id
devicehub_id = device.devicehub_id
parent, _ = user.post(({'name': 'lot'}), res=Lot)
lot, _ = user.post({},
res=Lot,
item='{}/devices'.format(parent['id']),
query=[('id', device_id)])
assert lot['devices'][0]['id'] == device_id, 'Lot contains device'
device, _ = user.get(res=Device, item=device_id)
device, _ = user.get(res=Device, item=devicehub_id)
assert len(device['lots']) == 1
assert device['lots'][0]['id'] == lot['id'], 'Device is inside lot'

View File

@ -39,7 +39,7 @@ def test_rate_with_multiple_visual_tests(user: UserClient):
"""
s = file('real-hp.snapshot.11')
snapshot, _ = user.post(s, res=Snapshot)
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
visual_test = next(e for e in reversed(device['actions']) if e['type'] == VisualTest.t)
assert visual_test['appearanceRange'] == 'B'
@ -53,7 +53,7 @@ def test_rate_with_multiple_visual_tests(user: UserClient):
'appearanceRange': 'A',
'functionalityRange': 'A'
}, res=Action)
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
visual_test = next(e for e in reversed(device['actions']) if e['type'] == VisualTest.t)
assert visual_test['appearanceRange'] == 'A'

View File

@ -64,7 +64,7 @@ def test_snapshot_model():
assert m.Desktop.query.one_or_none() is None
assert m.Device.query.one_or_none() is None
# Check properties
assert device.url == urlutils.URL('http://localhost/devices/3')
assert device.url == urlutils.URL('http://localhost/devices/%s' % device.devicehub_id)
@pytest.mark.mvp
@ -93,7 +93,7 @@ def test_snapshot_post(user: UserClient):
assert snapshot['author']['id'] == user.user['id']
assert 'actions' not in snapshot['device']
assert 'author' not in snapshot['device']
device, _ = user.get(res=m.Device, item=snapshot['device']['id'])
device, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
key = itemgetter('serialNumber')
snapshot['components'].sort(key=key)
device['components'].sort(key=key)
@ -116,7 +116,8 @@ def test_same_device_tow_users(user: UserClient, user2: UserClient):
i, _ = user.get(res=m.Device)
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
pc_id = pc['id']
assert i['items'][0]['url'] == f'/devices/{pc_id}'
devicehub_id = pc['devicehubID']
assert i['items'][0]['url'] == f'/devices/{devicehub_id}'
basic_snapshot = file('basic.snapshot')
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
@ -141,8 +142,8 @@ def test_snapshot_update_timefield_updated(user: UserClient):
computer2 = file('2-second-device-with-components-of-first.snapshot')
snapshot_and_check(user, computer2, action_types=('Remove', 'RateComputer'),
perform_second_snapshot=False)
pc1_id = snapshot['device']['id']
pc1, _ = user.get(res=m.Device, item=pc1_id)
pc1_devicehub_id = snapshot['device']['devicehubID']
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
assert pc1['updated'] != snapshot['device']['updated']
@ -171,7 +172,8 @@ def test_snapshot_component_add_remove(user: UserClient):
RateComputer.t),
perform_second_snapshot=False)
pc1_id = snapshot1['device']['id']
pc1, _ = user.get(res=m.Device, item=pc1_id)
pc1_devicehub_id = snapshot1['device']['devicehubID']
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
update1_pc1 = pc1['updated']
# Parent contains components
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c2s', 'p1c3s')
@ -181,7 +183,7 @@ def test_snapshot_component_add_remove(user: UserClient):
assert len(pc1['actions']) == 3
assert pc1['actions'][1]['type'] == Snapshot.t
# p1c1s has Snapshot
p1c1s, _ = user.get(res=m.Device, item=pc1['components'][0]['id'])
p1c1s, _ = user.get(res=m.Device, item=pc1['components'][0]['devicehubID'])
assert tuple(e['type'] for e in p1c1s['actions']) == ('Snapshot', 'RateComputer')
# We register a new device
@ -193,8 +195,9 @@ def test_snapshot_component_add_remove(user: UserClient):
snapshot2 = snapshot_and_check(user, s2, action_types=('Remove', 'RateComputer'),
perform_second_snapshot=False)
pc2_id = snapshot2['device']['id']
pc1, _ = user.get(res=m.Device, item=pc1_id)
pc2, _ = user.get(res=m.Device, item=pc2_id)
pc2_devicehub_id = snapshot2['device']['devicehubID']
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated
update1_pc2 = pc2['updated']
update2_pc1 = pc1['updated']
@ -208,7 +211,7 @@ def test_snapshot_component_add_remove(user: UserClient):
assert all(c['parent'] == pc2_id for c in pc2['components'])
assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot', 'RateComputer')
# p1c2s has two Snapshots, a Remove and an Add
p1c2s, _ = user.get(res=m.Device, item=pc2['components'][0]['id'])
p1c2s, _ = user.get(res=m.Device, item=pc2['components'][0]['devicehubID'])
assert tuple(e['type'] for e in p1c2s['actions']) == (
'BenchmarkProcessor', 'Snapshot', 'RateComputer', 'Snapshot', 'Remove', 'RateComputer'
)
@ -219,8 +222,8 @@ def test_snapshot_component_add_remove(user: UserClient):
# PC 0: p1c2s, p1c3s. PC 1: p2c1s
s3 = file('3-first-device-but-removing-motherboard-and-adding-processor-from-2.snapshot')
snapshot_and_check(user, s3, ('Remove', 'RateComputer'), perform_second_snapshot=False)
pc1, _ = user.get(res=m.Device, item=pc1_id)
pc2, _ = user.get(res=m.Device, item=pc2_id)
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated
update2_pc2 = pc2['updated']
update3_pc1 = pc1['updated']
@ -248,7 +251,7 @@ def test_snapshot_component_add_remove(user: UserClient):
'Remove' # the processor we added in 2.
)
# p1c2s has Snapshot, Remove and Add
p1c2s, _ = user.get(res=m.Device, item=pc1['components'][0]['id'])
p1c2s, _ = user.get(res=m.Device, item=pc1['components'][0]['devicehubID'])
assert tuple(get_actions_info(p1c2s['actions'])) == (
('BenchmarkProcessor', []), # first BenchmarkProcessor
('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s']), # First Snapshot to PC1
@ -265,8 +268,8 @@ def test_snapshot_component_add_remove(user: UserClient):
# adding a graphic card and adding a new component
s4 = file('4-first-device-but-removing-processor.snapshot-and-adding-graphic-card')
snapshot4 = snapshot_and_check(user, s4, ('RateComputer',), perform_second_snapshot=False)
pc1, _ = user.get(res=m.Device, item=pc1_id)
pc2, _ = user.get(res=m.Device, item=pc2_id)
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated
update3_pc2 = pc2['updated']
update4_pc1 = pc1['updated']
@ -430,7 +433,7 @@ def test_erase_privacy_standards_endtime_sort(user: UserClient):
# The actual test
storage = next(e for e in snapshot['components'] if e['type'] == SolidStateDrive.t)
storage, _ = user.get(res=m.Device, item=storage['id']) # Let's get storage actions too
storage, _ = user.get(res=m.Device, item=storage['devicehubID']) # Let's get storage actions too
# order: endTime ascending
# erasure1/2 have an user defined time and others actions endTime = created
erasure1, erasure2, benchmark_hdd1, _snapshot1, _, _, benchmark_hdd2, _snapshot2 = storage['actions'][:8]
@ -456,17 +459,17 @@ def test_erase_privacy_standards_endtime_sort(user: UserClient):
assert 'num' not in step2
assert ['HMG_IS5'] == erasure['standards']
assert storage['privacy']['type'] == 'EraseSectors'
pc, _ = user.get(res=m.Device, item=snapshot['device']['id'])
pc, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
assert pc['privacy'] == [storage['privacy']]
# Let's try a second erasure with an error
s['uuid'] = uuid4()
s['components'][0]['actions'][0]['severity'] = 'Error'
snapshot, _ = user.post(s, res=Snapshot)
storage, _ = user.get(res=m.Device, item=storage['id'])
storage, _ = user.get(res=m.Device, item=storage['devicehubID'])
assert storage['hid'] == 'solidstatedrive-c1mr-c1ml-c1s'
assert storage['privacy']['type'] == 'EraseSectors'
pc, _ = user.get(res=m.Device, item=snapshot['device']['id'])
pc, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
assert pc['privacy'] == [storage['privacy']]
@ -549,7 +552,7 @@ def snapshot_and_check(user: UserClient,
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_erase_changing_hdd_between_pcs(user: UserClient):
"""Tests when we erase one device and next change the disk in other device we
"""Tests when we erase one device and next change the disk in other device we
want see in the second device the disks erase."""
s1 = file('erase-sectors-2-hdd.snapshot')
s2 = file('erase-sectors-2-hdd.snapshot2')
@ -616,7 +619,7 @@ def test_save_snapshot_in_file(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_action_no_snapshot_without_save_file(app: Devicehub, user: UserClient):
""" This test check if the function save_snapshot_in_file not work when we
""" This test check if the function save_snapshot_in_file not work when we
send one other action different to snapshot
"""
s = file('laptop-hp_255_g3_notebook-hewlett-packard-cnd52270fw.snapshot')
@ -737,7 +740,7 @@ def test_snapshot_not_failed_null_chassis(app: Devicehub, user: UserClient):
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_error = file('desktop-9644w8n-lenovo-0169622.snapshot')
snapshot_error['device']['chassis'] = None
snapshot_error['device']['chassis'] = None
uuid = snapshot_error['uuid']
snapshot, res = user.post(res=Snapshot, data=snapshot_error)
@ -781,7 +784,7 @@ def test_snapshot_failed_end_time_bug(app: Devicehub, user: UserClient):
"""
snapshot_file = file('asus-end_time_bug88.snapshot')
snapshot, _ = user.post(res=Snapshot, data=snapshot_file)
device, _ = user.get(res=m.Device, item=snapshot['device']['id'])
device, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
end_times = [x['endTime'] for x in device['actions']]
assert '1970-01-02T00:00:00+00:00' in end_times
@ -798,7 +801,7 @@ def test_snapshot_not_failed_end_time_bug(app: Devicehub, user: UserClient):
snapshot_file = file('asus-end_time_bug88.snapshot')
snapshot_file['endTime'] = '2001-01-01 00:00:00+00:00'
snapshot, _ = user.post(res=Snapshot, data=snapshot_file)
device, _ = user.get(res=m.Device, item=snapshot['device']['id'])
device, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
end_times = [x['endTime'] for x in device['actions']]
assert not '1970-01-02T00:00:00+00:00' in end_times
@ -818,7 +821,7 @@ def test_snapshot_bug_smallint_hdd(app: Devicehub, user: UserClient):
snapshot, _ = user.post(res=Snapshot, data=snapshot_file)
act = [act for act in snapshot['actions'] if act['type'] == 'TestDataStorage'][0]
assert act['currentPendingSectorCount'] == 473302660
assert act['currentPendingSectorCount'] == 473302660
assert act['offlineUncorrectable'] == 182042944
tmp_snapshots = app.config['TMP_SNAPSHOTS']
@ -832,9 +835,7 @@ def test_snapshot_mobil(app: Devicehub, user: UserClient):
"""
snapshot_file = file('mobil')
snapshot, _ = user.post(res=Snapshot, data=snapshot_file)
device, _ = user.get(res=m.Device, item=snapshot['device']['id'])
device, _ = user.get(res=m.Device, item=snapshot['device']['devicehubID'])
tmp_snapshots = app.config['TMP_SNAPSHOTS']
shutil.rmtree(tmp_snapshots)

View File

@ -281,8 +281,9 @@ def test_tag_manual_link_search(app: Devicehub, user: UserClient):
db.session.add(desktop)
db.session.commit()
desktop_id = desktop.id
devicehub_id = desktop.devicehub_id
user.put({}, res=Tag, item='foo-bar/device/{}'.format(desktop_id), status=204)
device, _ = user.get(res=Device, item=desktop_id)
device, _ = user.get(res=Device, item=devicehub_id)
assert device['tags'][0]['id'] == 'foo-bar'
# Device already linked
@ -321,7 +322,7 @@ def test_tag_secondary_workbench_link_find(user: UserClient):
s = file('basic.snapshot')
s['device']['tags'] = [{'id': 'foo', 'secondary': 'bar', 'type': 'Tag'}]
snapshot, _ = user.post(s, res=Snapshot)
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
assert device['tags'][0]['id'] == 'foo'
assert device['tags'][0]['secondary'] == 'bar'

View File

@ -53,7 +53,7 @@ def test_workbench_server_condensed(user: UserClient):
}
assert snapshot['closed']
assert snapshot['severity'] == 'Info'
device, _ = user.get(res=Device, item=snapshot['device']['id'])
device, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
assert device['dataStorageSize'] == 1100
assert device['chassis'] == 'Tower'
assert device['hid'] == 'desktop-d1mr-d1ml-d1s-na1-s'
@ -133,7 +133,7 @@ def test_workbench_server_phases(user: UserClient):
assert snapshot['closed']
assert snapshot['severity'] == 'Info'
pc, _ = user.get(res=Device, item=snapshot['id'])
pc, _ = user.get(res=Device, item=snapshot['devicehubID'])
assert len(pc['actions']) == 10 # todo shall I add child actions?
@ -177,7 +177,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
"""
s = file('real-eee-1001pxd.snapshot.11')
snapshot, _ = user.post(res=em.Snapshot, data=s)
pc, _ = user.get(res=Device, item=snapshot['device']['id'])
pc, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
assert pc['type'] == 'Laptop'
assert pc['chassis'] == 'Netbook'
assert pc['model'] == '1001pxd'
@ -217,7 +217,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert cpu['speed'] == 1.667
assert 'hid' not in cpu
assert pc['processorModel'] == cpu['model'] == 'intel atom cpu n455 @ 1.66ghz'
cpu, _ = user.get(res=Device, item=cpu['id'])
cpu, _ = user.get(res=Device, item=cpu['devicehubID'])
actions = cpu['actions']
sysbench = next(e for e in actions if e['type'] == em.BenchmarkProcessorSysbench.t)
assert sysbench['elapsed'] == 164
@ -237,7 +237,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert gpu['model'] == 'atom processor d4xx/d5xx/n4xx/n5xx integrated graphics controller'
assert gpu['manufacturer'] == 'intel corporation'
assert gpu['memory'] == 256
gpu, _ = user.get(res=Device, item=gpu['id'])
gpu, _ = user.get(res=Device, item=gpu['devicehubID'])
action_types = tuple(e['type'] for e in gpu['actions'])
assert em.BenchmarkRamSysbench.t in action_types
assert em.StressTest.t in action_types
@ -256,7 +256,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert hdd['hid'] == 'harddrive-hitachi-hts54322-e2024242cv86hj'
assert hdd['interface'] == 'ATA'
assert hdd['size'] == 238475
hdd, _ = user.get(res=Device, item=hdd['id'])
hdd, _ = user.get(res=Device, item=hdd['devicehubID'])
action_types = tuple(e['type'] for e in hdd['actions'])
assert em.BenchmarkRamSysbench.t in action_types
assert em.StressTest.t in action_types