| |
@@ -1,146 +0,0 @@
|
| |
- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
|
| |
-
|
| |
- from ipsilon.providers.common import ProviderPageBase
|
| |
- from ipsilon.util.user import UserSession
|
| |
- from ipsilon.util.endpoint import allow_iframe
|
| |
-
|
| |
- import base64
|
| |
- import cherrypy
|
| |
- import time
|
| |
- import json
|
| |
- import M2Crypto
|
| |
-
|
| |
-
|
| |
- class AuthenticateRequest(ProviderPageBase):
|
| |
-
|
| |
- def __init__(self, site, provider, *args, **kwargs):
|
| |
- super(AuthenticateRequest, self).__init__(site, provider)
|
| |
- self.trans = None
|
| |
-
|
| |
- def _preop(self, *args, **kwargs):
|
| |
- self.trans = self.get_valid_transaction('persona', **kwargs)
|
| |
-
|
| |
- def pre_GET(self, *args, **kwargs):
|
| |
- self._preop(*args, **kwargs)
|
| |
-
|
| |
- def pre_POST(self, *args, **kwargs):
|
| |
- self._preop(*args, **kwargs)
|
| |
-
|
| |
-
|
| |
- class Sign(AuthenticateRequest):
|
| |
-
|
| |
- def _base64_url_decode(self, inp):
|
| |
- inp += '=' * (4 - (len(inp) % 4))
|
| |
- return base64.urlsafe_b64decode(inp)
|
| |
-
|
| |
- def _base64_url_encode(self, inp):
|
| |
- return base64.urlsafe_b64encode(inp).replace('=', '')
|
| |
-
|
| |
- def _persona_sign(self, email, publicKey, certDuration):
|
| |
- self.debug('Signing for %s with duration of %s' % (email,
|
| |
- certDuration))
|
| |
- header = {'alg': 'RS256'}
|
| |
- header = json.dumps(header)
|
| |
- header = self._base64_url_encode(header)
|
| |
-
|
| |
- claim = {}
|
| |
- # Valid from 10 seconds before now to account for clock skew
|
| |
- claim['iat'] = 1000 * int(time.time() - 10)
|
| |
- # Validity of at most 24 hours
|
| |
- claim['exp'] = 1000 * int(time.time() +
|
| |
- min(certDuration, 24 * 60 * 60))
|
| |
-
|
| |
- claim['iss'] = self.cfg.issuer_domain
|
| |
- claim['public-key'] = json.loads(publicKey)
|
| |
- claim['principal'] = {'email': email}
|
| |
-
|
| |
- claim = json.dumps(claim)
|
| |
- claim = self._base64_url_encode(claim)
|
| |
-
|
| |
- certificate = '%s.%s' % (header, claim)
|
| |
- digest = M2Crypto.EVP.MessageDigest('sha256')
|
| |
- digest.update(certificate)
|
| |
- signature = self.cfg.key.sign(digest.digest(), 'sha256')
|
| |
- signature = self._base64_url_encode(signature)
|
| |
- signed_certificate = '%s.%s' % (certificate, signature)
|
| |
-
|
| |
- return signed_certificate
|
| |
-
|
| |
- def _willing_to_sign(self, email, username):
|
| |
- for domain in self.cfg.allowed_domains:
|
| |
- if email == ('%s@%s' % (username, domain)):
|
| |
- return True
|
| |
- return False
|
| |
-
|
| |
- @allow_iframe
|
| |
- def POST(self, *args, **kwargs):
|
| |
- if 'email' not in kwargs or 'publicKey' not in kwargs \
|
| |
- or 'certDuration' not in kwargs or '@' not in kwargs['email']:
|
| |
- cherrypy.response.status = 400
|
| |
- raise Exception('Invalid request: %s' % kwargs)
|
| |
-
|
| |
- us = UserSession()
|
| |
- user = us.get_user()
|
| |
-
|
| |
- if user.is_anonymous:
|
| |
- raise cherrypy.HTTPError(401, 'Not signed in')
|
| |
-
|
| |
- if not self._willing_to_sign(kwargs['email'], user.name):
|
| |
- self.log('Not willing to sign for %s, logged in as %s' % (
|
| |
- kwargs['email'], user.name))
|
| |
- raise cherrypy.HTTPError(403, 'Incorrect user')
|
| |
-
|
| |
- return self._persona_sign(kwargs['email'], kwargs['publicKey'],
|
| |
- kwargs['certDuration'])
|
| |
-
|
| |
-
|
| |
- class SignInResult(AuthenticateRequest):
|
| |
- @allow_iframe
|
| |
- def GET(self, *args, **kwargs):
|
| |
- user = UserSession().get_user()
|
| |
-
|
| |
- return self._template('persona/signin_result.html',
|
| |
- loggedin=not user.is_anonymous)
|
| |
-
|
| |
-
|
| |
- class SignIn(AuthenticateRequest):
|
| |
- def __init__(self, *args, **kwargs):
|
| |
- super(SignIn, self).__init__(*args, **kwargs)
|
| |
- self.result = SignInResult(*args, **kwargs)
|
| |
- self.trans = None
|
| |
-
|
| |
- @allow_iframe
|
| |
- def GET(self, *args, **kwargs):
|
| |
- username = None
|
| |
- domain = None
|
| |
- if 'email' in kwargs:
|
| |
- if '@' in kwargs['email']:
|
| |
- username, domain = kwargs['email'].split('@', 2)
|
| |
- self.debug('Persona SignIn requested for: %s@%s' % (username,
|
| |
- domain))
|
| |
-
|
| |
- returl = '%s/persona/SignIn/result?%s' % (
|
| |
- self.basepath, self.trans.get_GET_arg())
|
| |
- data = {'login_return': returl,
|
| |
- 'login_target': 'Persona',
|
| |
- 'login_username': username}
|
| |
- self.trans.store(data)
|
| |
- redirect = '%s/login?%s' % (self.basepath,
|
| |
- self.trans.get_GET_arg())
|
| |
- self.debug('Redirecting: %s' % redirect)
|
| |
- raise cherrypy.HTTPRedirect(redirect)
|
| |
-
|
| |
-
|
| |
- class Persona(AuthenticateRequest):
|
| |
-
|
| |
- def __init__(self, *args, **kwargs):
|
| |
- super(Persona, self).__init__(*args, **kwargs)
|
| |
- self.Sign = Sign(*args, **kwargs)
|
| |
- self.SignIn = SignIn(*args, **kwargs)
|
| |
- self.trans = None
|
| |
-
|
| |
- @allow_iframe
|
| |
- def GET(self, *args, **kwargs):
|
| |
- user = UserSession().get_user()
|
| |
- return self._template('persona/provisioning.html',
|
| |
- loggedin=not user.is_anonymous)
|
| |
I have explicitly not removed the entries from the dbupgrades script, to make sure that we verify we work correctly in upgrade cases.