import email.utils
import imaplib
import os
import poplib
import re
import smtplib
import time
from email.mime.text import MIMEText

from django.conf import settings as djsettings
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import CommandError
from django.core.urlresolvers import reverse
from selenium.webdriver.support.select import Select

from orchestra.apps.accounts.models import Account
from orchestra.apps.orchestration.models import Server, Route
from orchestra.apps.resources.models import Resource
from orchestra.utils.system import run, sshrun
from orchestra.utils.tests import BaseLiveServerTestCase, random_ascii, snapshot_on_error, save_response_on_error

from ... import backends, settings
from ...models import Mailbox


#>>> mail.list()
#('OK', ['(\\HasNoChildren) "." INBOX'])
#>>> mail.select('INBOX')
#('OK', ['18'])
#>>> mail.getquota('INBOX')
#imaplib.error: GETQUOTA command error: BAD ['Error in IMAP command GETQUOTA: Unknown command.']
#mail.fetch(10, '(RFC822)')
#('OK', [('10 (FLAGS (\\Seen) RFC822 {550}', 'Return-Path: \r\nDelivered-To: \r\nReceived: from test3.orchestra.lan\r\n\tby test3.orchestra.lan (Dovecot) with LMTP id hvDUEAIKL1QlOQAAL4hJug\r\n\tfor ; Fri, 03 Oct 2014 16:41:38 -0400\r\nReceived: by test3.orchestra.lan (Postfix, from userid 0)\r\n\tid 43BB1F94633; Fri, 3 Oct 2014 16:41:38 -0400 (EDT)\r\nTo: rata@orchestra.lan\r\nSubject: hola\r\nMessage-Id: <20141003204138.43BB1F94633@test3.orchestra.lan>\r\nDate: Fri, 3 Oct 2014 16:41:38 -0400 (EDT)\r\nFrom: root@test3.orchestra.lan (root)\r\n\r\n\r\n\r\n'), ')'])
#>>> mail.close()
#('OK', ['Close completed.'])
#pop = poplib.POP3('localhost')
#pop.user('rata')
#pop.pass_('3')
#>>> pop.list()
#('+OK 18 messages:', ['1 552', '2 550', '3 550', '4 548', '5 546', '6 546', '7 554', '8 548', '9 550', '10 550', '11 546', '12 546', '13 546', '14 544', '15 548', '16 577', '17 546', '18 546'], 135)
#>>> pop.quit()
#'+OK Logging out.'
# FIXME django load production database at the beginning of tests
class MailboxMixin(object):
    """
    Interface-agnostic functional tests for mailboxes.

    The ``test_*`` methods drive a mailbox through ``self.add``,
    ``self.delete``, ``self.change_password`` and ``self.add_address`` and
    then check the outcome against the mail services on ``MASTER_SERVER``
    (IMAP, SMTP, doveadm over SSH).  Concrete mixins supply those four
    operations; ``change_password`` and ``add_address`` have no stub here.
    """
    # Host the mail services run on; overridable through the environment.
    MASTER_SERVER = os.environ.get('ORCHESTRA_SLAVE_SERVER', 'localhost')
    DEPENDENCIES = (
        'orchestra.apps.orchestration',
        'orchestra.apps.mails',
        'orchestra.apps.resources',
    )

    def setUp(self):
        """Create the backend routes and switch Django's DEBUG flag on."""
        super(MailboxMixin, self).setUp()
        self.add_route()
        # apps.get_app_config('resources').reload_relations() doesn't work
        # NOTE(review): DEBUG is never restored after the test -- confirm
        # that leaking it into later tests is acceptable.
        djsettings.DEBUG = True

    def add_route(self):
        """Route the virtual-user and address backends to MASTER_SERVER."""
        server = Server.objects.create(name=self.MASTER_SERVER)
        backend = backends.PasswdVirtualUserBackend.get_name()
        Route.objects.create(backend=backend, match=True, host=server)
        backend = backends.PostfixAddressBackend.get_name()
        Route.objects.create(backend=backend, match=True, host=server)

    def add_quota_resource(self):
        """Create the 'disk' resource attached to the Mailbox model."""
        Resource.objects.create(
            name='disk',
            content_type=ContentType.objects.get_for_model(Mailbox),
            period=Resource.LAST,
            verbose_name='Mail quota',
            unit='MB',
            scale=10**6,
            on_demand=False,
            default_allocation=2000
        )

    # Operations that the concrete (REST / admin) mixins have to provide.

    def save(self):
        raise NotImplementedError

    def add(self):
        raise NotImplementedError

    def delete(self):
        raise NotImplementedError

    def update(self):
        raise NotImplementedError

    def disable(self):
        raise NotImplementedError

    def add_group(self, username, groupname):
        raise NotImplementedError

    # NOTE(review): validate_user() and validate_delete() reference
    # SystemUser, which is not among this module's visible imports, and no
    # test in this module calls them -- TODO import SystemUser or drop them.

    def validate_user(self, username):
        """
        Assert that ``id <username>`` succeeds on MASTER_SERVER and lists
        exactly the groups stored for the user (plus the user's own name).
        """
        # The original called the undefined name ``sshr``.
        idcmd = sshrun(self.MASTER_SERVER, "id %s" % username)
        self.assertEqual(0, idcmd.return_code)
        user = SystemUser.objects.get(username=username)
        groups = list(user.groups.values_list('username', flat=True))
        groups.append(user.username)
        # Third space-separated field of the `id` output; the regex keeps
        # the name found inside each "<number>(<name>)" pair.
        idgroups = idcmd.stdout.strip().split(' ')[2]
        idgroups = re.findall(r'\d+\((\w+)\)', idgroups)
        self.assertEqual(set(groups), set(idgroups))

    def validate_delete(self, username):
        """
        Assert that the user is gone from the database and that ``id`` and
        greps of the account files on MASTER_SERVER all fail for it.
        """
        self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username)
        self.assertRaises(CommandError,
            sshrun, self.MASTER_SERVER, 'id %s' % username, display=False)
        # The group database is /etc/group; the previous '/etc/groups' path
        # does not exist, so grep always failed and this check was vacuous.
        self.assertRaises(CommandError,
            sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/group' % username, display=False)
        self.assertRaises(CommandError,
            sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/passwd' % username, display=False)
        self.assertRaises(CommandError,
            sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False)

    def login_imap(self, username, password):
        """Log in over IMAP/SSL, assert success and return the connection."""
        mail = imaplib.IMAP4_SSL(self.MASTER_SERVER)
        status, msg = mail.login(username, password)
        self.assertEqual('OK', status)
        self.assertEqual(['Logged in'], msg)
        return mail

    def login_pop3(self, username, password):
        """Authenticate over POP3 and return the connection."""
        pop = poplib.POP3(self.MASTER_SERVER)
        pop.user(username)
        pop.pass_(password)
        return pop

    def send_email(self, to, token):
        """Send a message whose body is ``token`` to ``to`` via SMTP with STARTTLS."""
        msg = MIMEText(token)
        msg['To'] = to
        msg['From'] = 'orchestra@test.orchestra.lan'
        msg['Subject'] = 'test'
        server = smtplib.SMTP(self.MASTER_SERVER, 25)
        try:
            server.ehlo()
            server.starttls()
            server.ehlo()
            server.sendmail(msg['From'], msg['To'], msg.as_string())
        finally:
            server.quit()

    def validate_email(self, username, token):
        """Grep the mailbox's Maildir/new on MASTER_SERVER for ``token``."""
        home = Mailbox.objects.get(name=username).get_home()
        sshrun(self.MASTER_SERVER,
            "grep '%s' %s/Maildir/new/*" % (token, home), display=False)

    def test_add(self):
        """A freshly added mailbox accepts an IMAP login."""
        username = '%s_mailbox' % random_ascii(10)
        password = '@!?%spppP001' % random_ascii(5)
        self.add(username, password)
        # addCleanup forwards extra arguments itself, so no partial() is needed.
        self.addCleanup(self.delete, username)
        self.login_imap(username, password)

    def test_change_password(self):
        """After a password change the new password logs in over IMAP."""
        username = '%s_systemuser' % random_ascii(10)
        password = '@!?%spppP001' % random_ascii(5)
        self.add(username, password)
        self.addCleanup(self.delete, username)
        self.login_imap(username, password)
        new_password = '@!?%spppP001' % random_ascii(5)
        self.change_password(username, new_password)
        self.login_imap(username, new_password)

    def test_quota(self):
        """The allocated quota is reported by both doveadm and IMAP."""
        username = '%s_mailbox' % random_ascii(10)
        password = '@!?%spppP001' % random_ascii(5)
        self.add_quota_resource()
        quota = 100
        self.add(username, password, quota=quota)
        self.addCleanup(self.delete, username)
        get_quota = "doveadm quota get -u %s 2>&1|grep STORAGE|awk {'print $5'}" % username
        stdout = sshrun(self.MASTER_SERVER, get_quota, display=False).stdout
        # NOTE(review): the factor 1024 presumably converts the allocation
        # into the unit dovecot reports -- confirm.
        self.assertEqual(quota*1024, int(stdout))
        imap = self.login_imap(username, password)
        # Last space-separated token of the quota response, without the ")".
        imap_quota = int(imap.getquotaroot("INBOX")[1][1][0].split(' ')[-1].split(')')[0])
        self.assertEqual(quota*1024, imap_quota)

    def test_send_email(self):
        """An authenticated mailbox user can submit a message over SMTP."""
        username = '%s_mailbox' % random_ascii(10)
        password = '@!?%spppP001' % random_ascii(5)
        self.add(username, password)
        self.addCleanup(self.delete, username)
        msg = MIMEText("Hola bishuns")
        msg['To'] = 'noexists@example.com'
        msg['From'] = '%s@%s' % (username, self.MASTER_SERVER)
        msg['Subject'] = "test"
        server = smtplib.SMTP(self.MASTER_SERVER, 25)
        try:
            # login() sits inside the try so that a failed authentication
            # still closes the connection.
            server.login(username, password)
            server.sendmail(msg['From'], msg['To'], msg.as_string())
        finally:
            server.quit()

    def test_address(self):
        """Mail sent to an address bound to the mailbox lands in its Maildir."""
        username = '%s_mailbox' % random_ascii(10)
        password = '@!?%spppP001' % random_ascii(5)
        self.add(username, password)
        self.addCleanup(self.delete, username)
        domain = '%s_domain.lan' % random_ascii(5)
        name = '%s_name' % random_ascii(5)
        domain = self.account.domains.create(name=domain)
        self.add_address(username, name, domain)
        token = random_ascii(100)
        # The recipient is built from the string form of the domain object.
        self.send_email("%s@%s" % (name, domain), token)
        self.validate_email(username, token)


class RESTMailboxMixin(MailboxMixin):
    """Mailbox operations performed through the REST client ``self.rest``."""

    def setUp(self):
        super(RESTMailboxMixin, self).setUp()
        self.rest_login()

    @save_response_on_error
    def add(self, username, password, quota=None):
        """Create a mailbox, with a 'disk' allocation when ``quota`` is truthy."""
        extra = {}
        if quota:
            extra = {
                "resources": [
                    {
                        "name": "disk",
                        "allocated": quota
                    },
                ]
            }
        self.rest.mailboxes.create(name=username, password=password, **extra)

    @save_response_on_error
    def delete(self, username):
        """Delete the mailbox named ``username``."""
        mailbox = self.rest.mailboxes.retrieve(name=username).get()
        mailbox.delete()

    @save_response_on_error
    def change_password(self, username, password):
        """Set a new password on the mailbox named ``username``."""
        mailbox = self.rest.mailboxes.retrieve(name=username).get()
        mailbox.change_password(password)

    @save_response_on_error
    def add_address(self, username, name, domain):
        """Create the address ``name`` at ``domain`` delivering to the mailbox."""
        mailbox = self.rest.mailboxes.retrieve(name=username).get()
        domain = self.rest.domains.retrieve(name=domain.name).get()
        self.rest.addresses.create(name=name, domain=domain, mailboxes=[mailbox])


class AdminMailboxMixin(MailboxMixin):
    """Mailbox operations performed on the admin site through ``self.selenium``."""

    def setUp(self):
        super(AdminMailboxMixin, self).setUp()
        self.admin_login()

    @snapshot_on_error
    def add(self, username, password, quota=None):
        """Fill in and submit the mailbox add form."""
        url = self.live_server_url + reverse('admin:mails_mailbox_add')
        self.selenium.get(url)

        account_input = self.selenium.find_element_by_id('id_account')
        account_select = Select(account_input)
        account_select.select_by_value(str(self.account.pk))

        name_field = self.selenium.find_element_by_id('id_name')
        name_field.send_keys(username)

        password_field = self.selenium.find_element_by_id('id_password1')
        password_field.send_keys(password)
        password_field = self.selenium.find_element_by_id('id_password2')
        password_field.send_keys(password)

        if quota is not None:
            quota_field = self.selenium.find_element_by_id(
                'id_resources-resourcedata-content_type-object_id-0-allocated')
            quota_field.clear()
            quota_field.send_keys(quota)

        name_field.submit()
        # The browser is expected to leave the form URL after submitting.
        self.assertNotEqual(url, self.selenium.current_url)

    @snapshot_on_error
    def delete(self, username):
        """Open the mailbox delete page and confirm the deletion."""
        mailbox = Mailbox.objects.get(name=username)
        delete = reverse('admin:mails_mailbox_delete', args=(mailbox.pk,))
        url = self.live_server_url + delete
        self.selenium.get(url)
        confirmation = self.selenium.find_element_by_name('post')
        confirmation.submit()
        self.assertNotEqual(url, self.selenium.current_url)

    @snapshot_on_error
    def change_password(self, username, password):
        """Fill in and submit the mailbox change-password form."""
        mailbox = Mailbox.objects.get(name=username)
        change_password = reverse('admin:mails_mailbox_change_password', args=(mailbox.pk,))
        url = self.live_server_url + change_password
        self.selenium.get(url)

        password_field = self.selenium.find_element_by_id('id_password1')
        password_field.send_keys(password)
        password_field = self.selenium.find_element_by_id('id_password2')
        password_field.send_keys(password)
        password_field.submit()

        self.assertNotEqual(url, self.selenium.current_url)

    @snapshot_on_error
    def add_address(self, username, name, domain):
        """
        Fill in and submit the address add form.

        ``username`` is not used: the form's "add all" link is clicked, so
        every mailbox it offers gets selected.
        """
        url = self.live_server_url + reverse('admin:mails_address_add')
        self.selenium.get(url)

        name_field = self.selenium.find_element_by_id('id_name')
        name_field.send_keys(name)

        domain_input = self.selenium.find_element_by_id('id_domain')
        domain_select = Select(domain_input)
        domain_select.select_by_value(str(domain.pk))

        mailboxes = self.selenium.find_element_by_id('id_mailboxes_add_all_link')
        mailboxes.click()
        # Presumably gives the page's script time to move the options
        # across before the form is submitted.
        time.sleep(0.5)
        name_field.submit()

        self.assertNotEqual(url, self.selenium.current_url)


class RESTMailboxTest(RESTMailboxMixin, BaseLiveServerTestCase):
    """Runs the shared mailbox tests through the REST mixin."""
    pass


class AdminMailboxTest(AdminMailboxMixin, BaseLiveServerTestCase):
    """Runs the shared mailbox tests through the admin mixin."""
    pass