django-musician/musician/api.py

178 lines
6.1 KiB
Python
Raw Permalink Normal View History

import requests
import urllib.parse
from django.conf import settings
2019-12-17 09:25:10 +00:00
from django.http import Http404
from django.urls.exceptions import NoReverseMatch
2019-12-17 09:25:10 +00:00
from django.utils.translation import gettext_lazy as _
from .models import Domain, DatabaseService, MailService, SaasService, UserAccount, WebSite
DOMAINS_PATH = 'domains/'
TOKEN_PATH = '/api-token-auth/'
API_PATHS = {
# auth
'token-auth': '/api-token-auth/',
2019-10-30 13:00:12 +00:00
'my-account': 'accounts/',
# services
2019-11-13 11:27:25 +00:00
'database-list': 'databases/',
'domain-list': 'domains/',
2019-12-13 14:08:01 +00:00
'domain-detail': 'domains/{pk}/',
'address-list': 'addresses/',
'mailbox-list': 'mailboxes/',
'mailinglist-list': 'lists/',
2019-12-06 09:27:18 +00:00
'saas-list': 'saas/',
'website-list': 'websites/',
2019-11-20 19:07:35 +00:00
# other
2019-12-17 13:48:21 +00:00
'bill-list': 'bills/',
2019-12-17 14:15:58 +00:00
'bill-document': 'bills/{pk}/document/',
2019-11-20 19:07:35 +00:00
'payment-source-list': 'payment-sources/',
}
class Orchestra(object):
def __init__(self, *args, username=None, password=None, **kwargs):
self.base_url = kwargs.pop('base_url', settings.API_BASE_URL)
self.username = username
self.session = requests.Session()
self.auth_token = kwargs.pop("auth_token", None)
if self.auth_token is None:
self.auth_token = self.authenticate(self.username, password)
def build_absolute_uri(self, path_name):
path = API_PATHS.get(path_name, None)
if path is None:
raise NoReverseMatch(
"Not found API path name '{}'".format(path_name))
return urllib.parse.urljoin(self.base_url, path)
def authenticate(self, username, password):
url = self.build_absolute_uri('token-auth')
response = self.session.post(
url,
data={"username": username, "password": password},
)
return response.json().get("token", None)
2019-12-17 14:15:58 +00:00
def request(self, verb, resource=None, url=None, render_as="json", querystring=None, raise_exception=True):
assert verb in ["HEAD", "GET", "POST", "PATCH", "PUT", "DELETE"]
2019-12-13 14:08:01 +00:00
if resource is not None:
url = self.build_absolute_uri(resource)
elif url is None:
raise AttributeError("Provide `resource` or `url` params")
if querystring is not None:
url = "{}?{}".format(url, querystring)
verb = getattr(self.session, verb.lower())
response = verb(url, headers={"Authorization": "Token {}".format(
self.auth_token)}, allow_redirects=False)
2019-10-30 13:00:12 +00:00
if raise_exception:
response.raise_for_status()
status = response.status_code
2019-12-17 14:15:58 +00:00
if render_as == "json":
output = response.json()
else:
output = response.content
return status, output
def retrieve_service_list(self, service_name, querystring=None):
pattern_name = '{}-list'.format(service_name)
if pattern_name not in API_PATHS:
raise ValueError("Unknown service {}".format(service_name))
_, output = self.request("GET", pattern_name, querystring=querystring)
return output
2019-10-30 13:00:12 +00:00
def retrieve_profile(self):
status, output = self.request("GET", 'my-account')
if status >= 400:
raise PermissionError("Cannot retrieve profile of an anonymous user.")
return UserAccount.new_from_json(output[0])
2019-12-17 14:15:58 +00:00
def retrieve_bill_document(self, pk):
path = API_PATHS.get('bill-document').format_map({'pk': pk})
url = urllib.parse.urljoin(self.base_url, path)
status, bill_pdf = self.request("GET", render_as="html", url=url, raise_exception=False)
if status == 404:
raise Http404(_("No domain found matching the query"))
return bill_pdf
2019-12-13 14:08:01 +00:00
def retrieve_domain(self, pk):
path = API_PATHS.get('domain-detail').format_map({'pk': pk})
url = urllib.parse.urljoin(self.base_url, path)
2019-12-17 09:25:10 +00:00
status, domain_json = self.request("GET", url=url, raise_exception=False)
if status == 404:
raise Http404(_("No domain found matching the query"))
2019-12-13 14:08:01 +00:00
return Domain.new_from_json(domain_json)
def retrieve_domain_list(self):
output = self.retrieve_service_list(Domain.api_name)
websites = self.retrieve_website_list()
domains = []
for domain_json in output:
# filter querystring
querystring = "domain={}".format(domain_json['id'])
# retrieve services associated to a domain
domain_json['mails'] = self.retrieve_service_list(
MailService.api_name, querystring)
# retrieve websites (as they cannot be filtered by domain on the API we should do it here)
domain_json['websites'] = self.filter_websites_by_domain(websites, domain_json['id'])
# TODO(@slamora): databases and sass are not related to a domain, so cannot be filtered
# domain_json['databases'] = self.retrieve_service_list(DatabaseService.api_name, querystring)
# domain_json['saas'] = self.retrieve_service_list(SaasService.api_name, querystring)
# TODO(@slamora): update when backend provides resource disk usage data
domain_json['usage'] = {
'usage': 300,
'total': 650,
'unit': 'MB',
'percent': 50,
}
# append to list a Domain object
domains.append(Domain.new_from_json(domain_json))
return domains
2019-10-30 13:00:12 +00:00
def retrieve_website_list(self):
output = self.retrieve_service_list(WebSite.api_name)
return [WebSite.new_from_json(website_data) for website_data in output]
def filter_websites_by_domain(self, websites, domain_id):
matching = []
for website in websites:
web_domains = [web_domain.id for web_domain in website.domains]
if domain_id in web_domains:
matching.append(website)
return matching
2019-10-30 13:00:12 +00:00
def verify_credentials(self):
"""
Returns:
A user profile info if the
credentials are valid, None otherwise.
"""
status, output = self.request("GET", 'my-account', raise_exception=False)
if status < 400:
return output
return None