Merge branch 'testing' into bugfix/88-action-end_time

This commit is contained in:
Cayo Puigdefabregas 2020-12-10 10:54:11 +01:00
commit 51d0bd0cc0
11 changed files with 227 additions and 52 deletions

17
CHANGELOG.md Normal file
View File

@ -0,0 +1,17 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.ht
ml).
## master
[1.0.1-beta]
## testing
[1.0.2-beta]
## [1.0.2-beta]
- [addend] #87 allocate, deallocate and live actions
- [fixed] #89 save json on disk only for shapshots
- [addend] #83 add owner_id in all kind of device

View File

@ -0,0 +1,67 @@
"""adding owner_id in device
Revision ID: 68a5c025ab8e
Revises: b9b0ee7d9dca
Create Date: 2020-10-30 11:48:34.992498
"""
import sqlalchemy as sa
from alembic import context
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '68a5c025ab8e'
down_revision = 'e93aec8fc41f'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade_data():
con = op.get_bind()
computers = con.execute(f"select id, owner_id from {get_inv()}.computer")
for c in computers:
id_dev = c.id
id_owner = c.owner_id
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id_dev};"
con.execute(sql)
values = f"{get_inv()}.component.id, {get_inv()}.computer.owner_id"
table = f"{get_inv()}.component"
joins = f"inner join {get_inv()}.computer"
on = f"on {table}.parent_id={get_inv()}.computer.id"
sql = f"select {values} from {table} {joins} {on}"
components = con.execute(sql)
for c in components:
id = c.id
id_owner = c.owner_id
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id};"
con.execute(sql)
def upgrade():
# We need get the actual computers with owner_id
# because when add a column in device this reset the values of the owner_id
# in the computer tables
op.add_column('device', sa.Column('owner_id', postgresql.UUID(),
nullable=True), schema=f'{get_inv()}')
op.create_foreign_key("fk_device_owner_id_user_id",
"device", "user",
["owner_id"], ["id"],
ondelete="SET NULL",
source_schema=f'{get_inv()}', referent_schema='common')
upgrade_data()
def downgrade():
op.drop_constraint("fk_device_owner_id_user_id", "device", type_="foreignkey", schema=f'{get_inv()}')
op.drop_column('device', 'owner_id', schema=f'{get_inv()}')

View File

@ -237,12 +237,14 @@ class ActionView(View):
def live(self, snapshot):
"""If the device.allocated == True, then this snapshot create an action live."""
device = snapshot.get('device') # type: Computer
# TODO @cayop dependency of pulls 85 and 83
# if the pr/85 and pr/83 is merged, then you need change this way for get the device
if not device.hid or not Device.query.filter(Device.hid==device.hid).count():
# TODO @cayop dependency of pulls 85
# if the pr/85 is merged, then you need change this way for get the device
if not device.hid or not Device.query.filter(
Device.hid==device.hid, Device.owner_id==g.user.id).count():
return None
device = Device.query.filter(Device.hid==device.hid).one()
device = Device.query.filter(
Device.hid==device.hid, Device.owner_id==g.user.id).one()
if not device.allocated:
return None

View File

@ -32,6 +32,7 @@ from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing, listener_reset
from ereuse_devicehub.resources.user.models import User
class Device(Thing):
"""Base class for any type of physical object that can be identified.
@ -106,6 +107,11 @@ class Device(Thing):
image = db.Column(db.URL)
image.comment = "An image of the device."
owner_id = db.Column(UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id)
owner = db.relationship(User, primaryjoin=owner_id == User.id)
allocated = db.Column(Boolean, default=False)
allocated.comment = "device is allocated or not."
@ -115,6 +121,7 @@ class Device(Thing):
'created',
'updated',
'parent_id',
'owner_id',
'hid',
'production_date',
'color', # these are only user-input thus volatile
@ -593,7 +600,8 @@ class Component(Device):
"""
assert self.hid is None, 'Don\'t use this method with a component that has HID'
component = self.__class__.query \
.filter_by(parent=parent, hid=None, **self.physical_properties) \
.filter_by(parent=parent, hid=None, owner_id=self.owner_id,
**self.physical_properties) \
.filter(~Component.id.in_(blacklist)) \
.first()
if not component:

View File

