This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/client.py

290 lines
7.3 KiB
Python
Raw Normal View History

2018-06-14 13:14:23 +00:00
from inspect import isclass
from typing import Dict, Iterable, Type, Union
from ereuse_utils.test import JSON, Res
from flask.testing import FlaskClient
from flask_wtf.csrf import generate_csrf
from teal.client import Client as TealClient
from teal.client import Query, Status
2018-04-27 17:16:43 +00:00
from werkzeug.exceptions import HTTPException
2018-06-14 13:14:23 +00:00
from ereuse_devicehub.resources import models, schemas
2018-04-10 15:06:39 +00:00
ResourceLike = Union[Type[Union[models.Thing, schemas.Thing]], str]
2018-04-10 15:06:39 +00:00
class Client(TealClient):
"""A client suited for Devicehub main usage."""
def __init__(
self,
application,
response_wrapper=None,
use_cookies=False,
allow_subdomain_redirects=False,
):
super().__init__(
application, response_wrapper, use_cookies, allow_subdomain_redirects
)
def open(
self,
uri: str,
res: ResourceLike = None,
status: Status = 200,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
item=None,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
2018-06-14 13:14:23 +00:00
if isclass(res) and issubclass(res, (models.Thing, schemas.Thing)):
res = res.t
return super().open(
uri, res, status, query, accept, content_type, item, headers, token, **kw
)
def get(
self,
uri: str = '',
res: ResourceLike = None,
query: Query = tuple(),
status: Status = 200,
item: Union[int, str] = None,
accept: str = JSON,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().get(uri, res, query, status, item, accept, headers, token, **kw)
def post(
self,
data: str or dict,
uri: str = '',
res: ResourceLike = None,
query: Query = tuple(),
status: Status = 201,
content_type: str = JSON,
accept: str = JSON,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().post(
data, uri, res, query, status, content_type, accept, headers, token, **kw
)
def patch(
self,
data: str or dict,
uri: str = '',
res: ResourceLike = None,
query: Query = tuple(),
item: Union[int, str] = None,
status: Status = 200,
content_type: str = JSON,
accept: str = JSON,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().patch(
data,
uri,
res,
query,
item,
status,
content_type,
accept,
token,
headers,
**kw,
)
def put(
self,
data: str or dict,
uri: str = '',
res: ResourceLike = None,
query: Query = tuple(),
item: Union[int, str] = None,
status: Status = 201,
content_type: str = JSON,
accept: str = JSON,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().put(
data,
uri,
res,
query,
item,
status,
content_type,
accept,
token,
headers,
**kw,
)
def delete(
self,
uri: str = '',
res: ResourceLike = None,
query: Query = tuple(),
status: Status = 204,
item: Union[int, str] = None,
accept: str = JSON,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().delete(
uri, res, query, status, item, accept, headers, token, **kw
)
2018-04-27 17:16:43 +00:00
def login(self, email: str, password: str):
assert isinstance(email, str)
assert isinstance(password, str)
return self.post(
{'email': email, 'password': password}, '/users/login/', status=200
)
2018-04-27 17:16:43 +00:00
def get_many(
self,
res: ResourceLike,
resources: Iterable[Union[dict, int]],
key: str = None,
**kw,
) -> Iterable[Union[Dict[str, object], str]]:
2018-06-10 16:47:49 +00:00
"""Like :meth:`.get` but with many resources."""
return (
self.get(res=res, item=r[key] if key else r, **kw)[0] for r in resources
2018-06-10 16:47:49 +00:00
)
2018-04-27 17:16:43 +00:00
class UserClient(Client):
"""A client that identifies all of its requests with a specific user.
2018-04-27 17:16:43 +00:00
It will automatically perform login on the first request.
"""
def __init__(
self,
application,
email: str,
password: str,
response_wrapper=None,
use_cookies=False,
allow_subdomain_redirects=False,
):
super().__init__(
application, response_wrapper, use_cookies, allow_subdomain_redirects
)
2018-04-27 17:16:43 +00:00
self.email = email # type: str
self.password = password # type: str
self.user = None # type: dict
def open(
self,
uri: str,
res: ResourceLike = None,
status: int or HTTPException = 200,
query: Query = tuple(),
accept=JSON,
content_type=JSON,
item=None,
headers: dict = None,
token: str = None,
**kw,
) -> Res:
return super().open(
uri,
res,
status,
query,
accept,
content_type,
item,
headers,
self.user['token'] if self.user else token,
**kw,
)
# noinspection PyMethodOverriding
def login(self):
response = super().login(self.email, self.password)
self.user = response[0]
return response
2022-03-17 10:50:57 +00:00
class UserClientFlask:
def __init__(
self,
application,
email: str,
password: str,
response_wrapper=None,
use_cookies=True,
follow_redirects=True,
):
2022-03-17 10:50:57 +00:00
self.email = email
self.password = password
self.follow_redirects = follow_redirects
self.user = None
self.client = FlaskClient(application, use_cookies=use_cookies)
self.client.get('/login/')
data = {
'email': email,
'password': password,
'csrf_token': generate_csrf(),
}
body, status, headers = self.client.post(
'/login/', data=data, follow_redirects=True
)
2022-03-17 10:50:57 +00:00
self.headers = headers
body = next(body).decode("utf-8")
assert "Unassgined" in body
2022-04-19 16:39:42 +00:00
def get(
self,
uri='',
data=None,
follow_redirects=True,
content_type='text/html; charset=utf-8',
**kw,
):
2022-03-17 10:50:57 +00:00
body, status, headers = self.client.get(
uri, data=data, follow_redirects=follow_redirects, headers=self.headers
2022-03-17 10:50:57 +00:00
)
body = next(body).decode("utf-8")
return (body, status)
2022-04-19 16:39:42 +00:00
def post(
self,
uri='',
data=None,
follow_redirects=True,
content_type='application/x-www-form-urlencoded',
**kw,
):
2022-03-17 10:50:57 +00:00
body, status, headers = self.client.post(
2022-04-19 16:39:42 +00:00
uri,
data=data,
follow_redirects=follow_redirects,
headers=self.headers,
content_type=content_type,
2022-03-17 10:50:57 +00:00
)
body = next(body).decode("utf-8")
return (body, status)