Writing a web service in Python using FastAPI

image



I know, I know, you’re probably thinking "what, again ?!"



Yes, on Habré they have already written about the FastAPI framework many times . But I propose to consider this tool in a little more detail and write the API of your own mini Habr without karma and ratings, but with blackjack and with tests, authentication, migrations and asynchronous work with the database.

Database schema and migrations



First of all, using SQLAlchemy Expression Language , we will describe the database schema. Let's create a file models / users.py :



import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID

metadata = sqlalchemy.MetaData()


users_table = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
    sqlalchemy.Column("name", sqlalchemy.String(100)),
    sqlalchemy.Column("hashed_password", sqlalchemy.String()),
    sqlalchemy.Column(
        "is_active",
        sqlalchemy.Boolean(),
        server_default=sqlalchemy.sql.expression.true(),
        nullable=False,
    ),
)


tokens_table = sqlalchemy.Table(
    "tokens",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column(
        "token",
        UUID(as_uuid=False),
        server_default=sqlalchemy.text("uuid_generate_v4()"),
        unique=True,
        nullable=False,
        index=True,
    ),
    sqlalchemy.Column("expires", sqlalchemy.DateTime()),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)


And the models / posts.py file :



import sqlalchemy

from .users import users_table

metadata = sqlalchemy.MetaData()


posts_table = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
    sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
    sqlalchemy.Column("title", sqlalchemy.String(100)),
    sqlalchemy.Column("content", sqlalchemy.Text()),
)


To automate database migrations, install alembic :



$ pip install alembic


To initialize Alembic, run:



$ alembic init migrations


This command will create in the current directory alembic.ini file and a migrations directory containing:



  • the versions directory , which will store the migration files
  • env.py script that runs when alembic is called
  • a script.py.mako file containing the template for new migrations.


We will indicate the url of our database, for this we will add the line in the alembic.ini file :



sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s


The % (variable_name) s format allows us to set different values ​​of variables depending on the environment, overriding them in the env.py file like this:



from os import environ
from alembic import context
from app.models import posts, users

# Alembic Config   
#     alembic.ini
config = context.config

section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))

fileConfig(config.config_file_name)

target_metadata = [users.metadata, posts.metadata]


Here we grab the values ​​of DB_USER, DB_PASS, DB_NAME and DB_HOST from environment variables. In addition, the env.py file specifies the metadata of our database in the target_metadata attribute , without which Alembic will not be able to determine what changes need to be made in the database.



Everything is ready and we can generate migrations and update the database:




$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head


We launch the application and connect the database



Let's create a main.py file :



from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}


And launch the application by running the command:



$ uvicorn main:app --reload


Let's make sure everything works as it should. Open http://127.0.0.1:8000/ in the browser and see
{"Hello": "World"}


To connect to the database, we will use the databases module , which allows us to execute queries asynchronously.



Let's configure the startup and shutdhown events of our service, at which the connection and disconnection from the database will occur. Let's edit the main.py file :



from os import environ

import databases

#      
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
#   database,      
database = databases.Database(SQLALCHEMY_DATABASE_URL)


app = FastAPI()


@app.on_event("startup")
async def startup():
    #       
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    #       
    await database.disconnect()


@app.get("/")
async def read_root():
    #    ,      
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
    )
    return await database.fetch_all(query)


We open http://127.0.0.1:8000/ and if we see an empty list [] in the response , then everything went well and we can move on.



Request and response validation



We will implement the possibility of user registration. To do this, we need to validate HTTP requests and responses. To solve this problem, we will use the pydantic library :



pip install pydantic


Create a schemas / users.py file and add a model that is responsible for validating the request body:



from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


Note that field types are defined using type annotation. In addition to the built-in data types such as int and str , pydantic offers a large number of types that provide additional validation. For example, the EmailStr type checks that the received value is a valid email. To use the EmailStr type, you need to install the email-validator module :



pip install email-validator


The response body must contain its own specific fields, for example id and access_token , so let's add models responsible for generating the response to the schemas / users.py file :



from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator


class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


class UserBase(BaseModel):
    """       """
    id: int
    email: EmailStr
    name: str


class TokenBase(BaseModel):
    token: UUID4 = Field(..., alias="access_token")
    expires: datetime
    token_type: Optional[str] = "bearer"

    class Config:
        allow_population_by_field_name = True

    @validator("token")
    def hexlify_token(cls, value):
        """  UUID  hex  """
        return value.hex


class User(UserBase):
    """         """
    token: TokenBase = {}


