From 5a0e78c6988e80933ee793b1a7643944c0ca1596 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Tue, 11 May 2021 21:46:30 +0200 Subject: [PATCH] outposts: fix issue with duplicate outpost health Signed-off-by: Jens Langhammer --- authentik/outposts/channels.py | 19 ++++++++++--------- outpost/go.mod | 1 + outpost/go.sum | 2 ++ outpost/pkg/ak/api.go | 5 ++++- outpost/pkg/ak/api_ws.go | 2 ++ 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/authentik/outposts/channels.py b/authentik/outposts/channels.py index ab9739f1b..918b79881 100644 --- a/authentik/outposts/channels.py +++ b/authentik/outposts/channels.py @@ -42,6 +42,8 @@ class OutpostConsumer(AuthJsonConsumer): outpost: Optional[Outpost] = None + last_uid: Optional[str] = None + def connect(self): super().connect() uuid = self.scope["url_route"]["kwargs"]["pk"] @@ -52,9 +54,7 @@ class OutpostConsumer(AuthJsonConsumer): raise DenyConnection() self.accept() self.outpost = outpost.first() - OutpostState( - uid=self.channel_name, last_seen=datetime.now(), _outpost=self.outpost - ).save(timeout=OUTPOST_HELLO_INTERVAL * 1.5) + self.last_uid = self.channel_name LOGGER.debug( "added outpost instace to cache", outpost=self.outpost, @@ -63,18 +63,20 @@ class OutpostConsumer(AuthJsonConsumer): # pylint: disable=unused-argument def disconnect(self, close_code): - if self.outpost: - OutpostState.for_channel(self.outpost, self.channel_name).delete() + if self.outpost and self.last_uid: + OutpostState.for_channel(self.outpost, self.last_uid).delete() LOGGER.debug( "removed outpost instance from cache", outpost=self.outpost, - channel_name=self.channel_name, + instance_uuid=self.last_uid, ) def receive_json(self, content: Data): msg = from_dict(WebsocketMessage, content) + uid = msg.args.get("uuid", self.channel_name) + self.last_uid = uid state = OutpostState( - uid=self.channel_name, + uid=uid, last_seen=datetime.now(), _outpost=self.outpost, ) @@ -82,8 +84,7 @@ class OutpostConsumer(AuthJsonConsumer): state.version = msg.args.get("version", None) elif msg.instruction == WebsocketMessageInstruction.ACK: return - if state.version: - state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5) + state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5) response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK) self.send_json(asdict(response)) diff --git a/outpost/go.mod b/outpost/go.mod index 7d7fcc00e..667cccb93 100644 --- a/outpost/go.mod +++ b/outpost/go.mod @@ -17,6 +17,7 @@ require ( github.com/go-redis/redis/v7 v7.4.0 // indirect github.com/go-swagger/go-swagger v0.27.0 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.2.0 // indirect github.com/gorilla/websocket v1.4.2 github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a github.com/justinas/alice v1.2.0 diff --git a/outpost/go.sum b/outpost/go.sum index d08d4491c..1382969c3 100644 --- a/outpost/go.sum +++ b/outpost/go.sum @@ -352,6 +352,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/outpost/pkg/ak/api.go b/outpost/pkg/ak/api.go index 62571163b..c897003a4 100644 --- a/outpost/pkg/ak/api.go +++ b/outpost/pkg/ak/api.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-openapi/runtime" + "github.com/google/uuid" "github.com/pkg/errors" "github.com/recws-org/recws" "goauthentik.io/outpost/pkg" @@ -35,7 +36,8 @@ type APIController struct { reloadOffset time.Duration - wsConn *recws.RecConn + wsConn *recws.RecConn + instanceUUID uuid.UUID } // NewAPIController initialise new API Controller instance from URL and API token @@ -70,6 +72,7 @@ func NewAPIController(akURL url.URL, token string) *APIController { logger: log, reloadOffset: time.Duration(rand.Intn(10)) * time.Second, + instanceUUID: uuid.New(), } ac.logger.Debugf("HA Reload offset: %s", ac.reloadOffset) ac.initWS(akURL, outpost.Pk) diff --git a/outpost/pkg/ak/api_ws.go b/outpost/pkg/ak/api_ws.go index cdf891ed0..fb5526961 100644 --- a/outpost/pkg/ak/api_ws.go +++ b/outpost/pkg/ak/api_ws.go @@ -47,6 +47,7 @@ func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) { Instruction: WebsocketInstructionHello, Args: map[string]interface{}{ "version": pkg.VERSION, + "uuid": ac.instanceUUID.String(), }, } err := ws.WriteJSON(msg) @@ -100,6 +101,7 @@ func (ac *APIController) startWSHealth() { Instruction: WebsocketInstructionHello, Args: map[string]interface{}{ "version": pkg.VERSION, + "uuid": ac.instanceUUID.String(), }, } err := ac.wsConn.WriteJSON(aliveMsg)