Added preliminar functional tests for systemusers
This commit is contained in:
parent
f984d28709
commit
276c02c2fd
12
TODO.md
12
TODO.md
|
@ -142,3 +142,15 @@ Remember that, as always with QuerySets, any subsequent chained methods which im
|
|||
* Redirect junk emails and delete every 30 days?
|
||||
|
||||
* Complitely decouples scripts execution, billing, service definition
|
||||
|
||||
* Create SystemUser on account creation. username=username, is_main=True,
|
||||
* Exclude is_main=True from queryset filter default is_main=False
|
||||
* self referencing group.
|
||||
* Unify all users
|
||||
|
||||
|
||||
* backend message with link
|
||||
|
||||
* test fucking user
|
||||
|
||||
* delete main user -> delete account or prevent delete main user
|
||||
|
|
|
@ -1,121 +0,0 @@
|
|||
import textwrap
|
||||
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from orchestra.apps.orchestration import ServiceController
|
||||
from orchestra.apps.resources import ServiceMonitor
|
||||
|
||||
from . import settings
|
||||
|
||||
# TODO create a base backend for SystemUsers!
|
||||
|
||||
class MainUserBackend(ServiceController):
|
||||
verbose_name = _("Main user")
|
||||
model = 'accounts.Account'
|
||||
ignore_fields = ['last_login']
|
||||
|
||||
def save(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append(textwrap.dedent("""
|
||||
if [[ $( id %(username)s ) ]]; then
|
||||
usermod --password '%(password)s' %(username)s
|
||||
else
|
||||
useradd %(username)s --password '%(password)s' --shell %(shell)s
|
||||
fi
|
||||
mkdir -p %(home)s
|
||||
chown %(username)s.%(username)s %(home)s""" % context
|
||||
))
|
||||
|
||||
def delete(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append("{ sleep 2 && killall -u %(username)s -s KILL; } &" % context)
|
||||
self.append("killall -u %(username)s" % context)
|
||||
self.append("userdel %(username)s" % context)
|
||||
|
||||
def get_context(self, user):
|
||||
context = {
|
||||
'username': user.username,
|
||||
'password': user.password if user.is_active else '*%s' % user.password,
|
||||
'shell': getattr(user, 'shell', settings.ACCOUNTS_DEFAULT_SHELL)
|
||||
}
|
||||
context['home'] = settings.ACCOUNTS_HOME % context
|
||||
return context
|
||||
|
||||
|
||||
class MainUserDisk(ServiceMonitor):
|
||||
model = 'accounts.Account'
|
||||
resource = ServiceMonitor.DISK
|
||||
verbose_name = _('Main user disk')
|
||||
|
||||
def monitor(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append("du -s %(home)s | xargs echo %(object_id)s" % context)
|
||||
|
||||
def get_context(self, user):
|
||||
context = SystemUserBackend().get_context(user)
|
||||
context['object_id'] = user.pk
|
||||
return context
|
||||
|
||||
|
||||
class MainFTPTraffic(ServiceMonitor):
|
||||
model = 'accounts.Account'
|
||||
resource = ServiceMonitor.TRAFFIC
|
||||
verbose_name = _('Main FTP traffic')
|
||||
|
||||
def prepare(self):
|
||||
current_date = timezone.localtime(self.current_date)
|
||||
current_date = current_date.strftime("%Y%m%d%H%M%S")
|
||||
self.append(textwrap.dedent("""
|
||||
function monitor () {
|
||||
OBJECT_ID=$1
|
||||
INI_DATE=$2
|
||||
USERNAME="$3"
|
||||
LOG_FILE="$4"
|
||||
grep "UPLOAD\|DOWNLOAD" "${LOG_FILE}" \\
|
||||
| grep " \\[${USERNAME}\\] " \\
|
||||
| awk -v ini="${INI_DATE}" '
|
||||
BEGIN {
|
||||
end = "%s"
|
||||
sum = 0
|
||||
months["Jan"] = "01"
|
||||
months["Feb"] = "02"
|
||||
months["Mar"] = "03"
|
||||
months["Apr"] = "04"
|
||||
months["May"] = "05"
|
||||
months["Jun"] = "06"
|
||||
months["Jul"] = "07"
|
||||
months["Aug"] = "08"
|
||||
months["Sep"] = "09"
|
||||
months["Oct"] = "10"
|
||||
months["Nov"] = "11"
|
||||
months["Dec"] = "12"
|
||||
} {
|
||||
# log: Fri Jul 11 13:23:17 2014
|
||||
split($4, t, ":")
|
||||
# line_date = year month day hour minute second
|
||||
line_date = $5 months[$2] $3 t[1] t[2] t[3]
|
||||
if ( line_date > ini && line_date < end)
|
||||
split($0, l, "\\", ")
|
||||
split(l[3], b, " ")
|
||||
sum += b[1]
|
||||
} END {
|
||||
print sum
|
||||
}
|
||||
' | xargs echo ${OBJECT_ID}
|
||||
}""" % current_date))
|
||||
|
||||
def monitor(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append(
|
||||
'monitor %(object_id)i %(last_date)s "%(username)s" "%(log_file)s"' % context)
|
||||
|
||||
def get_context(self, user):
|
||||
last_date = timezone.localtime(self.get_last_date(user.pk))
|
||||
return {
|
||||
'log_file': settings.ACCOUNTS_FTP_LOG_PATH,
|
||||
'last_date': last_date.strftime("%Y%m%d%H%M%S"),
|
||||
'object_id': user.pk,
|
||||
'username': user.username,
|
||||
}
|
||||
|
|
@ -62,8 +62,8 @@ class Account(auth.AbstractBaseUser):
|
|||
def save(self, *args, **kwargs):
|
||||
created = not self.pk
|
||||
super(Account, self).save(*args, **kwargs)
|
||||
if created and hasattr(self, 'systemgroups'):
|
||||
self.systemgroups.create(name=self.username, account=self)
|
||||
if created and hasattr(self, 'systemusers'):
|
||||
self.systemusers.create_user(self.username, account=self, password=self.password, is_main=True)
|
||||
|
||||
def send_email(self, template, context, contacts=[], attachments=[], html=None):
|
||||
contacts = self.contacts.filter(email_usages=contacts)
|
||||
|
|
|
@ -13,5 +13,5 @@ class AccountSerializer(serializers.HyperlinkedModelSerializer):
|
|||
|
||||
class AccountSerializerMixin(object):
|
||||
def save_object(self, obj, **kwargs):
|
||||
obj.account = self.context['request'].user.account
|
||||
obj.account = self.context['request'].user
|
||||
super(AccountSerializerMixin, self).save_object(obj, **kwargs)
|
||||
|
|
|
@ -22,12 +22,3 @@ ACCOUNTS_DEFAULT_LANGUAGE = getattr(settings, 'ACCOUNTS_DEFAULT_LANGUAGE', 'en')
|
|||
|
||||
|
||||
ACCOUNTS_MAIN_PK = getattr(settings, 'ACCOUNTS_MAIN_PK', 1)
|
||||
|
||||
|
||||
ACCOUNTS_HOME = getattr(settings, 'ACCOUNTS_HOME', '/home/%(username)s')
|
||||
|
||||
|
||||
ACCOUNTS_FTP_LOG_PATH = getattr(settings, 'ACCOUNTS_FTP_LOG_PATH', '/var/log/vsftpd.log')
|
||||
|
||||
|
||||
ACCOUNTS_DEFAULT_SHELL = getattr(settings, 'ACCOUNTS_DEFAULT_SHELL', '/bin/false')
|
||||
|
|
|
@ -230,6 +230,7 @@ class AdminDomainMixin(DomainTestMixin):
|
|||
return value_input
|
||||
|
||||
def add(self, domain_name, records):
|
||||
# TODO use reverse
|
||||
url = self.live_server_url + '/admin/domains/domain/add/'
|
||||
self.selenium.get(url)
|
||||
name = self.selenium.find_element_by_id('id_name')
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import logging
|
||||
import threading
|
||||
|
||||
from django import db
|
||||
|
@ -8,6 +9,9 @@ from . import settings
|
|||
from .helpers import send_report
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def as_task(execute):
|
||||
def wrapper(*args, **kwargs):
|
||||
with db.transaction.commit_manually():
|
||||
|
@ -37,6 +41,7 @@ def execute(operations):
|
|||
scripts = {}
|
||||
cache = {}
|
||||
for operation in operations:
|
||||
logger.info("Queued %s" % str(operation))
|
||||
servers = router.get_servers(operation, cache=cache)
|
||||
for server in servers:
|
||||
key = (server, operation.backend)
|
||||
|
@ -64,7 +69,9 @@ def execute(operations):
|
|||
logs = []
|
||||
for execution, operations in executions:
|
||||
for operation in operations:
|
||||
logger.info("Executed %s" % str(operation))
|
||||
operation.log = execution.log
|
||||
operation.save()
|
||||
logger.info(execution.log.stderr)
|
||||
logs.append(execution.log)
|
||||
return logs
|
||||
|
|
|
@ -48,7 +48,7 @@ class SystemUserAdmin(SelectAccountAdminMixin, ExtendedModelAdmin):
|
|||
# derived from monkeypatching formfield.widget.render on AccountAdminMinxin,
|
||||
# don't ask.
|
||||
formfield = form.base_fields['groups']
|
||||
formfield.queryset = formfield.queryset.exclude(name=obj.username)
|
||||
formfield.queryset = formfield.queryset.exclude(id=obj.id)
|
||||
return form
|
||||
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ from .serializers import SystemUserSerializer
|
|||
class SystemUserViewSet(AccountApiMixin, SetPasswordApiMixin, viewsets.ModelViewSet):
|
||||
model = SystemUser
|
||||
serializer_class = SystemUserSerializer
|
||||
filter_fields = ('username',)
|
||||
|
||||
|
||||
router.register(r'systemusers', SystemUserViewSet)
|
||||
|
|
|
@ -1,17 +1,132 @@
|
|||
import os
|
||||
import textwrap
|
||||
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from orchestra.apps.accounts.backends import MainUserBackend, MainFTPTraffic
|
||||
from orchestra.apps.orchestration import ServiceController
|
||||
from orchestra.apps.resources import ServiceMonitor
|
||||
|
||||
from . import settings
|
||||
|
||||
|
||||
class SystemUserBackend(MainUserBackend):
|
||||
class SystemUserBackend(ServiceController):
|
||||
verbose_name = _("System user")
|
||||
model = 'systemusers.SystemUser'
|
||||
ignore_fields = []
|
||||
|
||||
def save(self, user):
|
||||
context = self.get_context(user)
|
||||
groups = ','.join(self.get_groups(user))
|
||||
context['groups_arg'] = '--groups %s' % groups if groups else ''
|
||||
self.append(textwrap.dedent("""
|
||||
if [[ $( id %(username)s ) ]]; then
|
||||
usermod %(username)s --password '%(password)s' --shell %(shell)s %(groups_arg)s
|
||||
else
|
||||
useradd %(username)s --password '%(password)s' --shell %(shell)s %(groups_arg)s
|
||||
usermod -a -G %(username)s %(mainusername)s
|
||||
fi
|
||||
mkdir -p %(home)s
|
||||
chown %(username)s.%(username)s %(home)s""" % context
|
||||
))
|
||||
|
||||
def delete(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append("{ sleep 2 && killall -u %(username)s -s KILL; } &" % context)
|
||||
self.append("killall -u %(username)s || true" % context)
|
||||
self.append("userdel %(username)s || true" % context)
|
||||
self.append("groupdel %(username)s || true" % context)
|
||||
|
||||
def get_groups(self, user):
|
||||
if user.is_main:
|
||||
return user.account.systemusers.exclude(id=user.id).values_list('username', flat=True)
|
||||
groups = list(user.groups.values_list('username', flat=True))
|
||||
return groups
|
||||
|
||||
def get_context(self, user):
|
||||
context = {
|
||||
'username': user.username,
|
||||
'password': user.password if user.active else '*%s' % user.password,
|
||||
'shell': user.shell,
|
||||
'mainusername': user.username if user.is_main else user.account.username,
|
||||
}
|
||||
basehome = settings.SYSTEMUSERS_HOME % context
|
||||
context['home'] = os.path.join(basehome, user.home)
|
||||
return context
|
||||
|
||||
|
||||
class SystemUserFTPTraffic(MainFTPTraffic):
|
||||
class SystemUserDisk(ServiceMonitor):
|
||||
model = 'systemusers.SystemUser'
|
||||
verbose_name = _('System user FTP traffic')
|
||||
resource = ServiceMonitor.DISK
|
||||
verbose_name = _('Main user disk')
|
||||
|
||||
def monitor(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append("du -s %(home)s | xargs echo %(object_id)s" % context)
|
||||
|
||||
def get_context(self, user):
|
||||
context = SystemUserBackend().get_context(user)
|
||||
context['object_id'] = user.pk
|
||||
return context
|
||||
|
||||
|
||||
class FTPTraffic(ServiceMonitor):
|
||||
model = 'systemusers.SystemUser'
|
||||
resource = ServiceMonitor.TRAFFIC
|
||||
verbose_name = _('Main FTP traffic')
|
||||
|
||||
def prepare(self):
|
||||
current_date = timezone.localtime(self.current_date)
|
||||
current_date = current_date.strftime("%Y%m%d%H%M%S")
|
||||
self.append(textwrap.dedent("""
|
||||
function monitor () {
|
||||
OBJECT_ID=$1
|
||||
INI_DATE=$2
|
||||
USERNAME="$3"
|
||||
LOG_FILE="$4"
|
||||
grep "UPLOAD\|DOWNLOAD" "${LOG_FILE}" \\
|
||||
| grep " \\[${USERNAME}\\] " \\
|
||||
| awk -v ini="${INI_DATE}" '
|
||||
BEGIN {
|
||||
end = "%s"
|
||||
sum = 0
|
||||
months["Jan"] = "01"
|
||||
months["Feb"] = "02"
|
||||
months["Mar"] = "03"
|
||||
months["Apr"] = "04"
|
||||
months["May"] = "05"
|
||||
months["Jun"] = "06"
|
||||
months["Jul"] = "07"
|
||||
months["Aug"] = "08"
|
||||
months["Sep"] = "09"
|
||||
months["Oct"] = "10"
|
||||
months["Nov"] = "11"
|
||||
months["Dec"] = "12"
|
||||
} {
|
||||
# log: Fri Jul 11 13:23:17 2014
|
||||
split($4, t, ":")
|
||||
# line_date = year month day hour minute second
|
||||
line_date = $5 months[$2] $3 t[1] t[2] t[3]
|
||||
if ( line_date > ini && line_date < end)
|
||||
split($0, l, "\\", ")
|
||||
split(l[3], b, " ")
|
||||
sum += b[1]
|
||||
} END {
|
||||
print sum
|
||||
}
|
||||
' | xargs echo ${OBJECT_ID}
|
||||
}""" % current_date))
|
||||
|
||||
def monitor(self, user):
|
||||
context = self.get_context(user)
|
||||
self.append(
|
||||
'monitor %(object_id)i %(last_date)s "%(username)s" "%(log_file)s"' % context)
|
||||
|
||||
def get_context(self, user):
|
||||
last_date = timezone.localtime(self.get_last_date(user.pk))
|
||||
return {
|
||||
'log_file': settings.SYSTEMUSERS_FTP_LOG_PATH,
|
||||
'last_date': last_date.strftime("%Y%m%d%H%M%S"),
|
||||
'object_id': user.pk,
|
||||
'username': user.username,
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ from django.contrib.auth.hashers import make_password
|
|||
from django.core import validators
|
||||
from django.core.mail import send_mail
|
||||
from django.db import models
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from orchestra.core import services
|
||||
|
@ -28,10 +29,11 @@ class SystemUser(models.Model):
|
|||
home = models.CharField(_("home"), max_length=256, blank=True,
|
||||
help_text=_("Home directory relative to account's ~main_user"))
|
||||
shell = models.CharField(_("shell"), max_length=32,
|
||||
choices=settings.USERS_SHELLS, default=settings.USERS_DEFAULT_SHELL)
|
||||
groups = models.ManyToManyField('systemusers.SystemGroup', blank=True,
|
||||
choices=settings.SYSTEMUSERS_SHELLS, default=settings.SYSTEMUSERS_DEFAULT_SHELL)
|
||||
groups = models.ManyToManyField('self', blank=True,
|
||||
help_text=_("A new group will be created for the user. "
|
||||
"Which additional groups would you like them to be a member of?"))
|
||||
is_main = models.BooleanField(_("is main"), default=False)
|
||||
is_active = models.BooleanField(_("active"), default=True,
|
||||
help_text=_("Designates whether this account should be treated as active. "
|
||||
"Unselect this instead of deleting accounts."))
|
||||
|
@ -41,37 +43,25 @@ class SystemUser(models.Model):
|
|||
def __unicode__(self):
|
||||
return self.username
|
||||
|
||||
def clean(self):
|
||||
""" unique usernames between accounts and system users """
|
||||
if not self.pk:
|
||||
field = self._meta.get_field_by_name('account')[0]
|
||||
account_model = field.rel.to
|
||||
if account_model.objects.filter(username=self.username).exists():
|
||||
raise validators.ValidationError(self.error_messages['duplicate_username'])
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
created = not self.pk
|
||||
super(SystemUser, self).save(*args, **kwargs)
|
||||
if created:
|
||||
self.groups.create(name=self.username, account=self.account)
|
||||
|
||||
def set_password(self, raw_password):
|
||||
self.password = make_password(raw_password)
|
||||
|
||||
def get_is_active(self):
|
||||
return self.account.is_active and self.is_active
|
||||
@cached_property
|
||||
def active(self):
|
||||
return self.is_active and self.account.is_active
|
||||
|
||||
|
||||
class SystemGroup(models.Model):
|
||||
name = models.CharField(_("name"), max_length=64, unique=True,
|
||||
help_text=_("Required. 30 characters or fewer. Letters, digits and ./-/_ only."),
|
||||
validators=[validators.RegexValidator(r'^[\w.-]+$',
|
||||
_("Enter a valid group name."), 'invalid')])
|
||||
account = models.ForeignKey('accounts.Account', verbose_name=_("Account"),
|
||||
related_name='systemgroups')
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
## TODO user deletion and group handling.
|
||||
#class SystemGroup(models.Model):
|
||||
# name = models.CharField(_("name"), max_length=64, unique=True,
|
||||
# help_text=_("Required. 30 characters or fewer. Letters, digits and ./-/_ only."),
|
||||
# validators=[validators.RegexValidator(r'^[\w.-]+$',
|
||||
# _("Enter a valid group name."), 'invalid')])
|
||||
# account = models.ForeignKey('accounts.Account', verbose_name=_("Account"),
|
||||
# related_name='systemgroups')
|
||||
#
|
||||
# def __unicode__(self):
|
||||
# return self.name
|
||||
|
||||
|
||||
services.register(SystemUser)
|
||||
|
|
|
@ -23,7 +23,7 @@ class SystemUserSerializer(AccountSerializerMixin, serializers.HyperlinkedModelS
|
|||
|
||||
def validate_password(self, attrs, source):
|
||||
""" POST only password """
|
||||
if self.object.pk:
|
||||
if self.object:
|
||||
if 'password' in attrs:
|
||||
raise serializers.ValidationError(_("Can not set password"))
|
||||
elif 'password' not in attrs:
|
||||
|
|
|
@ -3,11 +3,17 @@ from django.conf import settings
|
|||
from django.utils.translation import ugettext, ugettext_lazy as _
|
||||
|
||||
|
||||
USERS_SHELLS = getattr(settings, 'USERS_SHELLS', (
|
||||
SYSTEMUSERS_SHELLS = getattr(settings, 'SYSTEMUSERS_SHELLS', (
|
||||
('/bin/false', _("No shell, FTP only")),
|
||||
('/bin/rsync', _("No shell, SFTP/RSYNC only")),
|
||||
('/bin/bash', "/bin/bash"),
|
||||
('/bin/sh', "/bin/sh"),
|
||||
))
|
||||
|
||||
USERS_DEFAULT_SHELL = getattr(settings, 'USERS_DEFAULT_SHELL', '/bin/false')
|
||||
SYSTEMUSERS_DEFAULT_SHELL = getattr(settings, 'SYSTEMUSERS_DEFAULT_SHELL', '/bin/false')
|
||||
|
||||
|
||||
SYSTEMUSERS_HOME = getattr(settings, 'SYSTEMUSERS_HOME', '/home/%(username)s')
|
||||
|
||||
|
||||
SYSTEMUSERS_FTP_LOG_PATH = getattr(settings, 'SYSTEMUSERS_FTP_LOG_PATH', '/var/log/vsftpd.log')
|
||||
|
|
0
orchestra/apps/systemusers/tests/__init__.py
Normal file
0
orchestra/apps/systemusers/tests/__init__.py
Normal file
126
orchestra/apps/systemusers/tests/functional_tests/tests.py
Normal file
126
orchestra/apps/systemusers/tests/functional_tests/tests.py
Normal file
|
@ -0,0 +1,126 @@
|
|||
from functools import partial
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from orchestra.apps.accounts.models import Account
|
||||
from orchestra.apps.orchestration.models import Server, Route
|
||||
from orchestra.utils.system import run
|
||||
from orchestra.utils.tests import BaseLiveServerTestCase, random_ascii
|
||||
|
||||
from ... import backends
|
||||
|
||||
|
||||
r = partial(run, silent=True, display=False)
|
||||
|
||||
|
||||
class SystemUserMixin(object):
|
||||
MASTER_ADDR = 'localhost'
|
||||
ACCOUNT_USERNAME = '%s_account' % random_ascii(10)
|
||||
DEPENDENCIES = (
|
||||
'orchestra.apps.orchestration',
|
||||
'orcgestra.apps.systemusers',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(SystemUserMixin, self).setUp()
|
||||
self.add_route()
|
||||
|
||||
def add_route(self):
|
||||
master = Server.objects.create(name=self.MASTER_ADDR)
|
||||
backend = backends.SystemUserBackend.get_name()
|
||||
Route.objects.create(backend=backend, match=True, host=master)
|
||||
|
||||
def add(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def delete(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def test_create_systemuser(self):
|
||||
username = '%s_systemuser' % random_ascii(10)
|
||||
password = '@!?%spppP001' % random_ascii(5)
|
||||
self.add(username, password)
|
||||
self.addCleanup(partial(self.delete, username))
|
||||
self.assertEqual(0, r("id %s" % username).return_code)
|
||||
# TODO test group membership and everything
|
||||
|
||||
def test_delete_systemuser(self):
|
||||
username = '%s_systemuser' % random_ascii(10)
|
||||
password = '@!?%sppppP001' % random_ascii(5)
|
||||
self.add(username, password)
|
||||
self.assertEqual(0, r("id %s" % username).return_code)
|
||||
self.delete(username)
|
||||
self.assertEqual(1, r("id %s" % username, error_codes=[0,1]).return_code)
|
||||
|
||||
|
||||
class RESTSystemUserMixin(SystemUserMixin):
|
||||
def setUp(self):
|
||||
super(RESTSystemUserMixin, self).setUp()
|
||||
self.rest_login()
|
||||
|
||||
def add(self, username, password):
|
||||
self.rest.systemusers.create(username=username, password=password)
|
||||
|
||||
def delete(self, username):
|
||||
user = self.rest.systemusers.retrieve(username=username).get()
|
||||
user.delete()
|
||||
|
||||
def update(self):
|
||||
pass
|
||||
|
||||
|
||||
# TODO
|
||||
class AdminSystemUserMixin(SystemUserMixin):
|
||||
def setUp(self):
|
||||
super(AdminSystemUserMixin, self).setUp()
|
||||
self.admin_login()
|
||||
|
||||
def add(self, username, password):
|
||||
pass
|
||||
|
||||
def delete(self, username):
|
||||
pass
|
||||
|
||||
def update(self):
|
||||
pass
|
||||
|
||||
|
||||
class RESTSystemUserTest(RESTSystemUserMixin, BaseLiveServerTestCase):
|
||||
pass
|
||||
|
||||
|
||||
class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
|
||||
def test_create_account(self):
|
||||
url = self.live_server_url + reverse('admin:accounts_account_add')
|
||||
self.selenium.get(url)
|
||||
|
||||
account_username = '%s_account' % random_ascii(10)
|
||||
username = self.selenium.find_element_by_id('id_username')
|
||||
username.send_keys(account_username)
|
||||
|
||||
account_password = '@!?%spppP001' % random_ascii(5)
|
||||
password = self.selenium.find_element_by_id('id_password1')
|
||||
password.send_keys(account_password)
|
||||
password = self.selenium.find_element_by_id('id_password2')
|
||||
password.send_keys(account_password)
|
||||
|
||||
account_email = 'orchestra@orchestra.lan'
|
||||
email = self.selenium.find_element_by_id('id_email')
|
||||
email.send_keys(account_email)
|
||||
|
||||
contact_short_name = random_ascii(10)
|
||||
short_name = self.selenium.find_element_by_id('id_contacts-0-short_name')
|
||||
short_name.send_keys(contact_short_name)
|
||||
|
||||
email = self.selenium.find_element_by_id('id_contacts-0-email')
|
||||
email.send_keys(account_email)
|
||||
email.submit()
|
||||
|
||||
account = Account.objects.get(username=account_username)
|
||||
self.addCleanup(account.delete)
|
||||
self.assertNotEqual(url, self.selenium.current_url)
|
||||
self.assertEqual(0, r("id %s" % account.username).return_code)
|
|
@ -12,9 +12,6 @@ from xvfbwrapper import Xvfb
|
|||
from orchestra.apps.accounts.models import Account
|
||||
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class AppDependencyMixin(object):
|
||||
DEPENDENCIES = ()
|
||||
|
||||
|
@ -56,6 +53,9 @@ class BaseTestCase(TestCase, AppDependencyMixin):
|
|||
|
||||
|
||||
class BaseLiveServerTestCase(AppDependencyMixin, LiveServerTestCase):
|
||||
ACCOUNT_USERNAME = 'orchestra'
|
||||
ACCOUNT_PASSWORD = 'orchestra'
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.vdisplay = Xvfb()
|
||||
|
@ -69,19 +69,21 @@ class BaseLiveServerTestCase(AppDependencyMixin, LiveServerTestCase):
|
|||
cls.vdisplay.stop()
|
||||
super(BaseLiveServerTestCase, cls).tearDownClass()
|
||||
|
||||
def create_account(self, superuser=False):
|
||||
if superuser:
|
||||
return Account.objects.create_superuser(self.ACCOUNT_USERNAME,
|
||||
password=self.ACCOUNT_PASSWORD, email='orchestra@orchestra.org')
|
||||
return Account.objects.create_user(self.ACCOUNT_USERNAME,
|
||||
password=self.ACCOUNT_PASSWORD, email='orchestra@orchestra.org')
|
||||
|
||||
def setUp(self):
|
||||
super(BaseLiveServerTestCase, self).setUp()
|
||||
self.rest = Api(self.live_server_url + '/api/')
|
||||
self.account = Account.objects.create(name='orchestra')
|
||||
self.username = 'orchestra'
|
||||
self.password = 'orchestra'
|
||||
self.user = User.objects.create_superuser(username='orchestra',
|
||||
password='orchestra', email='orchestra@orchestra.org',
|
||||
account=self.account)
|
||||
self.account = self.create_account(superuser=True)
|
||||
|
||||
def admin_login(self):
|
||||
session = SessionStore()
|
||||
session[SESSION_KEY] = self.user.pk
|
||||
session[SESSION_KEY] = self.account.pk
|
||||
session[BACKEND_SESSION_KEY] = settings.AUTHENTICATION_BACKENDS[0]
|
||||
session.save()
|
||||
## to set a cookie we need to first visit the domain.
|
||||
|
@ -93,7 +95,7 @@ class BaseLiveServerTestCase(AppDependencyMixin, LiveServerTestCase):
|
|||
))
|
||||
|
||||
def rest_login(self):
|
||||
self.rest.login(username=self.username, password=self.password)
|
||||
self.rest.login(username=self.ACCOUNT_USERNAME, password=self.ACCOUNT_PASSWORD)
|
||||
|
||||
|
||||
def random_ascii(length):
|
||||
|
|
Loading…
Reference in a new issue