This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/resources/action/views/views.py

229 lines
8.0 KiB
Python

""" This is the view for Snapshots """
from datetime import timedelta
from distutils.version import StrictVersion
from uuid import UUID
import jwt
from flask import current_app as app, request, g
from teal.db import ResourceNotFound
from teal.marshmallow import ValidationError
from teal.resource import View
from ereuse_devicehub.db import db
from ereuse_devicehub.query import things_response
from ereuse_devicehub.resources.action.models import (Action, Snapshot, VisualTest,
InitTransfer, Live, Allocate, Deallocate,
Trade, Confirm, ConfirmRevoke, Revoke)
from ereuse_devicehub.resources.action.views import trade as trade_view
from ereuse_devicehub.resources.action.views.snapshot import SnapshotView, save_json, move_json
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
from ereuse_devicehub.resources.enums import Severity
SUPPORTED_WORKBENCH = StrictVersion('11.0')
class AllocateMix():
model = None
def post(self):
""" Create one res_obj """
res_json = request.get_json()
res_obj = self.model(**res_json)
db.session.add(res_obj)
db.session().final_flush()
ret = self.schema.jsonify(res_obj)
ret.status_code = 201
db.session.commit()
return ret
def find(self, args: dict):
res_objs = self.model.query.filter_by(author=g.user) \
.order_by(self.model.created.desc()) \
.paginate(per_page=200)
return things_response(
self.schema.dump(res_objs.items, many=True, nested=0),
res_objs.page, res_objs.per_page, res_objs.total,
res_objs.prev_num, res_objs.next_num
)
class AllocateView(AllocateMix, View):
model = Allocate
class DeallocateView(AllocateMix, View):
model = Deallocate
class LiveView(View):
def post(self):
"""Posts an action."""
res_json = request.get_json(validate=False)
tmp_snapshots = app.config['TMP_LIVES']
path_live = save_json(res_json, tmp_snapshots, '', live=True)
res_json.pop('debug', None)
res_json.pop('elapsed', None)
res_json.pop('os', None)
res_json_valid = self.schema.load(res_json)
live = self.live(res_json_valid)
db.session.add(live)
db.session().final_flush()
ret = self.schema.jsonify(live)
ret.status_code = 201
db.session.commit()
move_json(tmp_snapshots, path_live, '', live=True)
return ret
def get_hdd_details(self, snapshot, device):
"""We get the liftime and serial_number of the disk"""
usage_time_hdd = None
serial_number = None
components = [c for c in snapshot['components']]
components.sort(key=lambda x: x.created)
for hd in components:
if not isinstance(hd, DataStorage):
continue
serial_number = hd.serial_number
for act in hd.actions:
if not act.type == "TestDataStorage":
continue
usage_time_hdd = act.lifetime
break
if usage_time_hdd:
break
if not serial_number:
"""There aren't any disk"""
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
return usage_time_hdd, serial_number
def get_hid(self, snapshot):
device = snapshot.get('device') # type: Computer
components = snapshot.get('components')
if not device:
return None
if not components:
return device.hid
macs = [c.serial_number for c in components
if c.type == 'NetworkAdapter' and c.serial_number is not None]
macs.sort()
mac = ''
hid = device.hid
if not hid:
return hid
if macs:
mac = "-{mac}".format(mac=macs[0])
hid += mac
return hid
def live(self, snapshot):
"""If the device.allocated == True, then this snapshot create an action live."""
hid = self.get_hid(snapshot)
if not hid or not Device.query.filter(
Device.hid == hid).count():
raise ValidationError('Device not exist.')
device = Device.query.filter(
Device.hid == hid, Device.allocated == True).one()
# Is not necessary
if not device:
raise ValidationError('Device not exist.')
if not device.allocated:
raise ValidationError('Sorry this device is not allocated.')
usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
data_live = {'usage_time_hdd': usage_time_hdd,
'serial_number': serial_number,
'snapshot_uuid': snapshot['uuid'],
'description': '',
'software': snapshot['software'],
'software_version': snapshot['version'],
'licence_version': snapshot['licence_version'],
'author_id': device.owner_id,
'agent_id': device.owner.individual.id,
'device': device}
live = Live(**data_live)
if not usage_time_hdd:
warning = f"We don't found any TestDataStorage for disk sn: {serial_number}"
live.severity = Severity.Warning
live.description = warning
return live
live.sort_actions()
diff_time = live.diff_time()
if diff_time is None:
warning = "Don't exist one previous live or snapshot as reference"
live.description += warning
live.severity = Severity.Warning
elif diff_time < timedelta(0):
warning = "The difference with the last live/snapshot is negative"
live.description += warning
live.severity = Severity.Warning
return live
class ActionView(View):
def post(self):
"""Posts an action."""
json = request.get_json(validate=False)
if not json:
raise ValidationError('Post request needs a json.')
elif 'type' not in json:
# JN TODO Use the user's key instead an empty string
key = ''
json = jwt.decode(json, key, algorithms="HS256")
# todo there should be a way to better get subclassess resource
# defs
resource_def = app.resources[json['type']]
if json['type'] == Snapshot.t:
snapshot = SnapshotView(json, resource_def, self.schema)
return snapshot.post()
if json['type'] == VisualTest.t:
pass
# TODO JN add compute rate with new visual test and old components device
if json['type'] == InitTransfer.t:
return self.transfer_ownership()
if json['type'] == Trade.t:
trade = trade_view.TradeView(json, resource_def, self.schema)
return trade.post()
if json['type'] == Confirm.t:
confirm = trade_view.ConfirmView(json, resource_def, self.schema)
return confirm.post()
if json['type'] == Revoke.t:
revoke = trade_view.RevokeView(json, resource_def, self.schema)
return revoke.post()
if json['type'] == ConfirmRevoke.t:
confirm_revoke = trade_view.ConfirmRevokeView(json, resource_def, self.schema)
return confirm_revoke.post()
a = resource_def.schema.load(json)
Model = db.Model._decl_class_registry.data[json['type']]()
action = Model(**a)
db.session.add(action)
db.session().final_flush()
ret = self.schema.jsonify(action)
ret.status_code = 201
db.session.commit()
return ret
def one(self, id: UUID):
"""Gets one action."""
action = Action.query.filter_by(id=id).one()
return self.schema.jsonify(action)
def transfer_ownership(self):
"""Perform a InitTransfer action to change author_id of device"""
pass