[TOC]
## 第6章 Token與HTTPBasic驗證 —— 用令牌來管理用戶
在我的TP5課程里,我們使用令牌的方式是服務器緩存的方式。那么在Python Flask中我們換一種令牌的發放方式。我們將用戶的信息加密后作為令牌返回到客戶端,客戶端在訪問服務器API時必須以HTTP Basic的方式攜帶令牌,我們再讀取令牌信息后,將用戶信息存入到g變量中,共業務代碼全局使用...
### 6-1 Token概述


### 6-2 獲取Token令牌
理論上來說 get\_token 的 HTTP 動詞應該設為 get,但是由于獲取 token 需要傳入賬號和密碼,所以這里要使用 POST 方法。此外相對于 get 方法,post 方法的傳參相對安全。如果使用 get 的方式來傳遞用戶的賬號和密碼,我們只能夠把這兩個參數放在 url 后面的?(問號)里作為查詢參數來傳遞,但是如果使用post 的話,我們就可以把賬號和密碼放到 HTTP 的 body 里面來傳遞。
先編寫 ginger/app/api/v1/token.py,思路:
1. 創建 token 紅圖,將 token 紅圖注冊到 v1藍圖中去
2. 注冊 token 路由,使用 POST 方法
3. 實例化 ClientForm,驗證 validate\_for\_api(驗證數據格式)
4. 使用 promise 辨別客戶端類型
5. 使用 promise 驗證客戶端提交的賬戶密碼,查詢用戶返回用戶 id
6. 生成 token,使用 TimedJSONWebSignatureSerializer 序列化器生成 token 生成的 token 是byte 類型的字符串,需要使用 `decode('ascii')`解碼
7. 將 token 序列化返回,并返回 HTTP 狀態碼
~~~
?from flask import current_app, jsonify
??
?from app.libs.enums import ClientTypeEnum
?from app.libs.red_print import RedPrint
?from app.models.user import User
?from app.validators.forms import ClientForm
?from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
??
?api = RedPrint('token')
??
??
?@api.route('', methods=['POST'])
?def get_token():
? ? ?form = ClientForm().validate_for_api()
? ? ?promise = {
? ? ? ? ?ClientTypeEnum.USER_EMAIL: User.verify_by_email,
? ? ? ? ?ClientTypeEnum.USER_MOBILE: User.verify_by_mobile,
? ? ? ? ?ClientTypeEnum.USER_MINA: User.verify_by_mina,
? ? ? ? ?ClientTypeEnum.USER_WX: User.verify_by_wx,
? ? }
? ? ?identity = promise[ClientTypeEnum(form.type.data)](form.account.data, form.secret.data)
? ? ?expiration = current_app.config['TOKEN_EXPIRATION']
? ? ?token = generate_auth_token(identity['uid'], form.type.data, None, expiration)
? ? ?t = {
? ? ? ? ?'token': token.decode('ascii')
? ? }
? ? ?return jsonify(t), 201
??
??
?def generate_auth_token(uid, ac_type, scope=None, expiration=7200):
? ? ?s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
? ? ?return s.dumps({
? ? ? ? ?'uid': uid,
? ? ? ? ?'token': ac_type.value
? ? })
~~~
在 User 模型內編寫驗證客戶端賬戶密碼的方法:
1. 先查詢用戶
2. 如果用戶不存在,返回找不到
3. 使用 check\_password\_hash 比對客戶端傳入的密碼與數據庫中保存的密碼
4. 如果用戶密碼錯誤,返回授權失敗
~~~
?@staticmethod
? ? ?def verify_by_email(email, password):
? ? ? ? ?user = User.query.filter_by(email=email).first()
? ? ? ? ?if not user:
? ? ? ? ? ? ?raise NotFound(msg='user not found')
? ? ? ? ?if not user.check_password(password):
? ? ? ? ? ? ?raise AuthFailed()
? ? ? ? ?return {'uid': user.id}
??
? ? ?def check_password(self, raw):
? ? ? ? ?if not self.password:
? ? ? ? ? ? ?return False
? ? ? ? ?return check_password_hash(self.password, raw)
~~~
在配置文件中配置 TOKEN\_EXPIRATION,這是作為開發用途,所以過期時間比較長,正式上線之后過期時間設置為兩小時比較合適
~~~
?TOKEN_EXPIRATION = 30 * 24 * 3600
~~~
### 6-3 Token的用處
### 6-4 @auth攔截器執行流程
本節主要了解一下 @auth.login\_required 和 @auth.verify\_password 兩個裝飾器的作用。
現在需要保護的視圖函數前打上 @auth.login\_required 裝飾器,
~~~
?@api.route('', methods=['GET'])
?@auth.login_required
?def get_user():
? ? ?return 'user'
~~~
再在 ginger/app/libs/token\_auth.py 內為 verify\_password 函數打上 @verify\_password 裝飾器。
~~~
?from flask_httpauth import HTTPBasicAuth
??
?auth = HTTPBasicAuth()
??
?@auth.verify_password
?def verify_password(account, password):
? ? ?pass
~~~
@auth 攔截器的請求流程:
1. 先訪問到 get\_user 視圖函數,
2. 再轉到 verify\_password 函數,
* 如果 verify\_password 函數返回 True,則返回到 get\_user 視圖函數內執行 get\_user 的內容,最后返回給客戶端
* 否則,直接給客戶端返回 Unauthorized Access
### 6-5 HTTPBasicAuth基本原理
在 verify\_password 函數中需要接收 account、password 兩個參數,那么如何獲取這兩個參數呢?
之前在 ClientForm 中已經可以接收到客戶端傳來的 account、password 了,這種接收賬號和密碼的方式是我們自己定義的一種方式,毫無疑問賬號和密碼只是兩個普通的參數,當然可以通過自定義的方式傳送到服務器。
除了自定義的方式傳參方式,HTTP 這種協議本身就有一種規范,這種規范允許我們傳遞賬號和密碼。HTTP 自帶的發送賬號和密碼的方式有很多種,其中一種就是 HTTPBasicAuth,還有其他的諸如 HTTPDigestAuth 等其他規范。
HTTPBasicAuth 規定:
必須將賬號、密碼放在 HTTP 的 Headers 里面。(之前 ClientForm 是將賬戶和密碼放在 HTTP 的 body 里面發送的)
HTTP 的頭是一組一組的 key:value 的鍵值對。
* 需要在 HTTP 的頭中設置一個固定的 key,key 的名字叫做 Authorization,
* key 的 value 是 basic base64(account:password)
> 注意:
>
> 1. 前面是 basic + 一個空格
>
> 2. base64() 表示 base64 加密
>
postman 演示:


