From 8d5460a1328172f360641af1d4d634378fbdf689 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Sat, 30 Oct 2021 21:33:50 +0200 Subject: [PATCH] outposts: separate websocket re-connection logic to decrease requests on reconnect Signed-off-by: Jens Langhammer --- internal/outpost/ak/api.go | 7 ++++++- internal/outpost/ak/api_ws.go | 21 +++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/internal/outpost/ak/api.go b/internal/outpost/ak/api.go index 4026025b2..7db5a2110 100644 --- a/internal/outpost/ak/api.go +++ b/internal/outpost/ak/api.go @@ -35,7 +35,8 @@ type APIController struct { logger *log.Entry - reloadOffset time.Duration + reloadOffset time.Duration + lastWsReconnect time.Time wsConn *recws.RecConn instanceUUID uuid.UUID @@ -142,6 +143,10 @@ func (a *APIController) StartBackgorundTasks() error { "build": constants.BUILD(), }).SetToCurrentTime() } + go func() { + a.logger.Debug("Starting WS reconnector...") + a.startWSReConnector() + }() go func() { a.logger.Debug("Starting WS Handler...") a.startWSHandler() diff --git a/internal/outpost/ak/api_ws.go b/internal/outpost/ak/api_ws.go index fbd9295a5..b9e992540 100644 --- a/internal/outpost/ak/api_ws.go +++ b/internal/outpost/ak/api_ws.go @@ -56,6 +56,7 @@ func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) { if err != nil { ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithError(err).Warning("Failed to hello to authentik") } + ac.lastWsReconnect = time.Now() } // Shutdown Gracefully stops all workers, disconnects from websocket @@ -69,6 +70,20 @@ func (ac *APIController) Shutdown() { } } +func (ac *APIController) startWSReConnector() { + for { + time.Sleep(time.Second * 5) + if ac.wsConn.IsConnected() { + continue + } + if time.Since(ac.lastWsReconnect).Seconds() > 30 { + ac.wsConn.CloseAndReconnect() + ac.logger.Info("Reconnecting websocket") + ac.lastWsReconnect = time.Now() + } + } +} + func (ac *APIController) startWSHandler() { logger := ac.logger.WithField("loop", "ws-handler") for { @@ -80,8 +95,7 @@ func (ac *APIController) startWSHandler() { "outpost_type": ac.Server.Type(), "uuid": ac.instanceUUID.String(), }).Set(0) - logger.WithError(err).Warning("ws write error, reconnecting") - ac.wsConn.CloseAndReconnect() + logger.WithError(err).Warning("ws read error") time.Sleep(time.Second * 5) continue } @@ -126,8 +140,7 @@ func (ac *APIController) startWSHealth() { err := ac.wsConn.WriteJSON(aliveMsg) ac.logger.WithField("loop", "ws-health").Trace("hello'd") if err != nil { - ac.logger.WithField("loop", "ws-health").WithError(err).Warning("ws write error, reconnecting") - ac.wsConn.CloseAndReconnect() + ac.logger.WithField("loop", "ws-health").WithError(err).Warning("ws write error") time.Sleep(time.Second * 5) continue } else {