django-orchestra/orchestra/apps/services/tests/test_handler.py

369 lines
16 KiB
Python
Raw Normal View History

2014-09-10 16:53:09 +00:00
import datetime
import decimal
import sys
2014-09-10 16:53:09 +00:00
from dateutil import relativedelta
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
from orchestra.apps.accounts.models import Account
from orchestra.apps.users.models import User
2014-09-14 09:52:45 +00:00
from orchestra.utils.tests import BaseTestCase, random_ascii
2014-09-10 16:53:09 +00:00
2014-09-17 10:32:29 +00:00
from .. import settings, helpers
from ..models import Service, Plan, Rate
2014-09-10 16:53:09 +00:00
2014-09-17 10:32:29 +00:00
class Order(object):
""" Fake order for testing """
last_id = 0
def __init__(self, **kwargs):
self.registered_on = kwargs.get('registered_on', timezone.now().date())
self.billed_until = kwargs.get('billed_until', None)
self.cancelled_on = kwargs.get('cancelled_on', None)
type(self).last_id += 1
self.id = self.last_id
self.pk = self.id
class HandlerTests(BaseTestCase):
2014-09-10 16:53:09 +00:00
DEPENDENCIES = (
'orchestra.apps.orders',
'orchestra.apps.users',
'orchestra.apps.users.roles.posix',
)
def create_account(self):
account = Account.objects.create()
user = User.objects.create_user(username='rata_palida', account=account)
account.user = user
account.save()
return account
def create_ftp_service(self):
2014-09-10 16:53:09 +00:00
service = Service.objects.create(
description="FTP Account",
content_type=ContentType.objects.get_for_model(User),
match='not user.is_main and user.has_posix()',
billing_period=Service.ANUAL,
billing_point=Service.FIXED_DATE,
is_fee=False,
metric='',
2014-09-22 15:59:53 +00:00
pricing_period=Service.NEVER,
2014-09-15 15:36:24 +00:00
rate_algorithm=Service.STEP_PRICE,
2014-09-10 16:53:09 +00:00
on_cancel=Service.DISCOUNT,
payment_style=Service.PREPAY,
tax=0,
2014-09-10 16:53:09 +00:00
nominal_price=10,
)
return service
def test_get_chunks(self):
service = self.create_ftp_service()
2014-09-14 09:52:45 +00:00
handler = service.handler
porders = []
2014-09-10 16:53:09 +00:00
now = timezone.now().date()
2014-09-14 09:52:45 +00:00
2014-09-17 10:32:29 +00:00
order = Order()
2014-09-14 09:52:45 +00:00
porders.append(order)
end = handler.get_billing_point(order)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(1, len(chunks))
self.assertIn([now, end, []], chunks)
2014-09-17 10:32:29 +00:00
order1 = Order(
billed_until=now+datetime.timedelta(days=2)
)
2014-09-14 09:52:45 +00:00
porders.append(order1)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(2, len(chunks))
self.assertIn([order1.registered_on, order1.billed_until, [order1]], chunks)
self.assertIn([order1.billed_until, end, []], chunks)
2014-09-17 10:32:29 +00:00
order2 = Order(
billed_until = now+datetime.timedelta(days=700)
)
2014-09-14 09:52:45 +00:00
porders.append(order2)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(2, len(chunks))
self.assertIn([order.registered_on, order1.billed_until, [order1, order2]], chunks)
self.assertIn([order1.billed_until, end, [order2]], chunks)
2014-09-17 10:32:29 +00:00
order3 = Order(
billed_until = now+datetime.timedelta(days=700)
)
2014-09-14 09:52:45 +00:00
porders.append(order3)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(2, len(chunks))
self.assertIn([order.registered_on, order1.billed_until, [order1, order2, order3]], chunks)
self.assertIn([order1.billed_until, end, [order2, order3]], chunks)
2014-09-17 10:32:29 +00:00
order4 = Order(
registered_on=now+datetime.timedelta(days=5),
billed_until = now+datetime.timedelta(days=10)
)
2014-09-14 09:52:45 +00:00
porders.append(order4)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(4, len(chunks))
self.assertIn([order.registered_on, order1.billed_until, [order1, order2, order3]], chunks)
self.assertIn([order1.billed_until, order4.registered_on, [order2, order3]], chunks)
self.assertIn([order4.registered_on, order4.billed_until, [order2, order3, order4]], chunks)
self.assertIn([order4.billed_until, end, [order2, order3]], chunks)
2014-09-17 10:32:29 +00:00
order5 = Order(
registered_on=now+datetime.timedelta(days=700),
billed_until=now+datetime.timedelta(days=780)
)
2014-09-14 09:52:45 +00:00
porders.append(order5)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(4, len(chunks))
self.assertIn([order.registered_on, order1.billed_until, [order1, order2, order3]], chunks)
self.assertIn([order1.billed_until, order4.registered_on, [order2, order3]], chunks)
self.assertIn([order4.registered_on, order4.billed_until, [order2, order3, order4]], chunks)
self.assertIn([order4.billed_until, end, [order2, order3]], chunks)
2014-09-17 10:32:29 +00:00
order6 = Order(
registered_on=now+datetime.timedelta(days=780),
billed_until=now+datetime.timedelta(days=700)
)
2014-09-14 09:52:45 +00:00
porders.append(order6)
chunks = helpers.get_chunks(porders, now, end)
2014-09-14 09:52:45 +00:00
self.assertEqual(4, len(chunks))
self.assertIn([order.registered_on, order1.billed_until, [order1, order2, order3]], chunks)
self.assertIn([order1.billed_until, order4.registered_on, [order2, order3]], chunks)
self.assertIn([order4.registered_on, order4.billed_until, [order2, order3, order4]], chunks)
self.assertIn([order4.billed_until, end, [order2, order3]], chunks)
def test_sort_billed_until_or_registered_on(self):
2014-09-18 15:07:39 +00:00
now = timezone.now().date()
2014-09-14 09:52:45 +00:00
order = Order(
billed_until=now+datetime.timedelta(days=200))
order1 = Order(
registered_on=now+datetime.timedelta(days=5),
billed_until=now+datetime.timedelta(days=200))
order2 = Order(
registered_on=now+datetime.timedelta(days=6),
billed_until=now+datetime.timedelta(days=200))
order3 = Order(
registered_on=now+datetime.timedelta(days=6),
billed_until=now+datetime.timedelta(days=201))
order4 = Order(
registered_on=now+datetime.timedelta(days=6))
order5 = Order(
registered_on=now+datetime.timedelta(days=7))
order6 = Order(
registered_on=now+datetime.timedelta(days=8))
orders = [order3, order, order1, order2, order4, order5, order6]
2014-09-14 19:36:27 +00:00
self.assertEqual(orders, sorted(orders, cmp=helpers.cmp_billed_until_or_registered_on))
2014-09-14 09:52:45 +00:00
def test_compensation(self):
2014-09-18 15:07:39 +00:00
now = timezone.now().date()
2014-09-14 09:52:45 +00:00
order = Order(
2014-09-14 19:36:27 +00:00
description='0',
billed_until=now+datetime.timedelta(days=220),
2014-09-14 09:52:45 +00:00
cancelled_on=now+datetime.timedelta(days=100))
order1 = Order(
2014-09-14 19:36:27 +00:00
description='1',
2014-09-14 09:52:45 +00:00
registered_on=now+datetime.timedelta(days=5),
cancelled_on=now+datetime.timedelta(days=190),
billed_until=now+datetime.timedelta(days=200))
order2 = Order(
2014-09-14 19:36:27 +00:00
description='2',
2014-09-14 09:52:45 +00:00
registered_on=now+datetime.timedelta(days=6),
cancelled_on=now+datetime.timedelta(days=200),
billed_until=now+datetime.timedelta(days=200))
order3 = Order(
2014-09-14 19:36:27 +00:00
description='3',
2014-09-14 09:52:45 +00:00
registered_on=now+datetime.timedelta(days=6),
billed_until=now+datetime.timedelta(days=200))
2014-09-14 19:36:27 +00:00
tests = []
2014-09-14 09:52:45 +00:00
order4 = Order(
2014-09-14 19:36:27 +00:00
description='4',
registered_on=now+datetime.timedelta(days=6),
billed_until=now+datetime.timedelta(days=102))
order4.new_billed_until = now+datetime.timedelta(days=200)
tests.append([
[now+datetime.timedelta(days=102), now+datetime.timedelta(days=220), order],
])
2014-09-14 09:52:45 +00:00
order5 = Order(
2014-09-14 19:36:27 +00:00
description='5',
registered_on=now+datetime.timedelta(days=7),
billed_until=now+datetime.timedelta(days=102))
order5.new_billed_until = now+datetime.timedelta(days=195)
tests.append([
[now+datetime.timedelta(days=190), now+datetime.timedelta(days=200), order1]
])
2014-09-14 09:52:45 +00:00
order6 = Order(
2014-09-14 19:36:27 +00:00
description='6',
2014-09-14 09:52:45 +00:00
registered_on=now+datetime.timedelta(days=8))
2014-09-14 19:36:27 +00:00
order6.new_billed_until = now+datetime.timedelta(days=200)
tests.append([
[now+datetime.timedelta(days=100), now+datetime.timedelta(days=102), order],
])
2014-09-14 09:52:45 +00:00
porders = [order3, order, order1, order2, order4, order5, order6]
2014-09-14 19:36:27 +00:00
porders = sorted(porders, cmp=helpers.cmp_billed_until_or_registered_on)
2014-09-14 09:52:45 +00:00
compensations = []
2014-09-14 19:36:27 +00:00
receivers = []
2014-09-14 09:52:45 +00:00
for order in porders:
if order.billed_until and order.cancelled_on and order.cancelled_on < order.billed_until:
compensations.append(helpers.Interval(order.cancelled_on, order.billed_until, order=order))
2014-09-14 19:36:27 +00:00
elif hasattr(order, 'new_billed_until') and (not order.billed_until or order.billed_until < order.new_billed_until):
receivers.append(order)
for order, test in zip(receivers, tests):
ini = order.billed_until or order.registered_on
end = order.cancelled_on or now+datetime.timedelta(days=20000)
order_interval = helpers.Interval(ini, end)
(compensations, used_compensations) = helpers.compensate(order_interval, compensations)
for compensation, test_line in zip(used_compensations, test):
self.assertEqual(test_line[0], compensation.ini)
self.assertEqual(test_line[1], compensation.end)
self.assertEqual(test_line[2], compensation.order)
2014-09-14 09:52:45 +00:00
2014-09-15 12:15:32 +00:00
def test_rates(self):
service = self.create_ftp_service()
account = self.create_account()
2014-09-17 10:32:29 +00:00
superplan = Plan.objects.create(
name='SUPER', allow_multiple=False, is_combinable=True)
2014-09-15 12:15:32 +00:00
service.rates.create(plan=superplan, quantity=1, price=0)
service.rates.create(plan=superplan, quantity=3, price=10)
service.rates.create(plan=superplan, quantity=4, price=9)
service.rates.create(plan=superplan, quantity=10, price=1)
account.plans.create(plan=superplan)
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
2014-09-15 12:15:32 +00:00
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 2},
{'price': decimal.Decimal('10.00'), 'quantity': 1},
{'price': decimal.Decimal('9.00'), 'quantity': 6},
{'price': decimal.Decimal('1.00'), 'quantity': 21}
2014-09-15 12:15:32 +00:00
]
2014-09-15 15:36:24 +00:00
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-17 10:32:29 +00:00
dupeplan = Plan.objects.create(
name='DUPE', allow_multiple=True, is_combinable=True)
2014-09-15 12:15:32 +00:00
service.rates.create(plan=dupeplan, quantity=1, price=0)
service.rates.create(plan=dupeplan, quantity=3, price=9)
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-15 12:15:32 +00:00
account.plans.create(plan=dupeplan)
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
2014-09-15 12:15:32 +00:00
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 4},
{'price': decimal.Decimal('9.00'), 'quantity': 5},
{'price': decimal.Decimal('1.00'), 'quantity': 21},
2014-09-15 12:15:32 +00:00
]
2014-09-15 15:36:24 +00:00
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-17 10:32:29 +00:00
hyperplan = Plan.objects.create(
name='HYPER', allow_multiple=False, is_combinable=False)
2014-09-15 15:36:24 +00:00
service.rates.create(plan=hyperplan, quantity=1, price=0)
service.rates.create(plan=hyperplan, quantity=20, price=5)
account.plans.create(plan=hyperplan)
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
2014-09-15 12:15:32 +00:00
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 19},
{'price': decimal.Decimal('5.00'), 'quantity': 11}
2014-09-15 12:15:32 +00:00
]
2014-09-15 15:36:24 +00:00
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
hyperplan.is_combinable = True
hyperplan.save()
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 23},
{'price': decimal.Decimal('1.00'), 'quantity': 7}
2014-09-15 15:36:24 +00:00
]
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
service.rate_algorithm = service.MATCH_PRICE
service.save()
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
self.assertEqual(1, len(results))
self.assertEqual(decimal.Decimal('1.00'), results[0].price)
2014-09-15 15:36:24 +00:00
self.assertEqual(30, results[0].quantity)
hyperplan.delete()
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 8)
self.assertEqual(1, len(results))
self.assertEqual(decimal.Decimal('9.00'), results[0].price)
2014-09-15 15:36:24 +00:00
self.assertEqual(8, results[0].quantity)
superplan.delete()
results = service.get_rates(account, cache=False)
2014-09-15 15:36:24 +00:00
results = service.rate_method(results, 30)
self.assertEqual(1, len(results))
self.assertEqual(decimal.Decimal('9.00'), results[0].price)
2014-09-15 15:36:24 +00:00
self.assertEqual(30, results[0].quantity)
def test_rates_allow_multiple(self):
service = self.create_ftp_service()
account = self.create_account()
2014-09-17 10:32:29 +00:00
dupeplan = Plan.objects.create(
name='DUPE', allow_multiple=True, is_combinable=True)
account.plans.create(plan=dupeplan)
service.rates.create(plan=dupeplan, quantity=1, price=0)
service.rates.create(plan=dupeplan, quantity=3, price=9)
results = service.get_rates(account, cache=False)
results = service.rate_method(results, 30)
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 2},
{'price': decimal.Decimal('9.00'), 'quantity': 28},
]
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-15 15:36:24 +00:00
account.plans.create(plan=dupeplan)
results = service.get_rates(account, cache=False)
results = service.rate_method(results, 30)
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 4},
{'price': decimal.Decimal('9.00'), 'quantity': 26},
]
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-17 10:32:29 +00:00
account.plans.create(plan=dupeplan)
results = service.get_rates(account, cache=False)
results = service.rate_method(results, 30)
rates = [
{'price': decimal.Decimal('0.00'), 'quantity': 6},
{'price': decimal.Decimal('9.00'), 'quantity': 24},
]
for rate, result in zip(rates, results):
self.assertEqual(rate['price'], result.price)
self.assertEqual(rate['quantity'], result.quantity)
2014-09-18 15:07:39 +00:00
def test_generate_bill_lines_with_compensation(self):
service = self.create_ftp_service()
account = self.create_account()
now = timezone.now().date()
order = Order(
cancelled_on=now,
billed_until=now+relativedelta.relativedelta(years=2)
)
order1 = Order()
orders = [order, order1]
lines = service.handler.generate_bill_lines(orders, account, commit=False)
print lines
print len(lines)