change is_temporary, is_outgoing, is_incoming

This commit is contained in:
Cayo Puigdefabregas 2022-05-27 16:48:40 +02:00
parent b2e11527cb
commit b8819b7a4a
1 changed files with 162 additions and 92 deletions

View File

@ -5,12 +5,11 @@ from typing import Union
from boltons import urlutils from boltons import urlutils
from citext import CIText from citext import CIText
from flask import g from flask import g
from flask_login import current_user
from sqlalchemy import TEXT from sqlalchemy import TEXT
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy_utils import LtreeType from sqlalchemy_utils import LtreeType
from sqlalchemy_utils.types.ltree import LQUERY from sqlalchemy_utils.types.ltree import LQUERY
from teal.db import CASCADE_OWN, UUIDLtree, check_range, IntEnum from teal.db import CASCADE_OWN, IntEnum, UUIDLtree, check_range
from teal.resource import url_for_resource from teal.resource import url_for_resource
from ereuse_devicehub.db import create_view, db, exp, f from ereuse_devicehub.db import create_view, db, exp, f
@ -21,24 +20,29 @@ from ereuse_devicehub.resources.user.models import User
class Lot(Thing): class Lot(Thing):
id = db.Column(UUID(as_uuid=True), primary_key=True) # uuid is generated on init by default id = db.Column(
UUID(as_uuid=True), primary_key=True
) # uuid is generated on init by default
name = db.Column(CIText(), nullable=False) name = db.Column(CIText(), nullable=False)
description = db.Column(CIText()) description = db.Column(CIText())
description.comment = """A comment about the lot.""" description.comment = """A comment about the lot."""
closed = db.Column(db.Boolean, default=False, nullable=False) closed = db.Column(db.Boolean, default=False, nullable=False)
closed.comment = """A closed lot cannot be modified anymore.""" closed.comment = """A closed lot cannot be modified anymore."""
devices = db.relationship(Device, devices = db.relationship(
Device,
backref=db.backref('lots', lazy=True, collection_class=set), backref=db.backref('lots', lazy=True, collection_class=set),
secondary=lambda: LotDevice.__table__, secondary=lambda: LotDevice.__table__,
lazy=True, lazy=True,
collection_class=set) collection_class=set,
)
"""The **children** devices that the lot has. """The **children** devices that the lot has.
Note that the lot can have more devices, if they are inside Note that the lot can have more devices, if they are inside
descendant lots. descendant lots.
""" """
parents = db.relationship(lambda: Lot, parents = db.relationship(
lambda: Lot,
viewonly=True, viewonly=True,
lazy=True, lazy=True,
collection_class=set, collection_class=set,
@ -46,45 +50,58 @@ class Lot(Thing):
primaryjoin=lambda: Lot.id == LotParent.child_id, primaryjoin=lambda: Lot.id == LotParent.child_id,
secondaryjoin=lambda: LotParent.parent_id == Lot.id, secondaryjoin=lambda: LotParent.parent_id == Lot.id,
cascade='refresh-expire', # propagate changes outside ORM cascade='refresh-expire', # propagate changes outside ORM
backref=db.backref('children', backref=db.backref(
'children',
viewonly=True, viewonly=True,
lazy=True, lazy=True,
cascade='refresh-expire', cascade='refresh-expire',
collection_class=set) collection_class=set,
),
) )
"""The parent lots.""" """The parent lots."""
all_devices = db.relationship(Device, all_devices = db.relationship(
Device,
viewonly=True, viewonly=True,
lazy=True, lazy=True,
collection_class=set, collection_class=set,
secondary=lambda: LotDeviceDescendants.__table__, secondary=lambda: LotDeviceDescendants.__table__,
primaryjoin=lambda: Lot.id == LotDeviceDescendants.ancestor_lot_id, primaryjoin=lambda: Lot.id == LotDeviceDescendants.ancestor_lot_id,
secondaryjoin=lambda: LotDeviceDescendants.device_id == Device.id) secondaryjoin=lambda: LotDeviceDescendants.device_id == Device.id,
)
"""All devices, including components, inside this lot and its """All devices, including components, inside this lot and its
descendants. descendants.
""" """
amount = db.Column(db.Integer, check_range('amount', min=0, max=100), default=0) amount = db.Column(db.Integer, check_range('amount', min=0, max=100), default=0)
owner_id = db.Column(UUID(as_uuid=True), owner_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id), db.ForeignKey(User.id),
nullable=False, nullable=False,
default=lambda: g.user.id) default=lambda: g.user.id,
)
owner = db.relationship(User, primaryjoin=owner_id == User.id) owner = db.relationship(User, primaryjoin=owner_id == User.id)
transfer_state = db.Column(IntEnum(TransferState), default=TransferState.Initial, nullable=False) transfer_state = db.Column(
IntEnum(TransferState), default=TransferState.Initial, nullable=False
)
transfer_state.comment = TransferState.__doc__ transfer_state.comment = TransferState.__doc__
receiver_address = db.Column(CIText(), receiver_address = db.Column(
CIText(),
db.ForeignKey(User.email), db.ForeignKey(User.email),
nullable=False, nullable=False,
default=lambda: g.user.email) default=lambda: g.user.email,
)
receiver = db.relationship(User, primaryjoin=receiver_address == User.email) receiver = db.relationship(User, primaryjoin=receiver_address == User.email)
def __init__(self, name: str, closed: bool = closed.default.arg, def __init__(
description: str = None) -> None: self, name: str, closed: bool = closed.default.arg, description: str = None
) -> None:
"""Initializes a lot """Initializes a lot
:param name: :param name:
:param closed: :param closed:
""" """
super().__init__(id=uuid.uuid4(), name=name, closed=closed, description=description) super().__init__(
id=uuid.uuid4(), name=name, closed=closed, description=description
)
Path(self) # Lots have always one edge per default. Path(self) # Lots have always one edge per default.
@property @property
@ -102,20 +119,32 @@ class Lot(Thing):
@property @property
def is_temporary(self): def is_temporary(self):
return not bool(self.trade) return not bool(self.trade) and not bool(self.transfer)
@property @property
def is_incoming(self): def is_incoming(self):
return bool(self.trade and self.trade.user_to == current_user) if self.trade:
return self.trade.user_to == g.user
if self.transfer:
return self.transfer.user_to == g.user
return False
@property @property
def is_outgoing(self): def is_outgoing(self):
return bool(self.trade and self.trade.user_from == current_user) if self.trade:
return self.trade.user_from == g.user
if self.transfer:
return self.transfer.user_from == g.user
return False
@classmethod @classmethod
def descendantsq(cls, id): def descendantsq(cls, id):
_id = UUIDLtree.convert(id) _id = UUIDLtree.convert(id)
return (cls.id == Path.lot_id) & Path.path.lquery(exp.cast('*.{}.*'.format(_id), LQUERY)) return (cls.id == Path.lot_id) & Path.path.lquery(
exp.cast('*.{}.*'.format(_id), LQUERY)
)
@classmethod @classmethod
def roots(cls): def roots(cls):
@ -176,13 +205,17 @@ class Lot(Thing):
if isinstance(child, Lot): if isinstance(child, Lot):
return Path.has_lot(self.id, child.id) return Path.has_lot(self.id, child.id)
elif isinstance(child, Device): elif isinstance(child, Device):
device = db.session.query(LotDeviceDescendants) \ device = (
.filter(LotDeviceDescendants.device_id == child.id) \ db.session.query(LotDeviceDescendants)
.filter(LotDeviceDescendants.ancestor_lot_id == self.id) \ .filter(LotDeviceDescendants.device_id == child.id)
.filter(LotDeviceDescendants.ancestor_lot_id == self.id)
.one_or_none() .one_or_none()
)
return device return device
else: else:
raise TypeError('Lot only contains devices and lots, not {}'.format(child.__class__)) raise TypeError(
'Lot only contains devices and lots, not {}'.format(child.__class__)
)
def __repr__(self) -> str: def __repr__(self) -> str:
return '<Lot {0.name} devices={0.devices!r}>'.format(self) return '<Lot {0.name} devices={0.devices!r}>'.format(self)
@ -192,35 +225,44 @@ class LotDevice(db.Model):
device_id = db.Column(db.BigInteger, db.ForeignKey(Device.id), primary_key=True) device_id = db.Column(db.BigInteger, db.ForeignKey(Device.id), primary_key=True)
lot_id = db.Column(UUID(as_uuid=True), db.ForeignKey(Lot.id), primary_key=True) lot_id = db.Column(UUID(as_uuid=True), db.ForeignKey(Lot.id), primary_key=True)
created = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) created = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
author_id = db.Column(UUID(as_uuid=True), author_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id), db.ForeignKey(User.id),
nullable=False, nullable=False,
default=lambda: g.user.id) default=lambda: g.user.id,
)
author = db.relationship(User, primaryjoin=author_id == User.id) author = db.relationship(User, primaryjoin=author_id == User.id)
author_id.comment = """The user that put the device in the lot.""" author_id.comment = """The user that put the device in the lot."""
class Path(db.Model): class Path(db.Model):
id = db.Column(db.UUID(as_uuid=True), id = db.Column(
db.UUID(as_uuid=True),
primary_key=True, primary_key=True,
server_default=db.text('gen_random_uuid()')) server_default=db.text('gen_random_uuid()'),
)
lot_id = db.Column(db.UUID(as_uuid=True), db.ForeignKey(Lot.id), nullable=False) lot_id = db.Column(db.UUID(as_uuid=True), db.ForeignKey(Lot.id), nullable=False)
lot = db.relationship(Lot, lot = db.relationship(
backref=db.backref('paths', Lot,
lazy=True, backref=db.backref(
collection_class=set, 'paths', lazy=True, collection_class=set, cascade=CASCADE_OWN
cascade=CASCADE_OWN), ),
primaryjoin=Lot.id == lot_id) primaryjoin=Lot.id == lot_id,
)
path = db.Column(LtreeType, nullable=False) path = db.Column(LtreeType, nullable=False)
created = db.Column(db.TIMESTAMP(timezone=True), server_default=db.text('CURRENT_TIMESTAMP')) created = db.Column(
db.TIMESTAMP(timezone=True), server_default=db.text('CURRENT_TIMESTAMP')
)
created.comment = """When Devicehub created this.""" created.comment = """When Devicehub created this."""
__table_args__ = ( __table_args__ = (
# dag.delete_edge needs to disable internally/temporarily the unique constraint # dag.delete_edge needs to disable internally/temporarily the unique constraint
db.UniqueConstraint(path, name='path_unique', deferrable=True, initially='immediate'), db.UniqueConstraint(
path, name='path_unique', deferrable=True, initially='immediate'
),
db.Index('path_gist', path, postgresql_using='gist'), db.Index('path_gist', path, postgresql_using='gist'),
db.Index('path_btree', path, postgresql_using='btree'), db.Index('path_btree', path, postgresql_using='btree'),
db.Index('lot_id_index', lot_id, postgresql_using='hash') db.Index('lot_id_index', lot_id, postgresql_using='hash'),
) )
def __init__(self, lot: Lot) -> None: def __init__(self, lot: Lot) -> None:
@ -243,7 +285,9 @@ class Path(db.Model):
child_id = UUIDLtree.convert(child_id) child_id = UUIDLtree.convert(child_id)
return bool( return bool(
db.session.execute( db.session.execute(
"SELECT 1 from path where path ~ '*.{}.*.{}.*'".format(parent_id, child_id) "SELECT 1 from path where path ~ '*.{}.*.{}.*'".format(
parent_id, child_id
)
).first() ).first()
) )
@ -263,47 +307,73 @@ class LotDeviceDescendants(db.Model):
"""Ancestor lot table.""" """Ancestor lot table."""
_desc = Lot.__table__.alias() _desc = Lot.__table__.alias()
"""Descendant lot table.""" """Descendant lot table."""
lot_device = _desc \ lot_device = _desc.join(LotDevice, _desc.c.id == LotDevice.lot_id).join(
.join(LotDevice, _desc.c.id == LotDevice.lot_id) \ Path, _desc.c.id == Path.lot_id
.join(Path, _desc.c.id == Path.lot_id) )
"""Join: Path -- Lot -- LotDevice""" """Join: Path -- Lot -- LotDevice"""
descendants = "path.path ~ (CAST('*.'|| replace(CAST({}.id as text), '-', '_') " \ descendants = (
"path.path ~ (CAST('*.'|| replace(CAST({}.id as text), '-', '_') "
"|| '.*' AS LQUERY))".format(_ancestor.name) "|| '.*' AS LQUERY))".format(_ancestor.name)
)
"""Query that gets the descendants of the ancestor lot.""" """Query that gets the descendants of the ancestor lot."""
devices = db.select([ devices = (
db.select(
[
LotDevice.device_id, LotDevice.device_id,
_desc.c.id.label('parent_lot_id'), _desc.c.id.label('parent_lot_id'),
_ancestor.c.id.label('ancestor_lot_id'), _ancestor.c.id.label('ancestor_lot_id'),
None None,
]).select_from(_ancestor).select_from(lot_device).where(db.text(descendants)) ]
)
.select_from(_ancestor)
.select_from(lot_device)
.where(db.text(descendants))
)
# Components # Components
_parent_device = Device.__table__.alias(name='parent_device') _parent_device = Device.__table__.alias(name='parent_device')
"""The device that has the access to the lot.""" """The device that has the access to the lot."""
lot_device_component = lot_device \ lot_device_component = lot_device.join(
.join(_parent_device, _parent_device.c.id == LotDevice.device_id) \ _parent_device, _parent_device.c.id == LotDevice.device_id
.join(Component, _parent_device.c.id == Component.parent_id) ).join(Component, _parent_device.c.id == Component.parent_id)
"""Join: Path -- Lot -- LotDevice -- ParentDevice (Device) -- Component""" """Join: Path -- Lot -- LotDevice -- ParentDevice (Device) -- Component"""
components = db.select([ components = (
db.select(
[
Component.id.label('device_id'), Component.id.label('device_id'),
_desc.c.id.label('parent_lot_id'), _desc.c.id.label('parent_lot_id'),
_ancestor.c.id.label('ancestor_lot_id'), _ancestor.c.id.label('ancestor_lot_id'),
LotDevice.device_id.label('device_parent_id'), LotDevice.device_id.label('device_parent_id'),
]).select_from(_ancestor).select_from(lot_device_component).where(db.text(descendants)) ]
)
.select_from(_ancestor)
.select_from(lot_device_component)
.where(db.text(descendants))
)
__table__ = create_view('lot_device_descendants', devices.union(components)) __table__ = create_view('lot_device_descendants', devices.union(components))
class LotParent(db.Model): class LotParent(db.Model):
i = f.index(Path.path, db.func.text2ltree(f.replace(exp.cast(Path.lot_id, TEXT), '-', '_'))) i = f.index(
Path.path, db.func.text2ltree(f.replace(exp.cast(Path.lot_id, TEXT), '-', '_'))
)
__table__ = create_view( __table__ = create_view(
'lot_parent', 'lot_parent',
db.select([ db.select(
[
Path.lot_id.label('child_id'), Path.lot_id.label('child_id'),
exp.cast(f.replace(exp.cast(f.subltree(Path.path, i - 1, i), TEXT), '_', '-'), exp.cast(
UUID).label('parent_id') f.replace(
]).select_from(Path).where(i > 0), exp.cast(f.subltree(Path.path, i - 1, i), TEXT), '_', '-'
),
UUID,
).label('parent_id'),
]
)
.select_from(Path)
.where(i > 0),
) )