只要 HTTP 傳入的參數符合規范,verify\_password 函數就能正確的獲到賬戶和密碼。
> 面試問題:
>
> 以上就是 HTTPBasicAuth 的基本原理和傳遞規范,關于 HTTP 的協議在很多服務器面試的時候都會經常問到,這種 HTTPBasicAuth 面試的問題也是非常多的,一定要熟記。
### 6-6 以BasicAuth的方式發送Token
第一次登陸的時候需要賬戶密碼,但是后續的訪問需要的是 token 而不是 account、password,但是我們可以通過相同的方式傳遞 token,我們只需要將 token 當做 account 來傳就可以了,密碼不傳或者傳空。
### 6-7 驗證Token
1. 我們需要編寫 verify\_auth\_token 函數來驗證 token
1. 使用 SECRET\_KEY 實例化一個序列化器
2. 使用 `s.loads(token)`將 token 反序列化賦給 data
* 如果遇到 BadSignature,則表示 token 錯誤,返回 token 非法的錯誤信息
* 如果遇到 SignatureExpired,則表示 token 過期,返回 token 過期的錯誤信息
3. 從 data 中取出 uid 和 ac\_type(這兩個是之前我們生產 token 的時候放進去的)
4. 然后將 uid 和 ac\_type 打包返回,我們可以選擇元組、字典的方式返回,但是最好的方式是使用一個對象式的結構返回回去:
~~~
?User = namedtuple('User', ['uid', 'ac_type', 'scope'])
~~~
這里使用 namedtuple 有什么優勢,我們在后面時候 User 對象的時候就知道了。
2. 將 verify\_auth\_token 的返回值賦給 user\_info
3. 如果 user\_info 為空,則表示驗證失敗,返回 False
4. 否則將 user\_info 保存到 g 變量的 user 屬性中。 (這個 g 變量就跟 flask 的 request 對象是一樣的,它們都是一個代理模式的實現,至于 g 變量怎么用后面再說,線程安全。)
~~~
?from collections import namedtuple
??
?from flask import current_app, g
?from flask_httpauth import HTTPBasicAuth
?from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
??
?from app.libs.error_code import AuthFailed
??
?auth = HTTPBasicAuth()
?User = namedtuple('User', ['uid', 'ac_type', 'scope'])
??
??
?@auth.verify_password
?def verify_password(token, password):
? ? ?user_info = verify_auth_token(token)
? ? ?if not user_info:
? ? ? ? ?return False
? ? ?else:
? ? ? ? ?g.user = user_info
? ? ? ? ?return True
??
??
?def verify_auth_token(token):
? ? ?s = Serializer(current_app.config['SECRET_KEY'])
? ? ?try:
? ? ? ? ?data = s.loads(token)
? ? ?except BadSignature:
? ? ? ? ?raise AuthFailed(msg='token is invalid', error_code=1002)
? ? ?except SignatureExpired:
? ? ? ? ?raise AuthFailed(msg='token is expired', error_code=1003)
? ? ?uid = data['uid']
? ? ?ac_type = data['type']
? ? ?return User(uid, ac_type, '')
~~~
### 6-8 重寫first\_or\_404與get\_or\_404
前面對 get\_user 視圖函數的保護工作已經做完了,現在我們來正式編寫 get\_user 視圖函數。
1. 查詢用戶
2. 判斷查詢的用戶是否存在,不存在則返回 NotFound
~~~
?@api.route('/<int:uid>', methods=['GET'])
?@auth.login_required
?def get_user(uid):
? ? ?user = User.query.get(uid)
? ? ?if not user:
? ? ? ? ?raise NotFound()
? ? ?return user
~~~
那么問題來了,我們每次查詢數據庫的時候都需要判斷查詢的內容是否存在,這是一件很麻煩的事情。我們可不可以不寫呢?好在有一個 get\_or\_404 方法,但是 get\_or\_404 方法放回的是系統內部的錯誤信息,并不是 APIException,不是 APIException 的話那么返回的格式就不是我們要求的 JSON 格式,所以我們需要重寫 get\_or\_404 方法,讓它返回 APIException。get\_or\_404 是 query 的方法,我們找到 query 在 ginger/models/base.py 中:


我們按照 BaseQuery.get\_or\_404 仿寫就可以了,其他都一樣,只是`abort(404)`那里改成 `raise APIException`就可以了。同理仿寫 first\_or\_404。
~~~
?class Query(BaseQuery):
? ? ?def filter_by(self, **kwargs):
? ? ? ? ?if 'status' not in kwargs.keys():
? ? ? ? ? ? ?kwargs['status'] = 1
? ? ? ? ?return super(Query, self).filter_by(**kwargs)
??
? ? ?def get_or_404(self, ident):
? ? ? ? ?rv = self.get(ident)
? ? ? ? ?if not rv:
? ? ? ? ? ? ?raise NotFound()
? ? ? ? ?return rv
??
? ? ?def first_or_404(self):
? ? ? ? ?rv = self.first()
? ? ? ? ?if not rv:
? ? ? ? ? ? ?raise NotFound()
? ? ? ? ?return rv
~~~
以下是需要修改的地方:
1. 第一處改成:
2. 第二處 
改成: 