import urllib.parse

import requests
from django.conf import settings
from django.http import Http404
from django.urls.exceptions import NoReverseMatch
from django.utils.translation import gettext_lazy as _

from .models import (
    Address,
    DatabaseService,
    Domain,
    Mailbox,
    SaasService,
    UserAccount,
    WebSite,
)

DOMAINS_PATH = 'domains/'
TOKEN_PATH = '/api-token-auth/'

# Maps a symbolic path name to a path that is joined onto the API base URL.
# Entries containing `{pk}` must be formatted with an object identifier
# before use.
API_PATHS = {
    # auth
    'token-auth': '/api-token-auth/',
    'my-account': 'accounts/',

    # services
    'database-list': 'databases/',
    'domain-list': 'domains/',
    'domain-detail': 'domains/{pk}/',
    'address-list': 'addresses/',
    'address-detail': 'addresses/{pk}/',
    'mailbox-list': 'mailboxes/',
    'mailbox-detail': 'mailboxes/{pk}/',
    'mailbox-password': 'mailboxes/{pk}/set_password/',
    'mailinglist-list': 'lists/',
    'saas-list': 'saas/',
    'website-list': 'websites/',

    # other
    'bill-list': 'bills/',
    'bill-document': 'bills/{pk}/document/',
    'payment-source-list': 'payment-sources/',
}