For each field in the model, you can write a custom validator . For example, hexlify_token converts the UUID value to a hex string. It's worth noting that you can use the Field class when you need to override the default behavior of a model field. For example, token: UUID4 = Field (..., alias = "access_token") sets the access_token alias for the token field . To indicate that the field is required, a special value - ... ( ellipsis ) is passed as the first parameter .



Let's add the utils / users.py file , in which we will create the methods needed to write a user to the database:



import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_

from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema

def get_random_string(length=12):
    """   ,    """
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


def hash_password(password: str, salt: str = None):
    """     """
    if salt is None:
        salt = get_random_string()
    enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return enc.hex()


def validate_password(password: str, hashed_password: str):
    """ ,         """
    salt, hashed = hashed_password.split("$")
    return hash_password(password, salt) == hashed


async def get_user_by_email(email: str):
    """     """
    query = users_table.select().where(users_table.c.email == email)
    return await database.fetch_one(query)


async def get_user_by_token(token: str):
    """       """
    query = tokens_table.join(users_table).select().where(
        and_(
            tokens_table.c.token == token,
            tokens_table.c.expires > datetime.now()
        )
    )
    return await database.fetch_one(query)


async def create_user_token(user_id: int):
    """       user_id """
    query = (
        tokens_table.insert()
        .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
        .returning(tokens_table.c.token, tokens_table.c.expires)
    )

    return await database.fetch_one(query)


async def create_user(user: user_schema.UserCreate):
    """      """
    salt = get_random_string()
    hashed_password = hash_password(user.password, salt)
    query = users_table.insert().values(
        email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
    )
    user_id = await database.execute(query)
    token = await create_user_token(user_id)
    token_dict = {"token": token["token"], "expires": token["expires"]}

    return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}




Let's create a file routers / users.py and add a sign-up route, indicating that it expects a CreateUser model in the request and returns a User model :

from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils


router = APIRouter()


@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
    db_user = await users_utils.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return await users_utils.create_user(user=user)


It remains only to connect the routes from the routers / users.py file . To do this, add the following lines to main.py :



from app.routers import users
app.include_router(users.router)


Authentication and access control



Now that we have users in our database, we are ready to set up application authentication. Let's add an endpoint that takes a username and password and returns a token. Update the routers / users.py file to add :



from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm


@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await users_utils.get_user_by_email(email=form_data.username)

    if not user:
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    if not users_utils.validate_password(
        password=form_data.password, hashed_password=user["hashed_password"]
    ):
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    return await users_utils.create_user_token(user_id=user["id"])


At the same time, we do not need to describe the request model ourselves, Fastapi provides a special dependency class OAuth2PasswordRequestForm , which makes the route expect two fields username and password.



To restrict access to certain routes for unauthenticated users, we will write a dependency method. It will verify that the provided token belongs to the active user and return the user's details. This will allow us to use user information on all routes that require authentication. Let's create a utils / dependecies.py file :



from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = await users_utils.get_user_by_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user["is_active"]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user


Please note that a dependency may in turn depend on another dependency. For example, OAuth2PasswordBearer is a dependency that makes it clear to FastAPI that the current route requires authentication.



To check that everything is working as expected, add the / users / me route , which returns the details of the current user. Add the lines to routers / users.py :



from app.utils.dependencies import get_current_user


@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
    return current_user


Now we have the / users / me route, which only authenticated users have access to.



Everything is ready to finally add the ability for users to create and edit publications:



utils / posts.py
from datetime import datetime

from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select


async def create_post(post: post_schema.PostModel, user):
    query = (
        posts_table.insert()
        .values(
            title=post.title,
            content=post.content,
            created_at=datetime.now(),
            user_id=user["id"],
        )
        .returning(
            posts_table.c.id,
            posts_table.c.title,
            posts_table.c.content,
            posts_table.c.created_at,
        )
    )
    post = await database.fetch_one(query)

    # Convert to dict and add user_name key to it
    post = dict(zip(post, post.values()))
    post["user_name"] = user["name"]
    return post


async def get_post(post_id: int):
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .where(posts_table.c.id == post_id)
    )
    return await database.fetch_one(query)


async def get_posts(page: int):
    max_per_page = 10
    offset1 = (page - 1) * max_per_page
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
        .limit(max_per_page)
        .offset(offset1)
    )
    return await database.fetch_all(query)


async def get_posts_count():
    query = select([func.count()]).select_from(posts_table)
    return await database.fetch_val(query)


