add ereuse_utils as module

This commit is contained in:
Cayo Puigdefabregas 2023-03-21 17:31:43 +01:00
parent 01ef359bd4
commit 83f1e4c18f
40 changed files with 8683 additions and 1654 deletions

View file

@ -2,16 +2,18 @@ import os
import click.testing
import flask.cli
import ereuse_utils
import ereuse_devicehub.ereuse_utils
from ereuse_devicehub.config import DevicehubConfig
from ereuse_devicehub.devicehub import Devicehub
import sys
sys.ps1 = '\001\033[92m\002>>> \001\033[0m\002'
sys.ps2= '\001\033[94m\002... \001\033[0m\002'
sys.ps2 = '\001\033[94m\002... \001\033[0m\002'
import os, readline, atexit
history_file = os.path.join(os.environ['HOME'], '.python_history')
try:
readline.read_history_file(history_file)
@ -29,6 +31,7 @@ readline.parse_and_bind('"\e[1;5D": backward-word')
readline.set_history_length(100000)
atexit.register(readline.write_history_file, history_file)
class DevicehubGroup(flask.cli.FlaskGroup):
# todo users cannot make cli to use a custom db this way!
CONFIG = DevicehubConfig
@ -49,17 +52,25 @@ class DevicehubGroup(flask.cli.FlaskGroup):
def get_version(ctx, param, value):
if not value or ctx.resilient_parsing:
return
click.echo('Devicehub {}'.format(ereuse_utils.version('ereuse-devicehub')), color=ctx.color)
click.echo(
'Devicehub {}'.format(
ereuse_devicehub.ereuse_utils.version('ereuse-devicehub')
),
color=ctx.color,
)
flask.cli.get_version(ctx, param, value)
@click.option('--version',
@click.option(
'--version',
help='Devicehub version.',
expose_value=False,
callback=get_version,
is_flag=True,
is_eager=True)
@click.group(cls=DevicehubGroup,
is_eager=True,
)
@click.group(
cls=DevicehubGroup,
context_settings=Devicehub.cli_context_settings,
add_version_option=False,
help="""Manages the Devicehub of the inventory {}.
@ -69,6 +80,9 @@ def get_version(ctx, param, value):
'dh tag add' adds a tag in the db1 database. Operations
that affect the common database (like creating an user)
are not affected by this.
""".format(os.environ.get('dhi')))
""".format(
os.environ.get('dhi')
),
)
def cli():
pass

View file

@ -1,7 +1,7 @@
from inspect import isclass
from typing import Dict, Iterable, Type, Union
from ereuse_utils.test import JSON, Res
from ereuse_devicehub.ereuse_utils.test import JSON, Res
from flask.testing import FlaskClient
from flask_wtf.csrf import generate_csrf
from werkzeug.exceptions import HTTPException

View file

@ -5,8 +5,8 @@ from typing import Type
import boltons.urlutils
import click
import click_spinner
import ereuse_utils.cli
from ereuse_utils.session import DevicehubClient
import ereuse_devicehub.ereuse_utils.cli
from ereuse_devicehub.ereuse_utils.session import DevicehubClient
from flask import _app_ctx_stack, g
from flask_login import LoginManager, current_user
from flask_sqlalchemy import SQLAlchemy
@ -122,7 +122,7 @@ class Devicehub(Teal):
@click.option(
'--tag-url',
'-tu',
type=ereuse_utils.cli.URL(scheme=True, host=True, path=False),
type=ereuse_devicehub.ereuse_utils.cli.URL(scheme=True, host=True, path=False),
default='http://example.com',
help='The base url (scheme and host) of the tag provider.',
)

View file

@ -5,10 +5,10 @@ from pathlib import Path
import click
import click_spinner
import ereuse_utils.cli
import jwt
import yaml
from ereuse_utils.test import ANY
from ereuse_devicehub.ereuse_utils.test import ANY
from ereuse_devicehub import ereuse_utils
from ereuse_devicehub.client import UserClient
from ereuse_devicehub.db import db

View file

@ -0,0 +1,173 @@
import enum
import ipaddress
import json
import locale
from collections import Iterable
from datetime import datetime, timedelta
from decimal import Decimal
from distutils.version import StrictVersion
from functools import wraps
from typing import Generator, Union
from uuid import UUID
class JSONEncoder(json.JSONEncoder):
"""An overloaded JSON Encoder with extra type support."""
def default(self, obj):
if isinstance(obj, enum.Enum):
return obj.name
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, timedelta):
return round(obj.total_seconds())
elif isinstance(obj, UUID):
return str(obj)
elif isinstance(obj, StrictVersion):
return str(obj)
elif isinstance(obj, set):
return list(obj)
elif isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, Dumpeable):
return obj.dump()
elif isinstance(obj, ipaddress._BaseAddress):
return str(obj)
# Instead of failing, return the string representation by default
return str(obj)
class Dumpeable:
"""Dumps dictionaries and jsons for Devicehub.
A base class to allow subclasses to generate dictionaries
and json suitable for sending to a Devicehub, i.e. preventing
private and constants to be in the JSON and camelCases field names.
"""
ENCODER = JSONEncoder
def dump(self):
"""
Creates a dictionary consisting of the
non-private fields of this instance with camelCase field names.
"""
import inflection
return {
inflection.camelize(name, uppercase_first_letter=False): getattr(self, name)
for name in self._field_names()
if not name.startswith('_') and not name[0].isupper()
}
def _field_names(self):
"""An iterable of the names to dump."""
# Feel free to override this
return vars(self).keys()
def to_json(self):
"""
Creates a JSON representation of the non-private fields of
this class.
"""
return json.dumps(self, cls=self.ENCODER, indent=2)
class DumpeableModel(Dumpeable):
"""A dumpeable for SQLAlchemy models.
Note that this does not avoid recursive relations.
"""
def _field_names(self):
from sqlalchemy import inspect
return (a.key for a in inspect(self).attrs)
def ensure_utf8(app_name_to_show_on_error: str):
"""
Python3 uses by default the system set, but it expects it to be
utf-8 to work correctly.
This can generate problems in reading and writing files and in
``.decode()`` method.
An example how to 'fix' it::
echo 'export LC_CTYPE=en_US.UTF-8' > .bash_profile
echo 'export LC_ALL=en_US.UTF-8' > .bash_profile
"""
encoding = locale.getpreferredencoding()
if encoding.lower() != 'utf-8':
raise OSError(
'{} works only in UTF-8, but yours is set at {}'
''.format(app_name_to_show_on_error, encoding)
)
def now() -> datetime:
"""
Returns a compatible 'now' with DeviceHub's API,
this is as UTC and without microseconds.
"""
return datetime.utcnow().replace(microsecond=0)
def flatten_mixed(values: Iterable) -> Generator:
"""
Flatten a list containing lists and other elements. This is not deep.
>>> list(flatten_mixed([1, 2, [3, 4]]))
[1, 2, 3, 4]
"""
for x in values:
if isinstance(x, list):
for y in x:
yield y
else:
yield x
def if_none_return_none(f):
"""If the first value is None return None, otherwise execute f."""
@wraps(f)
def wrapper(self, value, *args, **kwargs):
if value is None:
return None
return f(self, value, *args, **kwargs)
return wrapper
def local_ip(
dest='109.69.8.152',
) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Gets the local IP of the interface that has access to the
Internet.
This is a reliable way to test if a device has an active
connection to the Internet.
This method works by connecting, by default,
to the IP of ereuse01.ereuse.org.
>>> local_ip()
:raise OSError: The device cannot connect to the Internet.
"""
import socket, ipaddress
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((dest, 80))
ip = s.getsockname()[0]
s.close()
return ipaddress.ip_address(ip)
def version(package_name: str) -> StrictVersion:
"""Returns the version of a package name installed with pip."""
# From https://stackoverflow.com/a/2073599
import pkg_resources
return StrictVersion(pkg_resources.require(package_name)[0].version)

View file

@ -0,0 +1,301 @@
import enum as _enum
import getpass
import itertools
import os
import pathlib
import threading
from contextlib import contextmanager
from time import sleep
from typing import Any, Iterable, Type
from boltons import urlutils
from click import types as click_types
from colorama import Fore
from tqdm import tqdm
from ereuse_devicehub.ereuse_utils import if_none_return_none
COMMON_CONTEXT_S = {'help_option_names': ('-h', '--help')}
"""Common Context settings used for our implementations of the
Click cli.
"""
# Py2/3 compat. Empty conditional to avoid coverage
try:
_unicode = unicode
except NameError:
_unicode = str
class Enum(click_types.Choice):
"""
Enum support for click.
Use it as a collection: @click.option(..., type=cli.Enum(MyEnum)).
Then, this expects you to pass the *name* of a member of the enum.
From `this github issue <https://github.com/pallets/click/issues/
605#issuecomment-277539425>`_.
"""
def __init__(self, enum: Type[_enum.Enum]):
self.__enum = enum
super().__init__(enum.__members__)
def convert(self, value, param, ctx):
return self.__enum[super().convert(value, param, ctx)]
class Path(click_types.Path):
"""Like click.Path but returning ``pathlib.Path`` objects."""
def convert(self, value, param, ctx):
return pathlib.Path(super().convert(value, param, ctx))
class URL(click_types.StringParamType):
"""Returns a bolton's URL."""
name = 'url'
def __init__(
self,
scheme=None,
username=None,
password=None,
host=None,
port=None,
path=None,
query_params=None,
fragment=None,
) -> None:
super().__init__()
"""Creates the type URL. You can require or enforce parts
of the URL by setting parameters of this constructor.
If the param is...
- None, no check is performed (default).
- True, it is then required as part of the URL.
- False, it is then required NOT to be part of the URL.
- Any other value, then such value is required to be in
the URL.
"""
self.attrs = (
('scheme', scheme),
('username', username),
('password', password),
('host', host),
('port', port),
('path', path),
('query_params', query_params),
('fragment', fragment),
)
@if_none_return_none
def convert(self, value, param, ctx):
url = urlutils.URL(super().convert(value, param, ctx))
for name, attr in self.attrs:
if attr is True:
if not getattr(url, name):
self.fail(
'URL {} must contain {} but it does not.'.format(url, name)
)
elif attr is False:
if getattr(url, name):
self.fail('URL {} cannot contain {} but it does.'.format(url, name))
elif attr:
if getattr(url, name) != attr:
self.fail('{} form {} can only be {}'.format(name, url, attr))
return url
def password(service: str, username: str, prompt: str = 'Password:') -> str:
"""Gets a password from the keyring or the terminal."""
import keyring
return keyring.get_password(service, username) or getpass.getpass(prompt)
class Line(tqdm):
spinner_cycle = itertools.cycle(['-', '/', '|', '\\'])
def __init__(
self,
total=None,
desc=None,
leave=True,
file=None,
ncols=None,
mininterval=0.2,
maxinterval=10.0,
miniters=None,
ascii=None,
disable=False,
unit='it',
unit_scale=False,
dynamic_ncols=True,
smoothing=0.3,
bar_format=None,
initial=0,
position=None,
postfix=None,
unit_divisor=1000,
write_bytes=None,
gui=False,
close_message: Iterable = None,
error_message: Iterable = None,
**kwargs,
):
"""This cannot work with iterables. Iterable use is considered
backward-compatibility in tqdm and inconsistent in Line.
Manually call ``update``.
"""
self._close_message = close_message
self._error_message = error_message
if total:
bar_format = '{desc}{percentage:.1f}% |{bar}| {n:1g}/{total:1g} {elapsed}<{remaining}'
super().__init__(
None,
desc,
total,
leave,
file,
ncols,
mininterval,
maxinterval,
miniters,
ascii,
disable,
unit,
unit_scale,
dynamic_ncols,
smoothing,
bar_format,
initial,
position,
postfix,
unit_divisor,
write_bytes,
gui,
**kwargs,
)
def write_at_line(self, *args):
self.clear()
with self._lock:
self.display(''.join(str(arg) for arg in args))
def close_message(self, *args):
self._close_message = args
def error_message(self, *args):
self._error_message = args
def close(self):
"""
Cleanup and (if leave=False) close the progressbar.
"""
if self.disable:
return
# Prevent multiple closures
self.disable = True
# decrement instance pos and remove from internal set
pos = abs(self.pos)
self._decr_instances(self)
# GUI mode
if not hasattr(self, "sp"):
return
# annoyingly, _supports_unicode isn't good enough
def fp_write(s):
self.fp.write(_unicode(s))
try:
fp_write('')
except ValueError as e:
if 'closed' in str(e):
return
raise # pragma: no cover
with self._lock:
if self.leave:
if self._close_message:
self.display(
''.join(str(arg) for arg in self._close_message), pos=pos
)
elif self.last_print_n < self.n:
# stats for overall rate (no weighted average)
self.avg_time = None
self.display(pos=pos)
if not max(
[abs(getattr(i, "pos", 0)) for i in self._instances] + [pos]
):
# only if not nested (#477)
fp_write('\n')
else:
if self._close_message:
self.display(
''.join(str(arg) for arg in self._close_message), pos=pos
)
else:
self.display(msg='', pos=pos)
if not pos:
fp_write('\r')
@contextmanager
def spin(self, prefix: str):
self._stop_running = threading.Event()
spin_thread = threading.Thread(target=self._spin, args=[prefix])
spin_thread.start()
try:
yield
finally:
self._stop_running.set()
spin_thread.join()
def _spin(self, prefix: str):
while not self._stop_running.is_set():
self.write_at_line(prefix, next(self.spinner_cycle))
sleep(0.50)
@classmethod
@contextmanager
def reserve_lines(self, n):
try:
yield
finally:
self.move_down(n - 1)
@classmethod
def move_down(cls, n: int):
print('\n' * n)
def __exit__(self, *exc):
if exc[0]:
self._close_message = self._error_message
return super().__exit__(*exc)
def clear():
os.system('clear')
def title(text: Any, ljust=32) -> str:
# Note that is 38 px + 1 extra space = 39 min
return str(text).ljust(ljust) + ' '
def danger(text: Any) -> str:
return '{}{}{}'.format(Fore.RED, text, Fore.RESET)
def warning(text: Any) -> str:
return '{}{}{}'.format(Fore.YELLOW, text, Fore.RESET)
def done(text: Any = 'done.') -> str:
return '{}{}{}'.format(Fore.GREEN, text, Fore.RESET)

View file

@ -0,0 +1,150 @@
import subprocess
from contextlib import suppress
from typing import Any, Set, TextIO
from ereuse_devicehub.ereuse_utils import text
def run(
*cmd: Any,
out=subprocess.PIPE,
err=subprocess.DEVNULL,
to_string=True,
check=True,
shell=False,
**kwargs,
) -> subprocess.CompletedProcess:
"""subprocess.run with a better API.
:param cmd: A list of commands to execute as parameters.
Parameters will be passed-in to ``str()`` so they
can be any object that can handle str().
:param out: As ``subprocess.run.stdout``.
:param err: As ``subprocess.run.stderr``.
:param to_string: As ``subprocess.run.universal_newlines``.
:param check: As ``subprocess.run.check``.
:param shell:
:param kwargs: Any other parameters that ``subprocess.run``
accepts.
:return: The result of executing ``subprocess.run``.
"""
cmds = tuple(str(c) for c in cmd)
return subprocess.run(
' '.join(cmds) if shell else cmds,
stdout=out,
stderr=err,
universal_newlines=to_string,
check=check,
shell=shell,
**kwargs,
)
class ProgressiveCmd:
"""Executes a cmd while interpreting its completion percentage.
The completion percentage of the cmd is stored in
:attr:`.percentage` and the user can obtain percentage
increments by executing :meth:`.increment`.
This class is useful to use within a child thread, so a main
thread can request from time to time the percentage / increment
status of the running command.
"""
READ_LINE = None
DECIMALS = {4, 5, 6}
DECIMAL_NUMBERS = 2
INT = {1, 2, 3}
def __init__(
self,
*cmd: Any,
stdout=subprocess.DEVNULL,
number_chars: Set[int] = INT,
decimal_numbers: int = None,
read: int = READ_LINE,
callback=None,
check=True,
):
"""
:param cmd: The command to execute.
:param stderr: the stderr passed-in to Popen.
:param stdout: the stdout passed-in to Popen
:param number_chars: The number of chars used to represent
the percentage. Normalized cases are
:attr:`.DECIMALS` and :attr:`.INT`.
:param read: For commands that do not print lines, how many
characters we should read between updates.
The percentage should be between those
characters.
:param callback: If passed in, this method is executed every time
run gets an update from the command, passing
in the increment from the last execution.
If not passed-in, you can get such increment
by executing manually the ``increment`` method.
:param check: Raise error if subprocess return code is non-zero.
"""
self.cmd = tuple(str(c) for c in cmd)
self.read = read
self.step = 0
self.check = check
self.number_chars = number_chars
self.decimal_numbers = decimal_numbers
# We call subprocess in the main thread so the main thread
# can react on ``CalledProcessError`` exceptions
self.conn = conn = subprocess.Popen(
self.cmd, universal_newlines=True, stderr=subprocess.PIPE, stdout=stdout
)
self.out = (
conn.stdout if stdout == subprocess.PIPE else conn.stderr
) # type: TextIO
self._callback = callback
self.last_update_percentage = 0
self.percentage = 0
@property
def percentage(self):
return self._percentage
@percentage.setter
def percentage(self, v):
self._percentage = v
if self._callback and self._percentage > 0:
increment = self.increment()
if (
increment > 0
): # Do not bother calling if there has not been any increment
self._callback(increment, self._percentage)
def run(self) -> None:
"""Processes the output."""
while True:
out = self.out.read(self.read) if self.read else self.out.readline()
if out:
with suppress(StopIteration):
self.percentage = next(
text.positive_percentages(
out, self.number_chars, self.decimal_numbers
)
)
else: # No more output
break
return_code = self.conn.wait() # wait until cmd ends
if self.check and return_code != 0:
raise subprocess.CalledProcessError(
self.conn.returncode, self.conn.args, stderr=self.conn.stderr.read()
)
def increment(self):
"""Returns the increment of progression from
the last time this method is executed.
"""
# for cmd badblocks the increment can be negative at the
# beginning of the second step where last_percentage
# is 100 and percentage is 0. By using max we
# kind-of reset the increment and start counting for
# the second step
increment = max(self.percentage - self.last_update_percentage, 0)
self.last_update_percentage = self.percentage
return increment

View file

@ -0,0 +1,171 @@
"""Functions to get values from dictionaries and list encoded key-value
strings with meaningful indentations.
Values obtained from these functions are sanitized and automatically
(or explicitly set) casted. Sanitization includes removing unnecessary
whitespaces and removing useless keywords (in the context of
computer hardware) from the texts.
"""
import re
from itertools import chain
from typing import Any, Iterable, Set, Type, Union
from unittest.mock import DEFAULT
import boltons.iterutils
import yaml
from ereuse_devicehub.ereuse_utils.text import clean
def dict(
d: dict,
path: Union[str, tuple],
remove: Set[str] = set(),
default: Any = DEFAULT,
type: Type = None,
):
"""Gets a value from the dictionary and sanitizes it.
Values are patterned and compared against sets
of meaningless characters for device hardware.
:param d: A dictionary potentially containing the value.
:param path: The key or a tuple-path where the value should be.
:param remove: Remove these words if found.
:param default: A default value to return if not found. If not set,
an exception is raised.
:param type: Enforce a type on the value (like ``int``). By default
dict tries to guess the correct type.
"""
try:
v = boltons.iterutils.get_path(d, (path,) if isinstance(path, str) else path)
except KeyError:
return _default(path, default)
else:
return sanitize(v, remove, type=type)
def kv(
iterable: Iterable[str],
key: str,
default: Any = DEFAULT,
sep=':',
type: Type = None,
) -> Any:
"""Key-value. Gets a value from an iterable representing key values in the
form of a list of strings lines, for example an ``.ini`` or yaml file,
if they are opened with ``.splitlines()``.
:param iterable: An iterable of strings.
:param key: The key where the value should be.
:param default: A default value to return if not found. If not set,
an exception is raised.
:param sep: What separates the key from the value in the line.
Usually ``:`` or ``=``.
:param type: Enforce a type on the value (like ``int``). By default
dict tries to guess the correct type.
"""
for line in iterable:
try:
k, value, *_ = line.strip().split(sep)
except ValueError:
continue
else:
if key == k:
return sanitize(value, type=type)
return _default(key, default)
def indents(iterable: Iterable[str], keyword: str, indent=' '):
"""For a given iterable of strings, returns blocks of the same
left indentation.
For example:
foo1
bar1
bar2
foo2
foo2
For that text, this method would return ``[bar1, bar2]`` for passed-in
keyword ``foo1``.
:param iterable: A list of strings representing lines.
:param keyword: The title preceding the indentation.
:param indent: Which characters makes the indentation.
"""
section_pos = None
for i, line in enumerate(iterable):
if not line.startswith(indent):
if keyword in line:
section_pos = i
elif section_pos is not None:
yield iterable[section_pos:i]
section_pos = None
return
def _default(key, default):
if default is DEFAULT:
raise IndexError('Value {} not found.'.format(key))
else:
return default
"""Gets"""
TO_REMOVE = {'none', 'prod', 'o.e.m', 'oem', r'n/a', 'atapi', 'pc', 'unknown'}
"""Delete those *words* from the value"""
assert all(v.lower() == v for v in TO_REMOVE), 'All words need to be lower-case'
REMOVE_CHARS_BETWEEN = '(){}[]'
"""
Remove those *characters* from the value.
All chars inside those are removed. Ex: foo (bar) => foo
"""
CHARS_TO_REMOVE = '*'
"""Remove the characters.
'*' Needs to be removed or otherwise it is interpreted
as a glob expression by regexes.
"""
MEANINGLESS = {
'to be filled',
'system manufacturer',
'system product',
'sernum',
'xxxxx',
'system name',
'not specified',
'modulepartnumber',
'system serial',
'0001-067a-0000',
'partnum',
'manufacturer',
'0000000',
'fffff',
'jedec id:ad 00 00 00 00 00 00 00',
'012000',
'x.x',
'sku',
}
"""Discard a value if any of these values are inside it. """
assert all(v.lower() == v for v in MEANINGLESS), 'All values need to be lower-case'
def sanitize(value, remove=set(), type=None):
if value is None:
return None
remove = remove | TO_REMOVE
regex = r'({})\W'.format('|'.join(s for s in remove))
val = re.sub(regex, '', value, flags=re.IGNORECASE)
val = '' if val.lower() in remove else val # regex's `\W` != whole string
val = re.sub(r'\([^)]*\)', '', val) # Remove everything between
for char_to_remove in chain(REMOVE_CHARS_BETWEEN, CHARS_TO_REMOVE):
val = val.replace(char_to_remove, '')
val = clean(val)
if val and not any(meaningless in val.lower() for meaningless in MEANINGLESS):
return type(val) if type else yaml.load(val, Loader=yaml.SafeLoader)
else:
return None

View file

@ -0,0 +1,143 @@
from inflection import (
camelize,
dasherize,
parameterize,
pluralize,
singularize,
underscore,
)
HID_CONVERSION_DOC = """
The HID is the result of concatenating,
in the following order: the type of device (ex. Computer),
the manufacturer name, the model name, and the S/N. It is joined
with hyphens, and adapted to comply with the URI specification, so
it can be used in the URI identifying the device on the Internet.
The conversion is done as follows:
1. non-ASCII characters are converted to their ASCII equivalent or
removed.
2. Characterst that are not letters or numbers are converted to
underscores, in a way that there are no trailing underscores
and no underscores together, and they are set to lowercase.
Ex. ``laptop-acer-aod270-lusga_0d0242201212c7614``
"""
class Naming:
"""
In DeviceHub there are many ways to name the same resource (yay!), this is because of all the different
types of schemas we work with. But no worries, we offer easy ways to change between naming conventions.
- TypeCase (or resource-type) is the one represented with '@type' and follow PascalCase and always singular.
This is the standard preferred one.
- resource-case is the eve naming, using the standard URI conventions. This one is tricky, as although the types
are represented in singular, the URI convention is to be plural (Event vs events), however just few of them
follow this rule (Snapshot [type] to snapshot [resource]). You can set which ones you want to change their
number.
- python_case is the one used by python for its folders and modules. It is underscored and always singular.
"""
TYPE_PREFIX = ':'
RESOURCE_PREFIX = '_'
@staticmethod
def resource(string: str):
"""
:param string: String can be type, resource or python case
"""
try:
prefix, resulting_type = Naming.pop_prefix(string)
prefix += Naming.RESOURCE_PREFIX
except IndexError:
prefix = ''
resulting_type = string
resulting_type = dasherize(underscore(resulting_type))
return prefix + pluralize(resulting_type)
@staticmethod
def python(string: str):
"""
:param string: String can be type, resource or python case
"""
return underscore(singularize(string))
@staticmethod
def type(string: str):
try:
prefix, resulting_type = Naming.pop_prefix(string)
prefix += Naming.TYPE_PREFIX
except IndexError:
prefix = ''
resulting_type = string
resulting_type = singularize(resulting_type)
resulting_type = resulting_type.replace(
'-', '_'
) # camelize does not convert '-' but '_'
return prefix + camelize(resulting_type)
@staticmethod
def url_word(word: str):
"""
Normalizes a full word to be inserted to an url. If the word has spaces, etc, is used '_' and not '-'
"""
return parameterize(word, '_')
@staticmethod
def pop_prefix(string: str):
"""Erases the prefix and returns it.
:throws IndexError: There is no prefix.
:return A set with two elements: 1- the prefix, 2- the type without it.
"""
result = string.split(Naming.TYPE_PREFIX)
if len(result) == 1:
result = string.split(Naming.RESOURCE_PREFIX)
if len(result) == 1:
raise IndexError()
return result
@staticmethod
def new_type(type_name: str, prefix: str or None = None) -> str:
"""
Creates a resource type with optionally a prefix.
Using the rules of JSON-LD, we use prefixes to disambiguate between different types with the same name:
one can Accept a device or a project. In eReuse.org there are different events with the same names, in
linked-data terms they have different URI. In eReuse.org, we solve this with the following:
"@type": "devices:Accept" // the URI for these events is 'devices/events/accept'
"@type": "projects:Accept" // the URI for these events is 'projects/events/accept
...
Type is only used in events, when there are ambiguities. The rest of
"@type": "devices:Accept"
"@type": "Accept"
But these not:
"@type": "projects:Accept" // it is an event from a project
"@type": "Accept" // it is an event from a device
"""
if Naming.TYPE_PREFIX in type_name:
raise TypeError(
'Cannot create new type: type {} is already prefixed.'.format(type_name)
)
prefix = (prefix + Naming.TYPE_PREFIX) if prefix is not None else ''
return prefix + type_name
@staticmethod
def hid(type: str, manufacturer: str, model: str, serial_number: str) -> str:
(
"""Computes the HID for the given properties of a device.
The HID is suitable to use to an URI.
"""
+ HID_CONVERSION_DOC
)
return '{type}-{mn}-{ml}-{sn}'.format(
type=Naming.url_word(type),
mn=Naming.url_word(manufacturer),
ml=Naming.url_word(model),
sn=Naming.url_word(serial_number),
)

View file

@ -0,0 +1,88 @@
from typing import Generator
class NestedLookup:
@staticmethod
def __new__(cls, document, references, operation):
"""Lookup a key in a nested document, return a list of values
From https://github.com/russellballestrini/nested-lookup/ but in python 3
"""
return list(NestedLookup._nested_lookup(document, references, operation))
@staticmethod
def key_equality_factory(key_to_find):
def key_equality(key, _):
return key == key_to_find
return key_equality
@staticmethod
def is_sub_type_factory(type):
def _is_sub_type(_, value):
return is_sub_type(value, type)
return _is_sub_type
@staticmethod
def key_value_equality_factory(key_to_find, value_to_find):
def key_value_equality(key, value):
return key == key_to_find and value == value_to_find
return key_value_equality
@staticmethod
def key_value_containing_value_factory(key_to_find, value_to_find):
def key_value_containing_value(key, value):
return key == key_to_find and value_to_find in value
return key_value_containing_value
@staticmethod
def _nested_lookup(document, references, operation):
"""Lookup a key in a nested document, yield a value"""
if isinstance(document, list):
for d in document:
for result in NestedLookup._nested_lookup(d, references, operation):
yield result
if isinstance(document, dict):
for k, v in document.items():
if operation(k, v):
references.append((document, k))
yield v
elif isinstance(v, dict):
for result in NestedLookup._nested_lookup(v, references, operation):
yield result
elif isinstance(v, list):
for d in v:
for result in NestedLookup._nested_lookup(
d, references, operation
):
yield result
def is_sub_type(value, resource_type):
try:
return issubclass(value, resource_type)
except TypeError:
return issubclass(value.__class__, resource_type)
def get_nested_dicts_with_key_value(parent_dict: dict, key, value):
"""Return all nested dictionaries that contain a key with a specific value. A sub-case of NestedLookup."""
references = []
NestedLookup(
parent_dict, references, NestedLookup.key_value_equality_factory(key, value)
)
return (document for document, _ in references)
def get_nested_dicts_with_key_containing_value(parent_dict: dict, key, value):
"""Return all nested dictionaries that contain a key with a specific value. A sub-case of NestedLookup."""
references = []
NestedLookup(
parent_dict,
references,
NestedLookup.key_value_containing_value_factory(key, value),
)
return (document for document, _ in references)

View file

@ -0,0 +1,285 @@
import base64
import json
from typing import Any, Dict, Iterable, Tuple, TypeVar, Union
import boltons.urlutils
from requests import Response
from requests_toolbelt.sessions import BaseUrlSession
from urllib3 import Retry
from ereuse_devicehub import ereuse_utils
# mypy
Query = Iterable[Tuple[str, Any]]
Status = Union[int]
try:
from typing import Protocol # Only py 3.6+
except ImportError:
pass
else:
class HasStatusProperty(Protocol):
def __init__(self, *args, **kwargs) -> None:
self.status = ... # type: int
Status = Union[int, HasStatusProperty]
JSON = 'application/json'
ANY = '*/*'
AUTH = 'Authorization'
BASIC = 'Basic {}'
URL = Union[str, boltons.urlutils.URL]
Data = Union[str, dict, ereuse_utils.Dumpeable]
Res = Tuple[Union[Dict[str, Any], str], Response]
# actual code
class Session(BaseUrlSession):
"""A BaseUrlSession that always raises for status and sets a
timeout for all requests by default.
"""
def __init__(self, base_url=None, timeout=15):
"""
:param base_url:
:param timeout: Time requests will wait to receive the first
response bytes (not the whole) from the server. In seconds.
"""
super().__init__(base_url)
self.timeout = timeout
self.hooks['response'] = lambda r, *args, **kwargs: r.raise_for_status()
def request(self, method, url, *args, **kwargs):
kwargs.setdefault('timeout', self.timeout)
return super().request(method, url, *args, **kwargs)
def __repr__(self):
return '<{} base={}>.'.format(self.__class__.__name__, self.base_url)
class DevicehubClient(Session):
"""A Session pre-configured to connect to Devicehub-like APIs."""
def __init__(self, base_url: URL = None,
token: str = None,
inventory: Union[str, bool] = False,
**kwargs):
"""Initializes a session pointing to a Devicehub endpoint.
Authentication can be passed-in as a token for endpoints
that require them, now at ini, after when executing the method,
or in between with ``set_auth``.
:param base_url: An url pointing to a endpoint.
:param token: A Base64 encoded token, as given by a devicehub.
You can encode tokens by executing `encode_token`.
:param inventory: If True, use the default inventory of the user.
If False, do not use inventories (single-inventory
database, this is the option by default).
If a string, always use the set inventory.
"""
if isinstance(base_url, boltons.urlutils.URL):
base_url = base_url.to_text()
else:
base_url = str(base_url)
super().__init__(base_url, **kwargs)
assert base_url[-1] != '/', 'Do not provide a final slash to the URL'
if token:
self.set_auth(token)
self.inventory = inventory
self.user = None # type: Dict[str, object]
def set_auth(self, token):
self.headers['Authorization'] = 'Basic {}'.format(token)
@classmethod
def encode_token(cls, token: str):
"""Encodes a token suitable for a Devicehub endpoint."""
return base64.b64encode(str.encode(str(token) + ':')).decode()
def login(self, email: str, password: str) -> Dict[str, Any]:
"""Performs login, authenticating future requests.
:return: The logged-in user.
"""
user, _ = self.post('/users/login/', {'email': email, 'password': password}, status=200)
self.set_auth(user['token'])
self.user = user
self.inventory = user['inventories'][0]['id']
return user
def get(self,
base_url: URL,
uri=None,
status: Status = 200,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
headers: dict = None,
token=None,
**kwargs) -> Res:
return super().get(base_url,
uri=uri,
status=status,
query=query,
accept=accept,
content_type=content_type,
headers=headers,
token=token, **kwargs)
def post(self, base_url: URL,
data: Data,
uri=None,
status: Status = 201,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
headers: dict = None,
token=None,
**kwargs) -> Res:
return super().post(base_url,
data=data,
uri=uri,
status=status,
query=query,
accept=accept,
content_type=content_type,
headers=headers,
token=token, **kwargs)
def delete(self,
base_url: URL,
uri=None,
status: Status = 204,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
headers: dict = None,
token=None,
**kwargs) -> Res:
return super().delete(base_url,
uri=uri,
status=status,
query=query,
accept=accept,
content_type=content_type,
headers=headers,
token=token, **kwargs)
def patch(self, base_url: URL,
data: Data,
uri=None,
status: Status = 201,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
headers: dict = None,
token=None,
**kwargs) -> Res:
return super().patch(base_url,
data=data,
uri=uri,
status=status,
query=query,
accept=accept,
content_type=content_type,
headers=headers,
token=token, **kwargs)
def request(self,
method,
base_url: URL,
uri=None,
status: Status = 200,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
data=None,
headers: dict = None,
token=None,
**kw) -> Res:
assert not kw.get('json', None), 'Do not use json; use data.'
# We allow uris without slashes for item endpoints
uri = str(uri) if uri else None
headers = headers or {}
headers['Accept'] = accept
headers['Content-Type'] = content_type
if token:
headers['Authorization'] = 'Basic {}'.format(token)
if data and content_type == JSON:
data = json.dumps(data, cls=ereuse_utils.JSONEncoder, sort_keys=True)
url = base_url if not isinstance(base_url, boltons.urlutils.URL) else base_url.to_text()
assert url[-1] == '/', 'base_url should end with a slash'
if self.inventory and not isinstance(self.inventory, bool):
url = '{}/{}'.format(self.inventory, base_url)
assert url[-1] == '/', 'base_url should end with a slash'
if uri:
url = self.parse_uri(url, uri)
if query:
url = self.parse_query(url, query)
response = super().request(method, url, data=data, headers=headers, **kw)
if status:
_status = getattr(status, 'code', status)
if _status != response.status_code:
raise WrongStatus('Req to {} failed bc the status is {} but it should have been {}'
.format(url, response.status_code, _status))
data = response.content if not accept == JSON or not response.content else response.json()
return data, response
@staticmethod
def parse_uri(base_url, uri):
return boltons.urlutils.URL(base_url).navigate(uri).to_text()
@staticmethod
def parse_query(uri, query):
url = boltons.urlutils.URL(uri)
url.query_params = boltons.urlutils.QueryParamDict([
(k, json.dumps(v, cls=ereuse_utils.JSONEncoder) if isinstance(v, (list, dict)) else v)
for k, v in query
])
return url.to_text()
def __repr__(self):
return '<{} base={} inv={} user={}>.'.format(self.__class__.__name__, self.base_url,
self.inventory, self.user)
class WrongStatus(Exception):
pass
import requests
from requests.adapters import HTTPAdapter
T = TypeVar('T', bound=requests.Session)
def retry(session: T,
retries=3,
backoff_factor=1,
status_to_retry=(500, 502, 504)) -> T:
"""Configures requests from the given session to retry in
failed requests due to connection errors, HTTP response codes
with ``status_to_retry`` and 30X redirections.
Remember that you still need
"""
# From https://www.peterbe.com/plog/best-practice-with-retries-with-requests
# Doc in https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_to_retry,
method_whitelist=False # Retry too in non-idempotent methods like POST
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session

View file

@ -0,0 +1,165 @@
from contextlib import suppress
from typing import Dict, Tuple, Union
from flask import json
from flask.testing import FlaskClient
from werkzeug.wrappers import Response
from ereuse_devicehub.ereuse_utils.session import ANY, AUTH, BASIC, DevicehubClient, JSON, Query, Status
ANY = ANY
AUTH = AUTH
BASIC = BASIC
Res = Tuple[Union[Dict[str, object], str], Response]
class Client(FlaskClient):
"""
A client for the REST servers of DeviceHub and WorkbenchServer.
- JSON first. By default it sends and expects receiving JSON files.
- Assert regular status responses, like 200 for GET.
- Auto-parses a nested dictionary of URL query params to the
URL version with nested properties to JSON.
- Meaningful headers format: a dictionary of name-values.
"""
def open(self,
uri: str,
status: Status = 200,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
item=None,
headers: dict = None,
**kw) -> Res:
"""
:param uri: The URI without basename and query.
:param status: Assert the response for specified status. Set
None to avoid.
:param query: The query of the URL in the form of
[(key1, value1), (key2, value2), (key1, value3)].
If value is a list or a dict, they will be
converted to JSON.
Please, see :class:`boltons.urlutils`.
QueryParamDict` for more info.
:param accept: The Accept header. If 'application/json'
(default) then it will parse incoming JSON.
:param item: The last part of the path. Useful to do something
like ``get('db/accounts', item='24')``. If you
use ``item``, you can't set a final backslash into
``uri`` (or the parse will fail).
:param headers: A dictionary of headers, where keys are header
names and values their values.
Ex: {'Accept', 'application/json'}.
:param kw: Kwargs passed into parent ``open``.
:return: A tuple with: 1. response data, as a string or JSON
depending of Accept, and 2. the Response object.
"""
j_encoder = self.application.json_encoder
headers = headers or {}
headers['Accept'] = accept
headers['Content-Type'] = content_type
headers = [(k, v) for k, v in headers.items()]
if 'data' in kw and content_type == JSON:
kw['data'] = json.dumps(kw['data'], cls=j_encoder)
if item:
uri = DevicehubClient.parse_uri(uri, item)
if query:
uri = DevicehubClient.parse_query(uri, query)
response = super().open(uri, headers=headers, **kw)
if status:
_status = getattr(status, 'code', status)
assert response.status_code == _status, \
'Expected status code {} but got {}. Returned data is:\n' \
'{}'.format(_status, response.status_code, response.get_data().decode())
data = response.get_data()
with suppress(UnicodeDecodeError):
data = data.decode()
if accept == JSON:
data = json.loads(data) if data else {}
return data, response
def get(self,
uri: str,
query: Query = tuple(),
item: str = None,
status: Status = 200,
accept: str = JSON,
headers: dict = None,
**kw) -> Res:
"""
Performs a GET.
See the parameters in :meth:`ereuse_utils.test.Client.open`.
Moreover:
:param query: A dictionary of query params. If a parameter is a
dict or a list, it will be parsed to JSON, then
all params are encoded with ``urlencode``.
:param kw: Kwargs passed into parent ``open``.
"""
return super().get(uri, item=item, status=status, accept=accept, headers=headers,
query=query, **kw)
def post(self,
uri: str,
data: str or dict,
query: Query = tuple(),
status: Status = 201,
content_type: str = JSON,
accept: str = JSON,
headers: dict = None,
**kw) -> Res:
"""
Performs a POST.
See the parameters in :meth:`ereuse_utils.test.Client.open`.
"""
return super().post(uri, data=data, status=status, content_type=content_type,
accept=accept, headers=headers, query=query, **kw)
def patch(self,
uri: str,
data: str or dict,
query: Query = tuple(),
status: Status = 200,
content_type: str = JSON,
item: str = None,
accept: str = JSON,
headers: dict = None,
**kw) -> Res:
"""
Performs a PATCH.
See the parameters in :meth:`ereuse_utils.test.Client.open`.
"""
return super().patch(uri, item=item, data=data, status=status, content_type=content_type,
accept=accept, headers=headers, query=query, **kw)
def put(self,
uri: str,
data: str or dict,
query: Query = tuple(),
status: Status = 201,
content_type: str = JSON,
item: str = None,
accept: str = JSON,
headers: dict = None,
**kw) -> Res:
return super().put(uri, item=item, data=data, status=status, content_type=content_type,
accept=accept, headers=headers, query=query, **kw)
def delete(self,
uri: str,
query: Query = tuple(),
item: str = None,
status: Status = 204,
accept: str = JSON,
headers: dict = None,
**kw) -> Res:
return super().delete(uri, query=query, item=item, status=status, accept=accept,
headers=headers, **kw)

View file

@ -0,0 +1,72 @@
import ast
import re
from typing import Iterator, Set, Union
def grep(text: str, value: str):
"""An easy 'grep -i' that yields lines where value is found."""
for line in text.splitlines():
if value in line:
yield line
def between(text: str, begin='(', end=')'):
"""Dead easy text between two characters.
Not recursive or repetitions.
"""
return text.split(begin)[-1].split(end)[0]
def numbers(text: str) -> Iterator[Union[int, float]]:
"""Gets numbers in strings with other characters.
Integer Numbers: 1 2 3 987 +4 -8
Decimal Numbers: 0.1 2. .3 .987 +4.0 -0.8
Scientific Notation: 1e2 0.2e2 3.e2 .987e2 +4e-1 -8.e+2
Numbers with percentages: 49% 32.39%
This returns int or float.
"""
# From https://regexr.com/33jqd
for x in re.finditer(r'[+-]?(?=\.\d|\d)(?:\d+)?(?:\.?\d*)(?:[eE][+-]?\d+)?', text):
yield ast.literal_eval(x.group())
def positive_percentages(
text: str, lengths: Set[int] = None, decimal_numbers: int = None
) -> Iterator[Union[int, float]]:
"""Gets numbers postfixed with a '%' in strings with other characters.
1)100% 2)56.78% 3)56 78.90% 4)34.6789% some text
:param text: The text to search for.
:param lengths: A set of lengths that the percentage
number should have to be considered valid.
Ex. {5,6} would validate '90.32' and '100.00'
"""
# From https://regexr.com/3aumh
for x in re.finditer(r'[\d|\.]+%', text):
num = x.group()[:-1]
if lengths:
if not len(num) in lengths:
continue
if decimal_numbers:
try:
pos = num.rindex('.')
except ValueError:
continue
else:
if len(num) - pos - 1 != decimal_numbers:
continue
yield float(num)
def macs(text: str) -> Iterator[str]:
"""Find MACs in strings with other characters."""
for x in re.finditer('{0}:{0}:{0}:{0}:{0}:{0}'.format(r'[a-fA-F0-9.+_-]+'), text):
yield x.group()
def clean(text: str) -> str:
"""Trims the text and replaces multiple spaces with a single space."""
return ' '.join(text.split())

View file

@ -0,0 +1,80 @@
import usb.core
import usb.util
from usb import CLASS_MASS_STORAGE
from ereuse_devicehub.ereuse_utils.naming import Naming
def plugged_usbs(multiple=True) -> map or dict:
"""
Gets the plugged-in USB Flash drives (pen-drives).
If multiple is true, it returns a map, and a dict otherwise.
If multiple is false, this method will raise a :class:`.NoUSBFound` if no USB is found.
"""
class FindPenDrives(object):
# From https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst
def __init__(self, class_):
self._class = class_
def __call__(self, device):
# first, let's check the device
if device.bDeviceClass == self._class:
return True
# ok, transverse all devices to find an
# interface that matches our class
for cfg in device:
# find_descriptor: what's it?
intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
# We don't want Card readers
if intf is not None:
try:
product = intf.device.product.lower()
except ValueError as e:
if 'langid' in str(e):
raise OSError(
'Cannot get "langid". Do you have permissions?'
)
else:
raise e
if 'crw' not in product and 'reader' not in product:
return True
return False
def get_pendrive(pen: usb.Device) -> dict:
if not pen.manufacturer or not pen.product or not pen.serial_number:
raise UsbDoesNotHaveHid()
manufacturer = pen.manufacturer.strip() or str(pen.idVendor)
model = pen.product.strip() or str(pen.idProduct)
serial_number = pen.serial_number.strip()
hid = Naming.hid('USBFlashDrive', manufacturer, model, serial_number)
return {
'id': hid, # Make live easier to DeviceHubClient by using _id
'hid': hid,
'type': 'USBFlashDrive',
'serialNumber': serial_number,
'model': model,
'manufacturer': manufacturer,
'vendorId': pen.idVendor,
'productId': pen.idProduct,
}
result = usb.core.find(
find_all=multiple, custom_match=FindPenDrives(CLASS_MASS_STORAGE)
)
if multiple:
return map(get_pendrive, result)
else:
if not result:
raise NoUSBFound()
return get_pendrive(result)
class NoUSBFound(Exception):
pass
class UsbDoesNotHaveHid(Exception):
pass

View file

@ -9,7 +9,7 @@ from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
${imports if imports else ""}
# revision identifiers, used by Alembic.

View file

@ -10,7 +10,7 @@ from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
# revision identifiers, used by Alembic.
@ -26,11 +26,32 @@ def get_inv():
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.alter_column('test_data_storage', 'current_pending_sector_count', type_=sa.Integer(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'offline_uncorrectable', type_=sa.Integer(), schema=f'{get_inv()}')
op.alter_column(
'test_data_storage',
'current_pending_sector_count',
type_=sa.Integer(),
schema=f'{get_inv()}',
)
op.alter_column(
'test_data_storage',
'offline_uncorrectable',
type_=sa.Integer(),
schema=f'{get_inv()}',
)
def downgrade():
op.alter_column('test_data_storage', 'current_pending_sector_count', type_=sa.SmallInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'offline_uncorrectable', type_=sa.SmallInteger(), schema=f'{get_inv()}')
op.alter_column(
'test_data_storage',
'current_pending_sector_count',
type_=sa.SmallInteger(),
schema=f'{get_inv()}',
)
op.alter_column(
'test_data_storage',
'offline_uncorrectable',
type_=sa.SmallInteger(),
schema=f'{get_inv()}',
)

View file

@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
from ereuse_devicehub.resources.enums import SessionType

View file

@ -5,12 +5,12 @@ Revises: bf600ca861a4
Create Date: 2020-12-16 11:45:13.339624
"""
from alembic import context
from alembic import op
import citext
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from alembic import context
from alembic import op
from ereuse_devicehub import teal
# revision identifiers, used by Alembic.

View file

@ -5,15 +5,14 @@ Revises: 51439cf24be8
Create Date: 2021-06-15 14:38:59.931818
"""
import teal
import citext
import sqlalchemy as sa
from ereuse_devicehub import teal
from alembic import op
from alembic import context
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '3a3601ac8224'
down_revision = '51439cf24be8'
@ -27,108 +26,143 @@ def get_inv():
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.create_table('trade_document',
op.create_table(
'trade_document',
sa.Column(
'updated',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='The last time Devicehub recorded a change for \n this thing.\n '
comment='The last time Devicehub recorded a change for \n this thing.\n ',
),
sa.Column(
'created',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='When Devicehub created this.'
comment='When Devicehub created this.',
),
sa.Column(
'id',
sa.BigInteger(),
nullable=False,
comment='The identifier of the device for this database. Used only\n internally for software; users should not use this.\n '
comment='The identifier of the device for this database. Used only\n internally for software; users should not use this.\n ',
),
sa.Column(
'date',
sa.DateTime(),
nullable=True,
comment='The date of document, some documents need to have one date\n '
comment='The date of document, some documents need to have one date\n ',
),
sa.Column(
'id_document',
citext.CIText(),
nullable=True,
comment='The id of one document like invoice so they can be linked.'
comment='The id of one document like invoice so they can be linked.',
),
sa.Column(
'description',
citext.CIText(),
nullable=True,
comment='A description of document.'
),
sa.Column(
'owner_id',
postgresql.UUID(as_uuid=True),
nullable=False
),
sa.Column(
'lot_id',
postgresql.UUID(as_uuid=True),
nullable=False
comment='A description of document.',
),
sa.Column('owner_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('lot_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column(
'file_name',
citext.CIText(),
nullable=True,
comment='This is the name of the file when user up the document.'
comment='This is the name of the file when user up the document.',
),
sa.Column(
'file_hash',
citext.CIText(),
nullable=True,
comment='This is the hash of the file produced from frontend.'
comment='This is the hash of the file produced from frontend.',
),
sa.Column(
'url',
citext.CIText(),
teal.db.URL(),
nullable=True,
comment='This is the url where resides the document.'
comment='This is the url where resides the document.',
),
sa.ForeignKeyConstraint(
['lot_id'],
[f'{get_inv()}.lot.id'],
),
sa.ForeignKeyConstraint(
['owner_id'],
['common.user.id'],
),
sa.ForeignKeyConstraint(['lot_id'], [f'{get_inv()}.lot.id'],),
sa.ForeignKeyConstraint(['owner_id'], ['common.user.id'],),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
# Action document table
op.create_table('action_trade_document',
op.create_table(
'action_trade_document',
sa.Column('document_id', sa.BigInteger(), nullable=False),
sa.Column('action_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['action_id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(['document_id'], [f'{get_inv()}.trade_document.id'], ),
sa.ForeignKeyConstraint(
['action_id'],
[f'{get_inv()}.action.id'],
),
sa.ForeignKeyConstraint(
['document_id'],
[f'{get_inv()}.trade_document.id'],
),
sa.PrimaryKeyConstraint('document_id', 'action_id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
op.create_index('document_id', 'trade_document', ['id'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
op.create_index(op.f('ix_trade_document_created'), 'trade_document', ['created'], unique=False, schema=f'{get_inv()}')
op.create_index(op.f('ix_trade_document_updated'), 'trade_document', ['updated'], unique=False, schema=f'{get_inv()}')
op.create_index(
'document_id',
'trade_document',
['id'],
unique=False,
postgresql_using='hash',
schema=f'{get_inv()}',
)
op.create_index(
op.f('ix_trade_document_created'),
'trade_document',
['created'],
unique=False,
schema=f'{get_inv()}',
)
op.create_index(
op.f('ix_trade_document_updated'),
'trade_document',
['updated'],
unique=False,
schema=f'{get_inv()}',
)
op.create_table('confirm_document',
op.create_table(
'confirm_document',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('action_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(['action_id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['common.user.id'], ),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.action.id'],
),
sa.ForeignKeyConstraint(
['action_id'],
[f'{get_inv()}.action.id'],
),
sa.ForeignKeyConstraint(
['user_id'],
['common.user.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
def downgrade():
op.drop_table('action_trade_document', schema=f'{get_inv()}')
op.drop_table('confirm_document', schema=f'{get_inv()}')
op.drop_table('trade_document', schema=f'{get_inv()}')

View file

@ -7,7 +7,7 @@ Create Date: 2023-02-13 18:01:00.092527
"""
import citext
import sqlalchemy as sa
import teal
from ereuse_devicehub import teal
from alembic import context, op
from sqlalchemy.dialects import postgresql

View file

@ -9,7 +9,7 @@ from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
from alembic import op
from alembic import context
@ -32,13 +32,23 @@ def get_inv():
def upgrade():
# Document table
op.create_table('document',
op.create_table(
'document',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('updated', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
sa.Column(
'updated',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='The last time Document recorded a change for \n this thing.\n '),
sa.Column('created', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False, comment='When Document created this.'),
comment='The last time Document recorded a change for \n this thing.\n ',
),
sa.Column(
'created',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='When Document created this.',
),
sa.Column('document_type', sa.Unicode(), nullable=False),
sa.Column('date', sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column('id_document', sa.Unicode(), nullable=True),
@ -46,36 +56,73 @@ def upgrade():
sa.Column('file_name', sa.Unicode(), nullable=False),
sa.Column('file_hash', sa.Unicode(), nullable=False),
sa.Column('url', sa.Unicode(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['common.user.id'], ),
sa.ForeignKeyConstraint(
['owner_id'],
['common.user.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
op.create_index(
'generic_document_id',
'document',
['id'],
unique=False,
postgresql_using='hash',
schema=f'{get_inv()}',
)
op.create_index(
op.f('ix_document_created'),
'document',
['created'],
unique=False,
schema=f'{get_inv()}',
)
op.create_index(
op.f('ix_document_updated'),
'document',
['updated'],
unique=False,
schema=f'{get_inv()}',
)
op.create_index(
'document_type_index',
'document',
['document_type'],
unique=False,
postgresql_using='hash',
schema=f'{get_inv()}',
)
op.create_index('generic_document_id', 'document', ['id'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
op.create_index(op.f('ix_document_created'), 'document', ['created'], unique=False, schema=f'{get_inv()}')
op.create_index(op.f('ix_document_updated'), 'document', ['updated'], unique=False, schema=f'{get_inv()}')
op.create_index('document_type_index', 'document', ['document_type'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
# DataWipeDocument table
op.create_table('data_wipe_document',
op.create_table(
'data_wipe_document',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('software', sa.Unicode(), nullable=True),
sa.Column('success', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.document.id'], ),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.document.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
# DataWipe table
op.create_table('data_wipe',
op.create_table(
'data_wipe',
sa.Column('document_id', sa.BigInteger(), nullable=False),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['document_id'], [f'{get_inv()}.document.id'], ),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(
['document_id'],
[f'{get_inv()}.document.id'],
),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.action.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)

View file

@ -10,7 +10,7 @@ from alembic import context
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
# revision identifiers, used by Alembic.
@ -26,10 +26,10 @@ def get_inv():
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
con = op.get_bind()
confirmsRevokes_sql = f"select * from {get_inv()}.action as action join {get_inv()}.confirm as confirm on action.id=confirm.id where action.type='ConfirmRevoke'"
revokes_sql = f"select confirm.id, confirm.action_id from {get_inv()}.action as action join {get_inv()}.confirm as confirm on action.id=confirm.id where action.type='Revoke'"
confirmsRevokes = [a for a in con.execute(confirmsRevokes_sql)]
@ -40,12 +40,12 @@ def upgrade():
revoke_id = ac.action_id
trade_id = revokes[revoke_id]
sql_action = f"update {get_inv()}.action set type='Revoke' where id='{ac_id}'"
sql_confirm = f"update {get_inv()}.confirm set action_id='{trade_id}' where id='{ac_id}'"
sql_confirm = (
f"update {get_inv()}.confirm set action_id='{trade_id}' where id='{ac_id}'"
)
con.execute(sql_action)
con.execute(sql_confirm)
def downgrade():
pass

View file

@ -6,7 +6,7 @@ Create Date: 2020-12-29 20:19:46.981207
"""
import sqlalchemy as sa
import teal
from ereuse_devicehub import teal
from alembic import context, op
from sqlalchemy.dialects import postgresql

View file

@ -10,7 +10,7 @@ from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
# revision identifiers, used by Alembic.
@ -26,6 +26,7 @@ def get_inv():
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
con = op.get_bind()
sql = f"""
@ -60,6 +61,5 @@ def upgrade():
con.execute(sql)
def downgrade():
pass

View file

@ -10,7 +10,7 @@ import sqlalchemy as sa
from alembic import context
import sqlalchemy_utils
import citext
import teal
from ereuse_devicehub import teal
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
@ -26,48 +26,85 @@ def get_inv():
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
# Allocate action
op.drop_table('allocate', schema=f'{get_inv()}')
op.create_table('allocate',
sa.Column('final_user_code', citext.CIText(), default='', nullable=True,
comment = "This is a internal code for mainteing the secrets of the personal datas of the new holder"),
sa.Column('transaction', citext.CIText(), nullable=True, comment='The code used from the owner for relation with external tool.'),
op.create_table(
'allocate',
sa.Column(
'final_user_code',
citext.CIText(),
default='',
nullable=True,
comment="This is a internal code for mainteing the secrets of the personal datas of the new holder",
),
sa.Column(
'transaction',
citext.CIText(),
nullable=True,
comment='The code used from the owner for relation with external tool.',
),
sa.Column('end_users', sa.Numeric(precision=4), nullable=True),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.action.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
# Deallocate action
op.drop_table('deallocate', schema=f'{get_inv()}')
op.create_table('deallocate',
sa.Column('transaction', citext.CIText(), nullable=True, comment='The code used from the owner for relation with external tool.'),
op.create_table(
'deallocate',
sa.Column(
'transaction',
citext.CIText(),
nullable=True,
comment='The code used from the owner for relation with external tool.',
),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.action.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
# Add allocate as a column in device
op.add_column('device', sa.Column('allocated', sa.Boolean(), nullable=True), schema=f'{get_inv()}')
op.add_column(
'device',
sa.Column('allocated', sa.Boolean(), nullable=True),
schema=f'{get_inv()}',
)
# Receive action
op.drop_table('receive', schema=f'{get_inv()}')
# Live action
op.drop_table('live', schema=f'{get_inv()}')
op.create_table('live',
op.create_table(
'live',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('serial_number', sa.Unicode(), nullable=True,
comment='The serial number of the Hard Disk in lower case.'),
sa.Column(
'serial_number',
sa.Unicode(),
nullable=True,
comment='The serial number of the Hard Disk in lower case.',
),
sa.Column('usage_time_hdd', sa.Interval(), nullable=True),
sa.Column('snapshot_uuid', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.ForeignKeyConstraint(
['id'],
[f'{get_inv()}.action.id'],
),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
schema=f'{get_inv()}',
)
def downgrade():
op.drop_table('allocate', schema=f'{get_inv()}')

File diff suppressed because it is too large Load diff

View file

@ -7,8 +7,8 @@ from math import hypot
from typing import Iterator, List, Optional, TypeVar
import dateutil.parser
from ereuse_utils import getter, text
from ereuse_utils.nested_lookup import (
from ereuse_devicehub.ereuse_utils import getter, text
from ereuse_devicehub.ereuse_utils.nested_lookup import (
get_nested_dicts_with_key_containing_value,
get_nested_dicts_with_key_value,
)

View file

@ -5,7 +5,7 @@ import struct
from contextlib import contextmanager
from enum import Enum
from ereuse_utils import Dumpeable
from ereuse_devicehub.ereuse_utils import Dumpeable
class Severity(Enum):

View file

@ -4,7 +4,7 @@ from datetime import timedelta
from distutils.version import StrictVersion
from uuid import UUID
import ereuse_utils
import ereuse_devicehub.ereuse_utils
import jwt
from flask import current_app as app
from flask import g, request
@ -203,7 +203,7 @@ def decode_snapshot(data):
data['data'],
app.config['JWT_PASS'],
algorithms="HS256",
json_encoder=ereuse_utils.JSONEncoder,
json_encoder=ereuse_devicehub.ereuse_utils.JSONEncoder,
)
except jwt.exceptions.InvalidSignatureError as err:
txt = 'Invalid snapshot'

View file

@ -12,7 +12,7 @@ from typing import Dict, List, Set
from boltons import urlutils
from citext import CIText
from ereuse_utils.naming import HID_CONVERSION_DOC
from ereuse_devicehub.ereuse_utils.naming import HID_CONVERSION_DOC
from flask import current_app as app
from flask import g, request
from more_itertools import unique_everseen

View file

@ -1,4 +1,4 @@
import teal.marshmallow
import ereuse_devicehub.teal.marshmallow
from marshmallow import fields as mf
from ereuse_devicehub.resources.schemas import Thing
@ -7,4 +7,6 @@ from ereuse_devicehub.resources.schemas import Thing
class Inventory(Thing):
id = mf.String(dump_only=True)
name = mf.String(dump_only=True)
tag_provider = teal.marshmallow.URL(dump_only=True, data_key='tagProvider')
tag_provider = ereuse_devicehub.teal.marshmallow.URL(
dump_only=True, data_key='tagProvider'
)

View file

@ -2,7 +2,7 @@ import csv
import pathlib
from click import argument, option
from ereuse_utils import cli
from ereuse_devicehub.ereuse_utils import cli
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.device.definitions import DeviceDef

View file

@ -1,9 +1,9 @@
from typing import Any, Iterable, Tuple, Type, Union
from boltons.urlutils import URL
from ereuse_utils.test import JSON
from ereuse_utils.test import Client as EreuseUtilsClient
from ereuse_utils.test import Res
from ereuse_devicehub.ereuse_utils.test import JSON
from ereuse_devicehub.ereuse_utils.test import Client as EreuseUtilsClient
from ereuse_devicehub.ereuse_utils.test import Res
from werkzeug.exceptions import HTTPException
from ereuse_devicehub.teal.marshmallow import ValidationError

View file

@ -7,7 +7,7 @@ from typing import Any, Type, Union
from boltons.typeutils import classproperty
from boltons.urlutils import URL as BoltonsUrl
from ereuse_utils import if_none_return_none
from ereuse_devicehub.ereuse_utils import if_none_return_none
from flask_sqlalchemy import BaseQuery
from flask_sqlalchemy import Model as _Model
from flask_sqlalchemy import SignallingSession

View file

@ -1,10 +1,10 @@
import ereuse_utils
import ereuse_devicehub.ereuse_utils
from flask.json import JSONEncoder as FlaskJSONEncoder
from sqlalchemy.ext.baked import Result
from sqlalchemy.orm import Query
class TealJSONEncoder(ereuse_utils.JSONEncoder, FlaskJSONEncoder):
class TealJSONEncoder(ereuse_devicehub.ereuse_utils.JSONEncoder, FlaskJSONEncoder):
def default(self, obj):
if isinstance(obj, (Result, Query)):
return tuple(obj)

View file

@ -4,7 +4,7 @@ from typing import Type, Union
import colour
from boltons import strutils, urlutils
from ereuse_utils import if_none_return_none
from ereuse_devicehub.ereuse_utils import if_none_return_none
from flask import current_app as app
from flask import g
from marshmallow import utils

View file

@ -1,7 +1,7 @@
import json
from json import JSONDecodeError
from ereuse_utils import flatten_mixed
from ereuse_devicehub.ereuse_utils import flatten_mixed
from marshmallow import Schema as MarshmallowSchema
from marshmallow.fields import Boolean, Field, List, Nested, Str, missing_
from sqlalchemy import Column, between, or_

View file

@ -4,7 +4,7 @@ from typing import Callable, Iterable, Iterator, Tuple, Type, Union
import inflection
from anytree import PreOrderIter
from boltons.typeutils import classproperty, issubclass
from ereuse_utils.naming import Naming
from ereuse_devicehub.ereuse_utils.naming import Naming
from flask import Blueprint, current_app, g, request, url_for
from flask.json import jsonify
from flask.views import MethodView

View file

@ -2,12 +2,12 @@ import inspect
from typing import Dict, Type
import click_spinner
import ereuse_utils
import ereuse_devicehub.ereuse_utils
import flask_cors
from anytree import Node
from apispec import APISpec
from click import option
from ereuse_utils import ensure_utf8
from ereuse_devicehub.ereuse_utils import ensure_utf8
from flask import Flask, jsonify
from flask.globals import _app_ctx_stack
from flask_sqlalchemy import SQLAlchemy
@ -284,7 +284,7 @@ class Teal(Flask):
return jsonify(self._apidocs)
class DumpeableHTTPException(ereuse_utils.Dumpeable):
class DumpeableHTTPException(ereuse_devicehub.ereuse_utils.Dumpeable):
"""Exceptions that inherit this class will be able to dump
to dicts and JSONs.
"""

View file

@ -6,7 +6,6 @@ from datetime import datetime
from pathlib import Path
import boltons.urlutils
import ereuse_utils
import jwt
import pytest
import yaml
@ -14,6 +13,7 @@ from decouple import config
from psycopg2 import IntegrityError
from sqlalchemy.exc import ProgrammingError
from ereuse_devicehub import ereuse_utils
from ereuse_devicehub.api.views import api
from ereuse_devicehub.client import Client, UserClient, UserClientFlask
from ereuse_devicehub.config import DevicehubConfig