class Orchestra(object):
    """HTTP client for the Orchestra REST API.

    Holds a `requests.Session` and an auth token which is sent as a
    `Token` authorization header on every call made through `request()`.
    """

    def __init__(self, *args, username=None, password=None, **kwargs):
        """Create the client and obtain an auth token if none is given.

        Args:
            *args: Accepted and ignored.
            username: Account name used to authenticate.
            password: Password used to authenticate; it is not stored.
            **kwargs: Optional `base_url` (defaults to
                `settings.API_BASE_URL`) and `auth_token`. When
                `auth_token` is missing or None, `authenticate()` is called
                with the given credentials.
        """
        self.base_url = kwargs.pop('base_url', settings.API_BASE_URL)
        self.username = username
        self.session = requests.Session()
        self.auth_token = kwargs.pop("auth_token", None)

        if self.auth_token is None:
            self.auth_token = self.authenticate(self.username, password)

    def build_absolute_uri(self, path_name):
        """Return the absolute URL registered in `API_PATHS` for `path_name`.

        `{pk}` placeholders are NOT substituted here.

        Raises:
            NoReverseMatch: If `path_name` is not a key of `API_PATHS`.
        """
        path = API_PATHS.get(path_name, None)
        if path is None:
            raise NoReverseMatch(
                "Not found API path name '{}'".format(path_name))

        return urllib.parse.urljoin(self.base_url, path)

    def _build_detail_uri(self, path_name, pk):
        """Return the absolute URL for `path_name` with `{pk}` filled in."""
        path = API_PATHS[path_name].format_map({'pk': pk})
        return urllib.parse.urljoin(self.base_url, path)

    def authenticate(self, username, password):
        """POST the credentials to the token endpoint and return the token.

        Returns:
            The value of the `token` key of the JSON response, or None when
            the response has no such key. The HTTP status is not checked.
        """
        url = self.build_absolute_uri('token-auth')
        response = self.session.post(
            url,
            data={"username": username, "password": password},
        )

        return response.json().get("token", None)

    def request(self, verb, resource=None, url=None, data=None, render_as="json",
                querystring=None, raise_exception=True):
        """Send an authenticated request to the API.

        Args:
            verb: One of "HEAD", "GET", "POST", "PATCH", "PUT", "DELETE".
            resource: Key of `API_PATHS`; takes precedence over `url`.
            url: Absolute URL, used only when `resource` is None.
            data: Payload sent as the JSON body of the request.
            render_as: When "json" (and the status is below 500) the
                response body is decoded as JSON; otherwise the raw
                `response.content` is returned.
            querystring: Already-encoded query string appended after `?`.
            raise_exception: When true, `response.raise_for_status()` is
                called so HTTP error statuses raise instead of returning.

        Returns:
            A `(status_code, output)` tuple.

        Raises:
            AssertionError: If `verb` is not a supported HTTP verb.
            AttributeError: If neither `resource` nor `url` is provided.
            NoReverseMatch: If `resource` is not a key of `API_PATHS`.
        """
        # Raised explicitly (rather than with an `assert` statement) so the
        # check is not stripped under `python -O`; the exception type is kept
        # for backward compatibility.
        if verb not in ("HEAD", "GET", "POST", "PATCH", "PUT", "DELETE"):
            raise AssertionError("Unsupported HTTP verb '{}'".format(verb))

        if resource is not None:
            url = self.build_absolute_uri(resource)
        elif url is None:
            raise AttributeError("Provide `resource` or `url` params")

        if querystring is not None:
            url = "{}?{}".format(url, querystring)

        send = getattr(self.session, verb.lower())
        headers = {
            "Authorization": "Token {}".format(self.auth_token),
            "Content-Type": "application/json",
        }
        response = send(url, json=data, headers=headers, allow_redirects=False)

        if raise_exception:
            response.raise_for_status()

        status = response.status_code
        if status < 500 and render_as == "json":
            output = response.json()
        else:
            output = response.content

        return status, output

    def retrieve_service_list(self, service_name, querystring=None):
        """GET the `<service_name>-list` resource and return the decoded body.

        Raises:
            ValueError: If `<service_name>-list` is not a key of `API_PATHS`.
        """
        pattern_name = '{}-list'.format(service_name)
        if pattern_name not in API_PATHS:
            raise ValueError("Unknown service {}".format(service_name))
        _, output = self.request("GET", pattern_name, querystring=querystring)
        return output

    def retrieve_profile(self):
        """Return the current account as a `UserAccount`.

        Builds the object from the first element of the `my-account`
        response.
        """
        # NOTE(review): `request()` is called with its default
        # `raise_exception=True`, so HTTP error statuses raise inside
        # `request()` before the check below can run. Behavior kept as is;
        # confirm which exception callers are expected to handle.
        status, output = self.request("GET", 'my-account')
        if status >= 400:
            raise PermissionError("Cannot retrieve profile of an anonymous user.")
        return UserAccount.new_from_json(output[0])

    def retrieve_bill_document(self, pk):
        """Return the raw content of the document of bill `pk`.

        Raises:
            Http404: If the API answers 404.
        """
        url = self._build_detail_uri('bill-document', pk)
        status, bill_pdf = self.request("GET", render_as="html", url=url, raise_exception=False)
        if status == 404:
            raise Http404(_("No bill found matching the query"))
        return bill_pdf

    def create_mail_address(self, data):
        """POST `data` to the address list; return `(status, output)`."""
        resource = '{}-list'.format(Address.api_name)
        return self.request("POST", resource=resource, data=data)

    def retrieve_mail_address(self, pk):
        """Return the `Address` identified by `pk`.

        Raises:
            Http404: If the API answers 404.
        """
        url = self._build_detail_uri('address-detail', pk)
        status, data = self.request("GET", url=url, raise_exception=False)
        if status == 404:
            raise Http404(_("No object found matching the query"))

        return Address.new_from_json(data)

    def update_mail_address(self, pk, data):
        """PUT `data` to address `pk`; return `(status, output)`."""
        url = self._build_detail_uri('address-detail', pk)
        return self.request("PUT", url=url, data=data)

    def retrieve_mail_address_list(self, querystring=None):
        """Return the addresses as a list of `Address` objects.

        Args:
            querystring: Optional already-encoded filters forwarded to the
                API.
        """
        # retrieve mails applying filters (if any)
        raw_data = self.retrieve_service_list(
            Address.api_name,
            querystring=querystring,
        )

        addresses = [Address.new_from_json(data) for data in raw_data]

        # PATCH to include Pangea addresses not shown by orchestra
        # described on issue #4
        # TODO(@slamora) disabled hacky patch because breaks another functionalities
        #   XXX Fix it on orchestra instead of here???
        # raw_mailboxes = self.retrieve_mailbox_list()
        # for mailbox in raw_mailboxes:
        #     if mailbox['addresses'] == []:
        #         address_data = {
        #             'names': [mailbox['name']],
        #             'forward': '',
        #             'domain': {
        #                 'name': 'pangea.org.',
        #             },
        #             'mailboxes': [mailbox],
        #         }
        #         pangea_address = Address.new_from_json(address_data)
        #         addresses.append(pangea_address)

        return addresses

    def delete_mail_address(self, pk):
        """DELETE address `pk`; return `(status, raw_content)`."""
        url = self._build_detail_uri('address-detail', pk)
        return self.request("DELETE", url=url, render_as=None)

    def create_mailbox(self, data):
        """POST `data` to the mailbox list; return `(status, output)`.

        HTTP error statuses are returned instead of raised.
        """
        resource = '{}-list'.format(Mailbox.api_name)
        return self.request("POST", resource=resource, data=data, raise_exception=False)

    def retrieve_mailbox(self, pk):
        """Return the `Mailbox` identified by `pk`.

        Raises:
            Http404: If the API answers 404.
        """
        url = self._build_detail_uri('mailbox-detail', pk)
        status, data_json = self.request("GET", url=url, raise_exception=False)
        if status == 404:
            raise Http404(_("No mailbox found matching the query"))
        return Mailbox.new_from_json(data_json)

    def update_mailbox(self, pk, data):
        """PATCH mailbox `pk` with `data`; return `(status, output)`.

        HTTP error statuses are returned instead of raised.
        """
        url = self._build_detail_uri('mailbox-detail', pk)
        return self.request("PATCH", url=url, data=data, raise_exception=False)

    def retrieve_mailbox_list(self):
        """Return the mailboxes as a list of `Mailbox` objects."""
        mailboxes = self.retrieve_service_list(Mailbox.api_name)
        return [Mailbox.new_from_json(mailbox_data) for mailbox_data in mailboxes]

    def delete_mailbox(self, pk):
        """Deactivate mailbox `pk`; return `(status, output)`.

        The mailbox is not removed: a PATCH sets `is_active` to False.
        """
        url = self._build_detail_uri('mailbox-detail', pk)
        # Mark as inactive instead of deleting
        # return self.request("DELETE", url=url, render_as=None)
        return self.request("PATCH", url=url, data={"is_active": False})

    def set_password_mailbox(self, pk, data):
        """POST `data` to the set-password endpoint of mailbox `pk`.

        HTTP error statuses are returned instead of raised.

        Returns:
            A `(status, output)` tuple.
        """
        url = self._build_detail_uri('mailbox-password', pk)
        return self.request("POST", url=url, data=data, raise_exception=False)

    def retrieve_domain(self, pk):
        """Return the `Domain` identified by `pk`.

        Raises:
            Http404: If the API answers 404.
        """
        url = self._build_detail_uri('domain-detail', pk)
        status, domain_json = self.request("GET", url=url, raise_exception=False)
        if status == 404:
            raise Http404(_("No domain found matching the query"))
        return Domain.new_from_json(domain_json)

    def retrieve_domain_list(self):
        """Return every domain as a `Domain`, enriched with related services.

        Each domain gets its addresses (one extra API request per domain),
        the websites that reference it, and an empty `usage` placeholder.
        """
        output = self.retrieve_service_list(Domain.api_name)
        websites = self.retrieve_website_list()

        domains = []
        for domain_json in output:
            # filter querystring
            querystring = "domain={}".format(domain_json['id'])

            # retrieve services associated to a domain
            domain_json['addresses'] = self.retrieve_service_list(
                Address.api_name, querystring)

            # retrieve websites (as they cannot be filtered by domain on the API we should do it here)
            domain_json['websites'] = self.filter_websites_by_domain(websites, domain_json['id'])

            # TODO(@slamora): update when backend provides resource disk usage data
            domain_json['usage'] = {
                # 'usage': 300,
                # 'total': 650,
                # 'unit': 'MB',
                # 'percent': 50,
            }

            # append to list a Domain object
            domains.append(Domain.new_from_json(domain_json))

        return domains

    def retrieve_website_list(self):
        """Return the websites as a list of `WebSite` objects."""
        output = self.retrieve_service_list(WebSite.api_name)
        return [WebSite.new_from_json(website_data) for website_data in output]

    def filter_websites_by_domain(self, websites, domain_id):
        """Return the websites having a domain whose `id` is `domain_id`.

        Args:
            websites: Iterable of objects exposing a `domains` iterable
                whose items have an `id` attribute.
            domain_id: Identifier to look for.

        Returns:
            A new list with the matching websites, in their original order.
        """
        return [
            website for website in websites
            if domain_id in (web_domain.id for web_domain in website.domains)
        ]

    def verify_credentials(self):
        """
        Returns:
          A user profile info if the credentials are valid,
          None otherwise.
        """
        status, output = self.request("GET", 'my-account', raise_exception=False)

        if status < 400:
            return output

        return None