async def update_post(post_id: int, post: post_schema.PostModel):
    query = (
        posts_table.update()
        .where(posts_table.c.id == post_id)
        .values(title=post.title, content=post.content)
    )
    return await database.execute(query)





routers / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()


@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
    post = await post_utils.create_post(post, current_user)
    return post


@router.get("/posts")
async def get_posts(page: int = 1):
    total_cout = await post_utils.get_posts_count()
    posts = await post_utils.get_posts(page)
    return {"total_count": total_cout, "results": posts}


@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
    return await post_utils.get_post(post_id)


@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
    post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
    post = await post_utils.get_post(post_id)
    if post["user_id"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to modify this post",
        )

    await post_utils.update_post(post_id=post_id, post=post_data)
    return await post_utils.get_post(post_id)





Let's connect new routes by adding to main.py

from app.routers import posts
app.include_router(posts.router)


Testing



We will write tests in pytest :



$ pip install pytest


For testing endpoints, FastAPI provides a special tool TestClient .



Let's write an endpoint test that doesn't require a database connection:



from app.main import app
from fastapi.testclient import TestClient

client = TestClient(app)


def test_health_check():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"Hello": "World"}


As you can see, everything is quite simple. You must initialize TestClient and use it to test HTTP requests.



To test the rest of the endpoints, you need to create a test database. Let's edit the main.py file , adding the test base configuration to it:



from os import environ

import databases

DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")

TESTING = environ.get("TESTING")

if TESTING:
    #      
    DB_NAME = "async-blogs-temp-for-test"
    TEST_SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
    DB_NAME = "async-blogs"
    SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(SQLALCHEMY_DATABASE_URL)


We are still using the "async-blogs" database for our application. But if the value of the environment variable TESTING is set, then the database "async-blogs-temp-for-test" is used .



To create the "async-blogs-temp-for-test" database automatically when running tests and delete after running them, create a fixture in the tests / conftest.py file :



import os

import pytest

#  `os.environ`,    
os.environ['TESTING'] = 'True'

from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database


@pytest.fixture(scope="module")
def temp_db():
    create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) #   alembic 
    command.upgrade(alembic_cfg, "head") #  

    try:
        yield database.TEST_SQLALCHEMY_DATABASE_URL
    finally:
        drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  


To create and delete the database, we will use the sqlalchemy_utils library .



By using the temp_db fixture in tests, we can test all the endpoints of our application:



def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth Vader",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["access_token"] is not None


tests / test_posts.py
import asyncio

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_create_post(temp_db):
    user = UserCreate(
        email="vader@deathstar.com",
        name="Darth",
        password="rainbow"
    )
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        # Create user and use his token to add new post
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        response = client.post(
            "/posts",
            json=request_data,
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 201
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_create_post_forbidden_without_token(temp_db):
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        response = client.post("/posts", json=request_data)
    assert response.status_code == 401


def test_posts_list(temp_db):
    with TestClient(app) as client:
        response = client.get("/posts")
    assert response.status_code == 200
    assert response.json()["total_count"] == 1
    assert response.json()["results"][0]["id"] == 1
    assert response.json()["results"][0]["title"] == "42"
    assert response.json()["results"][0]["content"] == "Don't panic!"


def test_post_detail(temp_db):
    post_id = 1
    with TestClient(app) as client:
        response = client.get(f"/posts/{post_id}")
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_update_post(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        # Create user token to add new post
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.put(
            f"/posts/{post_id}",
            json=request_data,
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Life? Don't talk to me about life."


def test_update_post_forbidden_without_token(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        response = client.put(f"/posts/{post_id}", json=request_data)
    assert response.status_code == 401




tests / test_users.py
import asyncio
import pytest

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["token"] is not None


def test_login(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 200
    assert response.json()["token_type"] == "bearer"
    assert response.json()["expires"] is not None
    assert response.json()["access_token"] is not None


def test_login_with_invalid_password(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 400
    assert response.json()["detail"] == "Incorrect email or password"


def test_user_detail(temp_db):
    with TestClient(app) as client:
        # Create user token to see user info
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"


def test_user_detail_forbidden_without_token(temp_db):
    with TestClient(app) as client:
        response = client.get("/users/me")
    assert response.status_code == 401


@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
    user = UserCreate(
        email="sidious@deathstar.com",
        name="Palpatine",
        password="unicorn"
    )
    with TestClient(app) as client:
        # Create user and use expired token
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        freezer.move_to("'2015-11-10'")
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 401




PS Sources



That's all, the source repository from the post can be viewed on GitHub .



All Articles