@ -4,6 +4,7 @@ from itertools import groupby
from typing import Iterable, Set
import yaml
from flask import g
from sqlalchemy import inspect
from sqlalchemy.exc import IntegrityError
from sqlalchemy.util import OrderedSet
@ -101,7 +102,7 @@ class Sync:
assert inspect(component).transient, 'Component should not be synced from DB'
try:
if component.hid:
db_component = Device.query.filter_by(hid=component.hid).one()
db_component = Device.query.filter_by(hid=component.hid, owner_id=g.user.id).one()
assert isinstance(db_component, Device), \
'{} must be a component'.format(db_component)
else:
@ -153,7 +154,7 @@ class Sync:
db_device = None
if device.hid:
with suppress(ResourceNotFound):
db_device = Device.query.filter_by(hid=device.hid).one()
db_device = Device.query.filter_by(hid=device.hid, owner_id=g.user.id).one()
if db_device and db_device.allocated:
raise ResourceNotFound('device is actually allocated {}'.format(device))
try:
@ -204,6 +205,9 @@ class Sync:
This method mutates db_device.
"""
if db_device.owner_id != g.user.id:
return
for field_name, value in device.physical_properties.items():
if value is not None:
setattr(db_device, field_name, value)
@ -234,8 +238,11 @@ class Sync:
return component.parent or Device(id=0) # Computer with id 0 is our Identity
for parent, _components in groupby(sorted(adding, key=g_parent), key=g_parent):
if parent.id != 0: # Is not Computer Identity
actions.add(Remove(device=parent, components=OrderedSet(_components)))
set_components = OrderedSet(_components)
check_owners = (x.owner_id == g.user.id for x in set_components)
# Is not Computer Identity and all components have the correct owner
if parent.id != 0 and all(check_owners):
actions.add(Remove(device=parent, components=set_components))
return actions

View File

@ -101,7 +101,7 @@ class DeviceView(View):
return super().get(id)
def patch(self, id):
dev = Device.query.filter_by(id=id).one()
dev = Device.query.filter_by(id=id, owner_id=g.user.id).one()
if isinstance(dev, Computer):
resource_def = app.resources['Computer']
# TODO check how to handle the 'actions_one'
@ -131,9 +131,9 @@ class DeviceView(View):
@auth.Auth.requires_auth
def one_private(self, id: int):
device = Device.query.filter_by(id=id).one()
if hasattr(device, 'owner_id') and device.owner_id != g.user.id:
device = {}
device = Device.query.filter_by(id=id, owner_id=g.user.id).first()
if not device:
return self.one_public(id)
return self.schema.jsonify(device)
@auth.Auth.requires_auth
@ -149,7 +149,7 @@ class DeviceView(View):
)
def query(self, args):
query = Device.query.distinct() # todo we should not force to do this if the query is ok
query = Device.query.filter((Device.owner_id == g.user.id)).distinct()
search_p = args.get('search', None)
if search_p:
properties = DeviceSearch.properties
@ -159,17 +159,8 @@ class DeviceView(View):
).order_by(
search.Search.rank(properties, search_p) + search.Search.rank(tags, search_p)
)
query = self.visibility_filter(query)
return query.filter(*args['filter']).order_by(*args['sort'])
def visibility_filter(self, query):
filterqs = request.args.get('filter', None)
if (filterqs and
'lot' not in filterqs):
query = query.filter((Computer.id == Device.id), (Computer.owner_id == g.user.id))
pass
return query
class DeviceMergeView(View):
"""View for merging two devices
@ -194,8 +185,13 @@ class DeviceMergeView(View):
many models in session.
"""
# base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
self.base_device = Device.query.filter_by(id=dev1_id).one()
self.with_device = Device.query.filter_by(id=dev2_id).one()
self.base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
self.with_device = Device.query.filter_by(id=dev2_id, owner_id=g.user.id).one()
if self.base_device.allocated or self.with_device.allocated:
# Validation than any device is allocated
msg = 'The device is allocated, please deallocated before merge.'
raise ValidationError(msg)
if not self.base_device.type == self.with_device.type:
# Validation than we are speaking of the same kind of devices

View File

@ -3,7 +3,7 @@ from uuid import UUID
import pytest
from marshmallow import ValidationError
from sqlalchemy_utils import PhoneNumber
from teal.db import UniqueViolation
from teal.db import UniqueViolation, DBError
from teal.enums import Country
from ereuse_devicehub.config import DevicehubConfig
@ -80,7 +80,7 @@ def test_membership_repeated():
db.session.add(person)
person.member_of.add(Membership(org, person))
with pytest.raises(UniqueViolation):
with pytest.raises(DBError):
db.session.flush()
@ -95,7 +95,7 @@ def test_membership_repeating_id():
person2 = Person(name='Tommy')
person2.member_of.add(Membership(org, person2, id='acme-1'))
db.session.add(person2)
with pytest.raises(UniqueViolation) as e:
with pytest.raises(DBError) as e:
db.session.flush()
assert 'One member id per organization' in str(e)

View File

@ -128,7 +128,6 @@ def test_physical_properties():
'ethereum_address': None,
'manufacturer': 'bar',
'model': 'foo',
'owner_id': pc.owner_id,
'receiver_id': None,
'serial_number': 'foo-bar',
'transfer_state': TransferState.Initial
@ -138,6 +137,7 @@ def test_physical_properties():
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
def test_component_similar_one():
user = User.query.filter().first()
snapshot = conftest.file('pc-components.db')
pc = snapshot['device']
snapshot['components'][0]['serial_number'] = snapshot['components'][1]['serial_number'] = None
@ -146,7 +146,8 @@ def test_component_similar_one():
db.session.add(pc)
db.session.flush()
# Let's create a new component named 'A' similar to 1
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer)
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer,
owner_id=user.id)
similar_to_a = componentA.similar_one(pc, set())
assert similar_to_a == component1
# d.Component B does not have the same model
@ -165,16 +166,17 @@ def test_add_remove():
# pc has c1 and c2
# pc2 has c3
# c4 is not with any pc
user = User.query.filter().first()
values = conftest.file('pc-components.db')
pc = values['device']
c1, c2 = (d.Component(**c) for c in values['components'])
pc = d.Desktop(**pc, components=OrderedSet([c1, c2]))
db.session.add(pc)
c3 = d.Component(serial_number='nc1')
c3 = d.Component(serial_number='nc1', owner_id=user.id)
pc2 = d.Desktop(serial_number='s2',
components=OrderedSet([c3]),
chassis=ComputerChassis.Microtower)
c4 = d.Component(serial_number='c4s')
c4 = d.Component(serial_number='c4s', owner_id=user.id)
db.session.add(pc2)
db.session.add(c4)
db.session.commit()
@ -313,14 +315,16 @@ def test_sync_execute_register_no_hid_tag_not_linked(tag_id: str):
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
def test_sync_execute_register_tag_does_not_exist():
"""Ensures not being able to register if the tag does not exist,
even if the device has HID or it existed before.
Tags have to be created before trying to link them through a Snapshot.
"""
user = User.query.filter().first()
pc = d.Desktop(**conftest.file('pc-components.db')['device'], tags=OrderedSet([Tag('foo')]))
pc.owner_id = user.id
with raises(ResourceNotFound):
Sync().execute_register(pc)
@ -401,8 +405,9 @@ def test_get_device(app: Devicehub, user: UserClient):
chassis=ComputerChassis.Tower,
owner_id=user.user['id'])
pc.components = OrderedSet([
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
owner_id=user.user['id']),
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500, owner_id=user.user['id'])
])
db.session.add(pc)
# todo test is an abstract class. replace with another one
@ -438,8 +443,10 @@ def test_get_devices(app: Devicehub, user: UserClient):
chassis=ComputerChassis.Tower,
owner_id=user.user['id'])
pc.components = OrderedSet([
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
owner_id=user.user['id']),
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500,
owner_id=user.user['id'])
])
pc1 = d.Desktop(model='p2mo',
manufacturer='p2ma',
@ -461,17 +468,21 @@ def test_get_devices(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient):
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient,
client: Client):
"""Checks GETting a d.Desktop with its components."""
user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
pc, res = user.get("/devices/1", None)
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
pc, res = user.get(res=d.Device, item=s['device']['id'])
assert res.status_code == 200
assert len(pc['actions']) == 9
pc2, res2 = user2.get("/devices/1", None)
html, _ = client.get(res=d.Device, item=s['device']['id'], accept=ANY)
assert 'intel atom cpu n270 @ 1.60ghz' in html
assert '00:24:8C:7F:CF:2D 100 Mbps' in html
pc2, res2 = user2.get(res=d.Device, item=s['device']['id'], accept=ANY)
assert res2.status_code == 200
assert pc2 == {}
assert pc2 == html
@pytest.mark.mvp
@ -551,29 +562,30 @@ def test_device_public(user: UserClient, client: Client):
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_computer_accessory_model():
sai = d.SAI()
def test_computer_accessory_model(user: UserClient):
sai = d.SAI(owner_id=user.user['id'])
db.session.add(sai)
keyboard = d.Keyboard(layout=Layouts.ES)
keyboard = d.Keyboard(layout=Layouts.ES, owner_id=user.user['id'])
db.session.add(keyboard)
mouse = d.Mouse()
mouse = d.Mouse(owner_id=user.user['id'])
db.session.add(mouse)
db.session.commit()
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_networking_model():
router = d.Router(speed=1000, wireless=True)
def test_networking_model(user: UserClient):
router = d.Router(speed=1000, wireless=True, owner_id=user.user['id'])
db.session.add(router)
switch = d.Switch(speed=1000, wireless=False)
switch = d.Switch(speed=1000, wireless=False, owner_id=user.user['id'])
db.session.add(switch)
db.session.commit()
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_cooking_mixer():
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar')
def test_cooking_mixer(user: UserClient):
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar',
owner_id=user.user['id'])
db.session.add(mixer)
db.session.commit()

View File

@ -1,4 +1,5 @@
import pytest
import uuid
from teal.utils import compiled
from ereuse_devicehub.client import UserClient
@ -185,6 +186,26 @@ def test_device_query(user: UserClient):
assert not pc['tags']
@pytest.mark.mvp
def test_device_query_permitions(user: UserClient, user2: UserClient):
"""Checks result of inventory for two users"""
user.post(file('basic.snapshot'), res=Snapshot)
i, _ = user.get(res=Device)
pc1 = next(d for d in i['items'] if d['type'] == 'Desktop')
i2, _ = user2.get(res=Device)
assert i2['items'] == []
basic_snapshot = file('basic.snapshot')
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
user2.post(basic_snapshot, res=Snapshot)
i2, _ = user2.get(res=Device)
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
assert pc1['id'] != pc2['id']
assert pc1['hid'] == pc2['hid']
@pytest.mark.mvp
def test_device_search_all_devices_token_if_empty(app: Devicehub, user: UserClient):
"""Ensures DeviceSearch can regenerate itself when the table is empty."""

View File

@ -4,6 +4,7 @@ from io import StringIO
from pathlib import Path
import pytest
from werkzeug.exceptions import Unauthorized
import teal.marshmallow
from ereuse_utils.test import ANY
@ -79,6 +80,29 @@ def test_erasure_certificate_wrong_id(client: Client):
status=teal.marshmallow.ValidationError)
@pytest.mark.mvp
def test_export_csv_permitions(user: UserClient, user2: UserClient, client: Client):
"""Test export device information in a csv file with others users."""
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
csv_user, _ = user.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
_, res = client.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})], status=401)
assert res.status_code == 401
assert len(csv_user) > 0
assert len(csv_user2) == 0
@pytest.mark.mvp
def test_export_basic_snapshot(user: UserClient):
"""Test export device information in a csv file."""

View File

@ -2,6 +2,7 @@ import os
import json
import shutil
import pytest
import uuid
from datetime import datetime, timedelta, timezone
from requests.exceptions import HTTPError
@ -104,6 +105,24 @@ def test_snapshot_post(user: UserClient):
assert rate['snapshot']['id'] == snapshot['id']
@pytest.mark.mvp
def test_same_device_tow_users(user: UserClient, user2: UserClient):
"""Two users can up the same snapshot and the system save 2 computers"""
user.post(file('basic.snapshot'), res=Snapshot)
i, _ = user.get(res=m.Device)
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
pc_id = pc['id']
assert i['items'][0]['url'] == f'/devices/{pc_id}'
basic_snapshot = file('basic.snapshot')
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
user2.post(basic_snapshot, res=Snapshot)
i2, _ = user2.get(res=m.Device)
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
assert pc['id'] != pc2['id']
assert pc['ownerID'] != pc2['ownerID']
assert pc['hid'] == pc2['hid']
@pytest.mark.mvp
def test_snapshot_update_timefield_updated(user: UserClient):
"""
@ -253,7 +272,9 @@ def test_snapshot_component_add_remove(user: UserClient):
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'}
assert all(c['parent'] == pc1_id for c in pc1['components'])
# This last Action only
assert get_actions_info(pc1['actions'])[-1] == ('RateComputer', ['p1c3s', 'p1c4s'])
act = get_actions_info(pc1['actions'])[-1]
assert 'RateComputer' in act
assert set(act[1]) == {'p1c3s', 'p1c4s'}
# PC2
# We haven't changed PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',)