e2e: add proxy connectivity test via Websocket

This commit is contained in:
Jens Langhammer 2020-09-28 09:04:44 +02:00
parent b6d7847eae
commit 553f184aad
2 changed files with 65 additions and 0 deletions

View file

@ -8,7 +8,9 @@ from docker.client import DockerClient, from_env
from docker.models.containers import Container
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from channels.testing import ChannelsLiveServerTestCase
from passbook import __version__
from e2e.utils import USER, SeleniumTestCase
from passbook.core.models import Application
from passbook.flows.models import Flow
@ -94,3 +96,65 @@ class TestProviderProxy(SeleniumTestCase):
full_body_text = self.driver.find_element(By.CSS_SELECTOR, "pre").text
self.assertIn("X-Forwarded-Preferred-Username: pbadmin", full_body_text)
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderProxyConnect(ChannelsLiveServerTestCase):
"""Test Proxy connectivity over websockets"""
proxy_container: Container
def tearDown(self) -> None:
self.proxy_container.kill()
super().tearDown()
def start_proxy(self, outpost: Outpost) -> Container:
"""Start proxy container based on outpost created"""
client: DockerClient = from_env()
container = client.containers.run(
image="beryju/passbook-proxy:latest",
detach=True,
network_mode="host",
auto_remove=True,
environment={
"PASSBOOK_HOST": self.live_server_url,
"PASSBOOK_TOKEN": outpost.token.token_uuid.hex,
},
)
return container
def test_proxy_connectivity(self):
SeleniumTestCase().apply_default_data()
proxy: ProxyProvider = ProxyProvider.objects.create(
name="proxy_provider",
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
internal_host="http://localhost:80",
external_host="http://localhost:4180",
)
# Ensure OAuth2 Params are set
proxy.set_oauth_defaults()
proxy.save()
# we need to create an application to actually access the proxy
Application.objects.create(name="proxy", slug="proxy", provider=proxy)
outpost: Outpost = Outpost.objects.create(
name="proxy_outpost",
type=OutpostType.PROXY,
deployment_type=OutpostDeploymentType.CUSTOM,
)
outpost.providers.add(proxy)
outpost.save()
self.proxy_container = self.start_proxy(outpost)
# Wait until outpost healthcheck succeeds
healthcheck_retries = 0
while healthcheck_retries < 50:
if outpost.deployment_health:
break
healthcheck_retries += 1
sleep(0.5)
self.assertIsNotNone(outpost.deployment_health)
self.assertEqual(outpost.deployment_version.get("version"), __version__)

View file

@ -315,6 +315,7 @@ class TestSourceOAuth1(SeleniumTestCase):
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']"))
)
sleep(1)
self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()
# Wait until we've loaded the user info page