c8b175f9d26f422afd56a6a20285302e.png

关于JWT

JWT的意思是JSON Web Tokens,它是现阶段非常流行的认证系统。互联网服务总是离不开认证,在过去,开发者们通常使用 cookiessession来做认证系统,但随着互联网的发展,这种方式被淘汰。取而代之的,便是JWT。

JWT的原理

JWT的原理较为简单,为了方便理解,假设我们在登录场景。

  1. 用户在网页上填写账号和密码后,通过JavaScript POST到服务端。
  2. 服务端对接收的信息验证通过后,会生成一段json信息返回给客户端,信息大概是这样的:
{"user_name": "张三", "uid": 1, "id": 1}
  1. 客户端收到该信息后,存储在浏览器的local Storage
  2. 客户端发送请求时,都携带这一段数据,那么服务端就知道客户端是谁在请求了。

但做到这些还不够,为了防止伪造和篡改,我们还需要一个只有服务器知道的密钥,并且使用HS256算法基于此密钥在签发的时候对信息加密,在接收信息时对信息解密。

安装相关模块

使用jose来签发和验证 JWT tokens

pip install python-jose[cryptography]

使用passlib来处理密码hash

pip install passlib[bcrypt]

如果遇到安装失败,可以尝试执行如下命令:

pip install -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com python-jose[cryptography]
pip install -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com passlib[bcrypt]

处理密码hash和验证

密码本身只是一段字符串,我们存储在数据库中的密码都是经过hash处理的,这样的好处在于,即便是数据库因为入侵或者其他原因泄露,那些不法分子也不能轻易破解出用户的密码,有利于保护用户隐私。

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 验证密码hash
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
# 计算密码hash值
def get_password_hash(password):
    return pwd_context.hash(password)

一般而言,在用户设置密码时,我们将计算出密码的hash值并存储在数据库中,用户登录时,服务端计算用户输入的密码hash值与数据库中的hash值比对。如果一致,则密码正确。

完整的JWT处理过程

为了程序的连贯性,这里直接贴上完整的例程,请仔细看注释。


# 导入相关的模块
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware

"""定义关于token的相关常量
SECRET_KEY : 用于加密解密的密钥,只允许服务器知道,打死不告诉别人
            可以执行 openssl rand -hex 32 获取一串随机的字符
ALGORITHM  : 定义加密解密所使用的算法
ACCESS_TOKEN_EXPIRE_MINUTES : token的有效期  
"""
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# 这里定义一个字典,来模拟数据库中的数据
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    """定义token的数据模型"""
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None

class FormData(BaseModel):
    uname: str
    passwd: str

class User(BaseModel):
    """定义用户的数据模型"""
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


# 创建一个加密解密上下文环境(甚至可以不用管这两句话啥意思)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 实例化一个FastAPI实例
app = FastAPI()


# 设置允许访问的域名
origins = ["http://127.0.0.1"]  #也可以设置为"*",即为所有。


# 设置跨域传参
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # 设置允许的origins来源
    allow_credentials=True,
    allow_methods=["*"],  # 设置允许跨域的http方法,比如 get、post、put等。
    allow_headers=["*"])  # 允许跨域的headers,可以用来鉴别来源等作用。


