outposts: separate websocket re-connection logic to decrease requests on reconnect

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-10-30 21:33:50 +02:00
parent 5ba2c80813
commit 8d5460a132
2 changed files with 23 additions and 5 deletions

View File

@ -35,7 +35,8 @@ type APIController struct {
logger *log.Entry logger *log.Entry
reloadOffset time.Duration reloadOffset time.Duration
lastWsReconnect time.Time
wsConn *recws.RecConn wsConn *recws.RecConn
instanceUUID uuid.UUID instanceUUID uuid.UUID
@ -142,6 +143,10 @@ func (a *APIController) StartBackgorundTasks() error {
"build": constants.BUILD(), "build": constants.BUILD(),
}).SetToCurrentTime() }).SetToCurrentTime()
} }
go func() {
a.logger.Debug("Starting WS reconnector...")
a.startWSReConnector()
}()
go func() { go func() {
a.logger.Debug("Starting WS Handler...") a.logger.Debug("Starting WS Handler...")
a.startWSHandler() a.startWSHandler()

View File

@ -56,6 +56,7 @@ func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) {
if err != nil { if err != nil {
ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithError(err).Warning("Failed to hello to authentik") ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithError(err).Warning("Failed to hello to authentik")
} }
ac.lastWsReconnect = time.Now()
} }
// Shutdown Gracefully stops all workers, disconnects from websocket // Shutdown Gracefully stops all workers, disconnects from websocket
@ -69,6 +70,20 @@ func (ac *APIController) Shutdown() {
} }
} }
func (ac *APIController) startWSReConnector() {
for {
time.Sleep(time.Second * 5)
if ac.wsConn.IsConnected() {
continue
}
if time.Since(ac.lastWsReconnect).Seconds() > 30 {
ac.wsConn.CloseAndReconnect()
ac.logger.Info("Reconnecting websocket")
ac.lastWsReconnect = time.Now()
}
}
}
func (ac *APIController) startWSHandler() { func (ac *APIController) startWSHandler() {
logger := ac.logger.WithField("loop", "ws-handler") logger := ac.logger.WithField("loop", "ws-handler")
for { for {
@ -80,8 +95,7 @@ func (ac *APIController) startWSHandler() {
"outpost_type": ac.Server.Type(), "outpost_type": ac.Server.Type(),
"uuid": ac.instanceUUID.String(), "uuid": ac.instanceUUID.String(),
}).Set(0) }).Set(0)
logger.WithError(err).Warning("ws write error, reconnecting") logger.WithError(err).Warning("ws read error")
ac.wsConn.CloseAndReconnect()
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} }
@ -126,8 +140,7 @@ func (ac *APIController) startWSHealth() {
err := ac.wsConn.WriteJSON(aliveMsg) err := ac.wsConn.WriteJSON(aliveMsg)
ac.logger.WithField("loop", "ws-health").Trace("hello'd") ac.logger.WithField("loop", "ws-health").Trace("hello'd")
if err != nil { if err != nil {
ac.logger.WithField("loop", "ws-health").WithError(err).Warning("ws write error, reconnecting") ac.logger.WithField("loop", "ws-health").WithError(err).Warning("ws write error")
ac.wsConn.CloseAndReconnect()
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} else { } else {