From 950cc59cae4c86f3370aba0fd0ab2eb85c3bbd10 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Wed, 6 Dec 2023 14:03:49 +0100 Subject: [PATCH] new endpoint --- ereuse_devicehub/modules/oidc/views.py | 76 ++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/ereuse_devicehub/modules/oidc/views.py b/ereuse_devicehub/modules/oidc/views.py index dfe23ccd..cdb7a036 100644 --- a/ereuse_devicehub/modules/oidc/views.py +++ b/ereuse_devicehub/modules/oidc/views.py @@ -213,12 +213,88 @@ class AllowCodeView(GenericMixin): return self.userinfo +class AllowCodeOidc4vpView(GenericMixin): + methods = ['POST'] + decorators = [] + userinfo = None + token = None + discovery = {} + + def dispatch_request(self): + self.code = request.args.get('code') + self.oidc = session.get('oidc') + return jsonify({"result": "ok"}) + # if not self.code or not self.oidc: + # return self.redirect() + + # self.member = MemberFederated.query.filter( + # MemberFederated.dlt_id_provider == self.oidc, + # MemberFederated.client_id.isnot(None), + # MemberFederated.client_secret.isnot(None), + # ).first() + + # if not self.member: + # return self.redirect() + + # self.get_token() + # if 'error' in self.token: + # messages.error(self.token.get('error', '')) + # return self.redirect() + + # self.get_user_info() + # return self.redirect() + + def get_discovery(self): + if self.discovery: + return self.discovery + + try: + url_well_known = self.member.domain + '.well-known/openid-configuration' + self.discovery = requests.get(url_well_known).json() + except Exception: + self.discovery = {'code': 404} + + return self.discovery + + def get_token(self): + data = {'grant_type': 'authorization_code', 'code': self.code} + url = self.member.domain + '/oauth/token' + url = self.get_discovery().get('token_endpoint', url) + + auth = (self.member.client_id, self.member.client_secret) + msg = requests.post(url, data=data, auth=auth) + self.token = json.loads(msg.text) + + def redirect(self): + url = session.get('next_url') or '/login' + return redirect(url) + + def get_user_info(self): + if self.userinfo: + return self.userinfo + if 'access_token' not in self.token: + return + + url = self.member.domain + '/oauth/userinfo' + url = self.get_discovery().get('userinfo_endpoint', url) + access_token = self.token['access_token'] + token_type = self.token.get('token_type', 'Bearer') + headers = {"Authorization": f"{token_type} {access_token}"} + + msg = requests.get(url, headers=headers) + self.userinfo = json.loads(msg.text) + rols = self.userinfo.get('rols', []) + session['rols'] = [(k, k) for k in rols] + return self.userinfo + + ########## # Routes # ########## oidc.add_url_rule('/create_client', view_func=CreateClientView.as_view('create_client')) oidc.add_url_rule('/oauth/authorize', view_func=AuthorizeView.as_view('autorize_oidc')) oidc.add_url_rule('/allow_code', view_func=AllowCodeView.as_view('allow_code')) +oidc.add_url_rule('/allow_code_oidc4vp', view_func=AllowCodeOidc4vpView.as_view('allow_code_oidc4vp')) oidc.add_url_rule('/oauth/token', view_func=IssueTokenView.as_view('oauth_issue_token')) oidc.add_url_rule( '/oauth/userinfo', view_func=OauthProfileView.as_view('oauth_user_info')