def verify_password(plain_password, hashed_password):
    """验证密码是否正确
    :param plain_password: 明文
    :param hashed_password: 明文hash值
    :return:
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """获取密码的hash值
    :param password: 欲获取hash的明文密码
    :return: 返回一个hash字符串
    """
    return pwd_context.hash(password)


def get_user(db, username: str):
    """查询用户
    :param db: 模拟的数据库
    :param username: 用户名
    :return: 返回一个用户的BaseModel(其实就是字典的BaseModel对象,二者可互相转换)
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    """验证用户
    :param fake_db: 存储用户的数据库(这里是上面用字典模拟的)
    :param username: 用户名
    :param password: 密码
    :return:
    """
    # 从数据库获取用户信息
    user = get_user(fake_db, username)
    # 如果获取为空,返回False
    if not user:
        return False
    # 如果密码不正确,也是返回False
    if not verify_password(password, user.hashed_password):
        return False
    # 如果存在此用户,且密码也正确,则返回此用户信息
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """创建tokens函数
    :param data: 对用JWT的Payload字段,这里是tokens的载荷,在这里就是用户的信息
    :param expires_delta: 缺省参数,截止时间
    :return:
    """
    # 深拷贝data
    to_encode = data.copy()
    # 如果携带了截至时间,就单独设置tokens的过期时间
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 否则的话,就默认用15分钟
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 编码,至此 JWT tokens诞生
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """获取当前用户信息,实际上是一个解密token的过程
    :param token: 携带的token
    :return:
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 解密tokens
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从tokens的载荷payload中获取用户名
        username: str = payload.get("sub")
        # 如果没有获取到,抛出异常
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # 从数据库查询用户信息
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """获取当前用户信息,实际上是作为依赖,注入其他路由以使用。
    :param current_user:
    :return:
    """
    # 如果用户被禁,抛出异常
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: FormData):
    """这里定义了一个接口,路径为 /token, 用于用户申请tokens
    :param form_data:
    :return:
    """
    # 首先对用户做出检查
    user = authenticate_user(fake_users_db, form_data.uname, form_data.passwd)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 定义tokens过期时间
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 创建token
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 返回token信息,JavaScript接收并存储,用于下次访问
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """获取当前用户信息
    :param current_user:
    :return:
    """
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

启动程序

uvicorn main:app --port 8080 --host 127.0.0.1

编写前端文件

新建一个login.html并键入如下内容:

<html>
  <head>
    <meta charset="utf-8" />
    <title>FastAPI 登录测试</title>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
  </head>
  <style>.login {display: flex;flex-direction: column;}</style>
  <body>
    <div class="login">
      <div class="item">用户名:<input type="text" name="username" id="uname" /></div>
      <div class="item">密码:<input type="password" name="password" id="passwd" /></div>
      <div class="item"><button id="login">登录</button></div>
      <div class="item"><button id="info">获取用户信息</button></div>
    </div>
    <script>

      // 监听点击登录按钮
      $("#login").click(function () {
          // 获取用户输入的用户名和密码
          let userName = $("#uname").val();
          let passWD = $("#passwd").val();
          console.log("用户输入的账户和密码", userName, passWD);
          getToken(userName, passWD);
      });

      // 获取用户信息
      $("#info").click(function(){
          // 从localStorage读取 token
          let token = localStorage.getItem('token');
          if(token){
            var headers = {Authorization: "Bearer " + token};
              $.ajax({
                  url: "http://127.0.0.1:8080/users/me",
                  headers: headers,
                  success: function(res){
                      console.log(res);
                  }
              })
          }else{
              console.log("无权限");
          }
      })
      // 获取token, 并存入本地
      function getToken(uname, passwd){
          $.ajax({
              url: "http://127.0.0.1:8080/token",
              method: "POST",
              async: false,
              data: JSON.stringify({'uname': uname, 'passwd': passwd}),
              success: function(res){
                  console.log("获取到的token", res['access_token']);
                  // 将获取到的token,存入浏览器 localStorage
                  localStorage.setItem("token", res['access_token']);
              }
          })
      }
    </script>
  </body>
</html>

登录过程解析

  1. 输入用户名与密码,在还没点击登录的情况下点击获取用户信息,提示无权限
    1.png
  2. 并且localStorage也没有存储token
    2.png
  3. 点击登录,获取token并存在本地
    3.png

4.png

携带token访问获取用户信息接口,获取成功
5.png

完整的gif演示:
GIF 2021-2-13 21-21-27.gif

已有 4 条评论

  1. 十冬 十冬

    前端文件没有在fastapi里应用呀?

  2. 前后端分离开发,他们俩是分开的,前端只调用一下后端写的接口。

  3. 十冬 十冬

    感谢博主,明白了。我想让后端post接收数据的时候与token认证一起,官方文档上看不太懂。

  4. 俐俐 俐俐

    我按照教程一步一步下来,报错如下:

    from jose import JWTError, jwt

    ModuleNotFoundError: No module named 'jose'
    我非常非常肯定已经安装好了jose或者python-jose模块,python 的 site-packages已经找到这两个模块
    不知为啥还会报错

添加新评论