add PasswordPolicyRule (not used yet)
This commit is contained in:
parent
58ebd15ada
commit
0e73702fca
35
passbook/core/migrations/0002_auto_20190208_1514.py
Normal file
35
passbook/core/migrations/0002_auto_20190208_1514.py
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
# Generated by Django 2.1.5 on 2019-02-08 15:14
|
||||||
|
|
||||||
|
import django.db.models.deletion
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('passbook_core', '0001_initial'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.CreateModel(
|
||||||
|
name='PasswordPolicyRule',
|
||||||
|
fields=[
|
||||||
|
('rule_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Rule')),
|
||||||
|
('amount_uppercase', models.IntegerField(default=0)),
|
||||||
|
('amount_lowercase', models.IntegerField(default=0)),
|
||||||
|
('amount_symbols', models.IntegerField(default=0)),
|
||||||
|
('length_min', models.IntegerField(default=0)),
|
||||||
|
('symbol_charset', models.TextField(default='!\\"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ')),
|
||||||
|
],
|
||||||
|
options={
|
||||||
|
'verbose_name': 'Password Policy Rule',
|
||||||
|
'verbose_name_plural': 'Password Policy Rules',
|
||||||
|
},
|
||||||
|
bases=('passbook_core.rule',),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='fieldmatcherrule',
|
||||||
|
name='user_field',
|
||||||
|
field=models.TextField(choices=[('username', 'Username'), ('first_name', 'First Name'), ('last_name', 'Last Name'), ('email', 'E-Mail'), ('is_staff', 'Is staff'), ('is_active', 'Is active'), ('data_joined', 'Date joined')]),
|
||||||
|
),
|
||||||
|
]
|
|
@ -82,7 +82,7 @@ class Application(RuleModel):
|
||||||
def user_is_authorized(self, user: User) -> bool:
|
def user_is_authorized(self, user: User) -> bool:
|
||||||
"""Check if user is authorized to use this application"""
|
"""Check if user is authorized to use this application"""
|
||||||
from passbook.core.rules import RuleEngine
|
from passbook.core.rules import RuleEngine
|
||||||
return RuleEngine(self).for_user(user).result
|
return RuleEngine(self.rules.all()).for_user(user).result
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name
|
return self.name
|
||||||
|
@ -160,6 +160,7 @@ class FieldMatcherRule(Rule):
|
||||||
MATCH_CONTAINS = 'contains'
|
MATCH_CONTAINS = 'contains'
|
||||||
MATCH_REGEXP = 'regexp'
|
MATCH_REGEXP = 'regexp'
|
||||||
MATCH_EXACT = 'exact'
|
MATCH_EXACT = 'exact'
|
||||||
|
|
||||||
MATCHES = (
|
MATCHES = (
|
||||||
(MATCH_STARTSWITH, _('Starts with')),
|
(MATCH_STARTSWITH, _('Starts with')),
|
||||||
(MATCH_ENDSWITH, _('Ends with')),
|
(MATCH_ENDSWITH, _('Ends with')),
|
||||||
|
@ -169,13 +170,13 @@ class FieldMatcherRule(Rule):
|
||||||
)
|
)
|
||||||
|
|
||||||
USER_FIELDS = (
|
USER_FIELDS = (
|
||||||
('username', 'username',),
|
('username', _('Username'),),
|
||||||
('first_name', 'first_name',),
|
('first_name', _('First Name'),),
|
||||||
('last_name', 'last_name',),
|
('last_name', _('Last Name'),),
|
||||||
('email', 'email',),
|
('email', _('E-Mail'),),
|
||||||
('is_staff', 'is_staff',),
|
('is_staff', _('Is staff'),),
|
||||||
('is_active', 'is_active',),
|
('is_active', _('Is active'),),
|
||||||
('data_joined', 'data_joined',),
|
('data_joined', _('Date joined'),),
|
||||||
)
|
)
|
||||||
|
|
||||||
user_field = models.TextField(choices=USER_FIELDS)
|
user_field = models.TextField(choices=USER_FIELDS)
|
||||||
|
@ -218,6 +219,41 @@ class FieldMatcherRule(Rule):
|
||||||
verbose_name = _('Field matcher Rule')
|
verbose_name = _('Field matcher Rule')
|
||||||
verbose_name_plural = _('Field matcher Rules')
|
verbose_name_plural = _('Field matcher Rules')
|
||||||
|
|
||||||
|
@reversion.register()
|
||||||
|
class PasswordPolicyRule(Rule):
|
||||||
|
"""Rule to make sure passwords have certain properties"""
|
||||||
|
|
||||||
|
amount_uppercase = models.IntegerField(default=0)
|
||||||
|
amount_lowercase = models.IntegerField(default=0)
|
||||||
|
amount_symbols = models.IntegerField(default=0)
|
||||||
|
length_min = models.IntegerField(default=0)
|
||||||
|
symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ")
|
||||||
|
|
||||||
|
form = 'passbook.core.forms.rules.PasswordPolicyRuleForm'
|
||||||
|
|
||||||
|
def passes(self, user: User) -> bool:
|
||||||
|
# Only check if password is being set
|
||||||
|
if not hasattr(user, '__password__'):
|
||||||
|
return True
|
||||||
|
password = getattr(user, '__password__')
|
||||||
|
|
||||||
|
filter_regex = r''
|
||||||
|
if self.amount_lowercase > 0:
|
||||||
|
filter_regex += r'[a-z]{%d,}' % self.amount_lowercase
|
||||||
|
if self.amount_uppercase > 0:
|
||||||
|
filter_regex += r'[A-Z]{%d,}' % self.amount_uppercase
|
||||||
|
if self.amount_symbols > 0:
|
||||||
|
filter_regex += r'[%s]{%d,}' % (self.symbol_charset, self.amount_symbols)
|
||||||
|
result = bool(re.compile(filter_regex).match(password))
|
||||||
|
LOGGER.debug("User got %r", result)
|
||||||
|
return result
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
|
||||||
|
verbose_name = _('Password Policy Rule')
|
||||||
|
verbose_name_plural = _('Password Policy Rules')
|
||||||
|
|
||||||
|
|
||||||
@reversion.register()
|
@reversion.register()
|
||||||
class WebhookRule(Rule):
|
class WebhookRule(Rule):
|
||||||
"""Rule that asks webhook"""
|
"""Rule that asks webhook"""
|
||||||
|
|
|
@ -9,27 +9,32 @@ from passbook.core.models import Rule, User
|
||||||
LOGGER = getLogger(__name__)
|
LOGGER = getLogger(__name__)
|
||||||
|
|
||||||
@CELERY_APP.task()
|
@CELERY_APP.task()
|
||||||
def _rule_engine_task(user_pk, rule_pk):
|
def _rule_engine_task(user_pk, rule_pk, **kwargs):
|
||||||
"""Task wrapper to run rule checking"""
|
"""Task wrapper to run rule checking"""
|
||||||
rule_obj = Rule.objects.filter(pk=rule_pk).select_subclasses().first()
|
rule_obj = Rule.objects.filter(pk=rule_pk).select_subclasses().first()
|
||||||
user_obj = User.objects.get(pk=user_pk)
|
user_obj = User.objects.get(pk=user_pk)
|
||||||
|
for key, value in kwargs.items():
|
||||||
|
setattr(user_obj, key, value)
|
||||||
LOGGER.debug("Running rule `%s`#%s for user %s...", rule_obj.name, rule_obj.pk.hex, user_obj)
|
LOGGER.debug("Running rule `%s`#%s for user %s...", rule_obj.name, rule_obj.pk.hex, user_obj)
|
||||||
return rule_obj.passes(user_obj)
|
return rule_obj.passes(user_obj)
|
||||||
|
|
||||||
class RuleEngine:
|
class RuleEngine:
|
||||||
"""Orchestrate rule checking, launch tasks and return result"""
|
"""Orchestrate rule checking, launch tasks and return result"""
|
||||||
|
|
||||||
_rule_model = None
|
rules = None
|
||||||
_group = None
|
_group = None
|
||||||
|
|
||||||
def __init__(self, rule_model):
|
def __init__(self, rules):
|
||||||
self._rule_model = rule_model
|
self.rules = rules
|
||||||
|
|
||||||
def for_user(self, user):
|
def for_user(self, user):
|
||||||
"""Check rules for user"""
|
"""Check rules for user"""
|
||||||
signatures = []
|
signatures = []
|
||||||
for rule in self._rule_model.rules.all():
|
kwargs = {
|
||||||
signatures.append(_rule_engine_task.s(user.pk, rule.pk.hex))
|
'__password__': getattr(user, '__password__')
|
||||||
|
}
|
||||||
|
for rule in self.rules:
|
||||||
|
signatures.append(_rule_engine_task.s(user.pk, rule.pk.hex, **kwargs))
|
||||||
self._group = group(signatures)()
|
self._group = group(signatures)()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
Reference in a new issue