outposts: fix issue with duplicate outpost health

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-05-11 21:46:30 +02:00
parent fd4e8a59f4
commit 5a0e78c698
5 changed files with 19 additions and 10 deletions

View File

@ -42,6 +42,8 @@ class OutpostConsumer(AuthJsonConsumer):
outpost: Optional[Outpost] = None outpost: Optional[Outpost] = None
last_uid: Optional[str] = None
def connect(self): def connect(self):
super().connect() super().connect()
uuid = self.scope["url_route"]["kwargs"]["pk"] uuid = self.scope["url_route"]["kwargs"]["pk"]
@ -52,9 +54,7 @@ class OutpostConsumer(AuthJsonConsumer):
raise DenyConnection() raise DenyConnection()
self.accept() self.accept()
self.outpost = outpost.first() self.outpost = outpost.first()
OutpostState( self.last_uid = self.channel_name
uid=self.channel_name, last_seen=datetime.now(), _outpost=self.outpost
).save(timeout=OUTPOST_HELLO_INTERVAL * 1.5)
LOGGER.debug( LOGGER.debug(
"added outpost instace to cache", "added outpost instace to cache",
outpost=self.outpost, outpost=self.outpost,
@ -63,18 +63,20 @@ class OutpostConsumer(AuthJsonConsumer):
# pylint: disable=unused-argument # pylint: disable=unused-argument
def disconnect(self, close_code): def disconnect(self, close_code):
if self.outpost: if self.outpost and self.last_uid:
OutpostState.for_channel(self.outpost, self.channel_name).delete() OutpostState.for_channel(self.outpost, self.last_uid).delete()
LOGGER.debug( LOGGER.debug(
"removed outpost instance from cache", "removed outpost instance from cache",
outpost=self.outpost, outpost=self.outpost,
channel_name=self.channel_name, instance_uuid=self.last_uid,
) )
def receive_json(self, content: Data): def receive_json(self, content: Data):
msg = from_dict(WebsocketMessage, content) msg = from_dict(WebsocketMessage, content)
uid = msg.args.get("uuid", self.channel_name)
self.last_uid = uid
state = OutpostState( state = OutpostState(
uid=self.channel_name, uid=uid,
last_seen=datetime.now(), last_seen=datetime.now(),
_outpost=self.outpost, _outpost=self.outpost,
) )
@ -82,7 +84,6 @@ class OutpostConsumer(AuthJsonConsumer):
state.version = msg.args.get("version", None) state.version = msg.args.get("version", None)
elif msg.instruction == WebsocketMessageInstruction.ACK: elif msg.instruction == WebsocketMessageInstruction.ACK:
return return
if state.version:
state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5) state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5)
response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK) response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK)

View File

@ -17,6 +17,7 @@ require (
github.com/go-redis/redis/v7 v7.4.0 // indirect github.com/go-redis/redis/v7 v7.4.0 // indirect
github.com/go-swagger/go-swagger v0.27.0 // indirect github.com/go-swagger/go-swagger v0.27.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a
github.com/justinas/alice v1.2.0 github.com/justinas/alice v1.2.0

View File

@ -352,6 +352,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/go-openapi/runtime" "github.com/go-openapi/runtime"
"github.com/google/uuid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/recws-org/recws" "github.com/recws-org/recws"
"goauthentik.io/outpost/pkg" "goauthentik.io/outpost/pkg"
@ -36,6 +37,7 @@ type APIController struct {
reloadOffset time.Duration reloadOffset time.Duration
wsConn *recws.RecConn wsConn *recws.RecConn
instanceUUID uuid.UUID
} }
// NewAPIController initialise new API Controller instance from URL and API token // NewAPIController initialise new API Controller instance from URL and API token
@ -70,6 +72,7 @@ func NewAPIController(akURL url.URL, token string) *APIController {
logger: log, logger: log,
reloadOffset: time.Duration(rand.Intn(10)) * time.Second, reloadOffset: time.Duration(rand.Intn(10)) * time.Second,
instanceUUID: uuid.New(),
} }
ac.logger.Debugf("HA Reload offset: %s", ac.reloadOffset) ac.logger.Debugf("HA Reload offset: %s", ac.reloadOffset)
ac.initWS(akURL, outpost.Pk) ac.initWS(akURL, outpost.Pk)

View File

@ -47,6 +47,7 @@ func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) {
Instruction: WebsocketInstructionHello, Instruction: WebsocketInstructionHello,
Args: map[string]interface{}{ Args: map[string]interface{}{
"version": pkg.VERSION, "version": pkg.VERSION,
"uuid": ac.instanceUUID.String(),
}, },
} }
err := ws.WriteJSON(msg) err := ws.WriteJSON(msg)
@ -100,6 +101,7 @@ func (ac *APIController) startWSHealth() {
Instruction: WebsocketInstructionHello, Instruction: WebsocketInstructionHello,
Args: map[string]interface{}{ Args: map[string]interface{}{
"version": pkg.VERSION, "version": pkg.VERSION,
"uuid": ac.instanceUUID.String(),
}, },
} }
err := ac.wsConn.WriteJSON(aliveMsg) err := ac.wsConn.WriteJSON(aliveMsg)