SpringCloud+Tornado实现请求安全校验 SpringCloud+Tornado基于jwt实现请求安全校验功能
95.8℃ 人气:0项目背景
在实际项目中,Tornado
项目作为一个微服务纳入SpringCloud
体系,该过程中涉及到Tornado
与Spring
体系的安全验证,也就是权限调用校验,在该项目中Tornado
是通过SpringCloud
中的Feign
调用的,经过一系列实验,最后选用jwt
来实现这个权限效验的过程。
实现思路
用户进行登陆认证(后台微服务),认证成功后调用Tornado
项目的认证接口生成token
,该值返回到后台微服务保存在会话中,下一次请求时带上该token
值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。
项目结构
common - authServer.py是认证接口 common - basicServer.py是示例接口 handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出 utils - jwtUtils.py是生成与校验token的 utils - responseUtils.py是返回结果工具类
具体实现
jwtUtils.py
# -*- coding:utf-8 -*- import jwt import datetime from jwt import exceptions from utils.responseUtils import JsonUtils JWT_SALT = '1qazxdr5' def create_token(payload, timeout=12): """ 创建token :param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息 :param timeout: token的过期时间,默认20分钟 :return: """ headers = { 'typ': 'jwt', 'alg': 'HS256' } payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8') return result def parse_payload(token): """ 对token进行校验并获取payload :param token: :return: """ try: verified_payload = jwt.decode(token, JWT_SALT, True) print(verified_payload) return JsonUtils.success('认证通过') except exceptions.ExpiredSignatureError: return JsonUtils.noAuth('token已失效') except jwt.DecodeError: return JsonUtils.noAuth('token认证失败') except jwt.InvalidTokenError: return JsonUtils.noAuth('非法的token')
baseHandler.py
# -*- coding:utf-8 -*- import functools import json import tornado.web from utils import jwtUtils from utils.responseUtils import JsonUtils # 方式一:authenticated 装饰器 def authenticated(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): """ 这里调用的是 current_user 的 get 方法(property装饰), """ # 通过token请求头传递token head = self.request.headers token = head.get("token", "") if not token: self.write(JsonUtils.noAuth("未获取到token请求头")) self.set_header('Content-Type', 'application/json') return result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) token_msg = json.dumps(result) if result['sta'] != '00': self.write(token_msg) self.set_header('Content-Type', 'application/json') return return method(self, *args, **kwargs) return wrapper # 方式二:进行预设 继承tornado的RequestHandler class BaseHandler(tornado.web.RequestHandler): def prepare(self): super(BaseHandler, self).prepare() def set_default_headers(self): super().set_default_headers() # 进行token校验,继承上面的BaseHandler class TokenHandler(BaseHandler): def prepare(self): # 通过token请求头传递token head = self.request.headers token = head.get("token","") if not token: self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头")) result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) if result['sta'] != '00': self.isAuth = False else: self.isAuth = True self.authMsg = json.dumps(result)
authServer.py
import tornado.web from utils import jwtUtils from utils.responseUtils import JsonUtils class authHandler(tornado.web.RequestHandler): def post(self, *args, **kwargs): """ 安全认证接口 :param args: :param kwargs: :return: """ username = self.get_argument("username") print("authHandler:" + username) if not username: self.write(JsonUtils.error("参数异常")) else: token = jwtUtils.create_token({"username": username}) print("token:" + token) self.write(JsonUtils.success(token)) self.set_header('Content-Type', 'application/json')
basicServer.py
import tornado.web import json from pandas.core.frame import DataFrame from handlers import baseHandler from utils.responseUtils import JsonUtils from handlers.baseHandler import authenticated class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler): """ *** TokenHandler验证,对应baseHandler.py中的方式二 *** """ def get(self): username = self.get_argument('username', 'Hello') # 权限认证通过 if self.isAuth: self.write(JsonUtils.success(username)) else: self.write(self.authMsg)) self.set_header('Content-Type', 'application/json') class TestHandler(tornado.web.RequestHandler): """ *** authenticated验证,对应baseHandler.py中的方式一 *** """ @authenticated def post(self): username = self.get_argument('username', 'Hello') self.write(JsonUtils.success(username)) self.set_header('Content-Type', 'application/json')
responseUtils.py
from tornado.escape import json_encode, utf8 class JsonUtils(object): @staticmethod def success(response): """ 正确返回 :param response: 返回结果 :return: string, {"message": "ok", "sta": "00", "data": } """ return json_encode({"message": "ok", "sta": "00", "data": response}) @staticmethod def info(message): """ 提示返回 :param message: 提示信息 :return: string, """ return json_encode({"message": str(message), "sta": "99001", "data": None}) @staticmethod def error(message): """ 错误返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "9999", "data": None}) @staticmethod def noAuth(message): """ 无权限返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "403", "data": None})
下面是一些调用的结果图示:
.end
加载全部内容