From eb2968b7777a5441ae561303195a7df80ca703ff Mon Sep 17 00:00:00 2001 From: Enrique Barcelli Date: Sat, 24 Jun 2023 13:47:56 +0800 Subject: [PATCH] Add authorization and cache handlers --- authhandler.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ cachehandler.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 authhandler.py create mode 100644 cachehandler.py diff --git a/authhandler.py b/authhandler.py new file mode 100644 index 0000000..2efa33f --- /dev/null +++ b/authhandler.py @@ -0,0 +1,49 @@ +from pathlib import Path +from requests_oauthlib import OAuth2Session +import config + + +class AuthHandler: + + def __init__(self, api, clientId, clientSecret): + self.clientId = clientId + self.clientSecret = clientSecret + self.api = api + self.cacheHandler = api.cacheHandler + self.authUrl = config.bq_auth_url + self.tokenUrl = config.bq_access_token_url + self.redirectUri = None + self.state = None + self.token = None + + def getAuthURL(self, redirectUrl): + self.redirectUrl = redirectUrl + oauth = OAuth2Session(self.clientId, redirect_uri=self.redirectUri, scope='openid') + return authorizationUrl + + def retrieveToken(self, response, state=None, redirectUri=None): + if not redirectUri: + if not self.redirectUri: print('redirect uri is not found. init the auth flow first or give the uri as a parameter.') + redirectUri = self.redirectUri + if not state: + if not self.state: print('state is not found. init the auth flow first or give the state as a parameter.') + state = self.state + oauth = OAuth2Session(self.clientId, state=state, redirect_uri=redirectUri) + oauthToken = oauth.fetch_token(self.tokenUrl, client_secret=self.clientSecret,authorization_response=response) + self.token = oauth._client.access_token + self.cacheHandler.setCache(self.clientId, self.token) + return self.token + + def getToken(self): + if self.token: return self.token + print('token is not found. init the auth flow first.') + + def setTokenHeader(self, token): + bearerStr = 'Bearer {token}'.format(token=token) + self.api.headers.update({'Authorization' : bearerStr}) + + def checkHeaderTokens(self): + if 'Authorization' not in self.api.headers: + token = self.cacheHandler.getCache(self.clientId) + if token is None: token = self.getToken() + self.setTokenHeader(token) diff --git a/cachehandler.py b/cachehandler.py new file mode 100644 index 0000000..90b7879 --- /dev/null +++ b/cachehandler.py @@ -0,0 +1,34 @@ +from pathlib import Path +import config + +class CacheHandler: + + def __init__(self): + self.rootDir = Path(__file__).parent.absolute() + self.cacheDir = '{root}/cache'.format(root=self.rootDir) + + def getCache(self, key): + if key in config.CACHE[key]: return config.CACHE[key] + keyPath = '{cache}/{key}.txt'.format(cache=self.cacheDir, key=key) + + try: + with open(keyPath, 'r') as f: + value = f.readlines()[0] + config.CACHE[key] = value + return value + + except IOError: + return None + + def setCache(self, key, value): + keyPath = '{cache}/{key}.txt'.format(cache=self.cacheDir, key=key) + + try: + with open(keyPath, 'w+') as f: + f.write(value) + + config.CACHE[key] = value + return value + + except IOError: + return None