I know, I know, you’re probably thinking "what, again ?!"
Yes, on Habré they have already written about the FastAPI framework many times . But I propose to consider this tool in a little more detail and write the API of your own mini Habr without karma and ratings, but
Database schema and migrations
First of all, using SQLAlchemy Expression Language , we will describe the database schema. Let's create a file models / users.py :
import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID
metadata = sqlalchemy.MetaData()
users_table = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
sqlalchemy.Column("name", sqlalchemy.String(100)),
sqlalchemy.Column("hashed_password", sqlalchemy.String()),
sqlalchemy.Column(
"is_active",
sqlalchemy.Boolean(),
server_default=sqlalchemy.sql.expression.true(),
nullable=False,
),
)
tokens_table = sqlalchemy.Table(
"tokens",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
"token",
UUID(as_uuid=False),
server_default=sqlalchemy.text("uuid_generate_v4()"),
unique=True,
nullable=False,
index=True,
),
sqlalchemy.Column("expires", sqlalchemy.DateTime()),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)
And the models / posts.py file :
import sqlalchemy
from .users import users_table
metadata = sqlalchemy.MetaData()
posts_table = sqlalchemy.Table(
"posts",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
sqlalchemy.Column("title", sqlalchemy.String(100)),
sqlalchemy.Column("content", sqlalchemy.Text()),
)
To automate database migrations, install alembic :
$ pip install alembic
To initialize Alembic, run:
$ alembic init migrations
This command will create in the current directory alembic.ini file and a migrations directory containing:
- the versions directory , which will store the migration files
- env.py script that runs when alembic is called
- a script.py.mako file containing the template for new migrations.
We will indicate the url of our database, for this we will add the line in the alembic.ini file :
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s
The % (variable_name) s format allows us to set different values of variables depending on the environment, overriding them in the env.py file like this:
from os import environ
from alembic import context
from app.models import posts, users
# Alembic Config
# alembic.ini
config = context.config
section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))
fileConfig(config.config_file_name)
target_metadata = [users.metadata, posts.metadata]
Here we grab the values of DB_USER, DB_PASS, DB_NAME and DB_HOST from environment variables. In addition, the env.py file specifies the metadata of our database in the target_metadata attribute , without which Alembic will not be able to determine what changes need to be made in the database.
Everything is ready and we can generate migrations and update the database:
$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head
We launch the application and connect the database
Let's create a main.py file :
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
And launch the application by running the command:
$ uvicorn main:app --reload
Let's make sure everything works as it should. Open http://127.0.0.1:8000/ in the browser and see
{"Hello": "World"}
To connect to the database, we will use the databases module , which allows us to execute queries asynchronously.
Let's configure the startup and shutdhown events of our service, at which the connection and disconnection from the database will occur. Let's edit the main.py file :
from os import environ
import databases
#
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
# database,
database = databases.Database(SQLALCHEMY_DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
#
await database.connect()
@app.on_event("shutdown")
async def shutdown():
#
await database.disconnect()
@app.get("/")
async def read_root():
# ,
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
)
return await database.fetch_all(query)
We open http://127.0.0.1:8000/ and if we see an empty list [] in the response , then everything went well and we can move on.
Request and response validation
We will implement the possibility of user registration. To do this, we need to validate HTTP requests and responses. To solve this problem, we will use the pydantic library :
pip install pydantic
Create a schemas / users.py file and add a model that is responsible for validating the request body:
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
Note that field types are defined using type annotation. In addition to the built-in data types such as int and str , pydantic offers a large number of types that provide additional validation. For example, the EmailStr type checks that the received value is a valid email. To use the EmailStr type, you need to install the email-validator module :
pip install email-validator
The response body must contain its own specific fields, for example id and access_token , so let's add models responsible for generating the response to the schemas / users.py file :
from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
class UserBase(BaseModel):
""" """
id: int
email: EmailStr
name: str
class TokenBase(BaseModel):
token: UUID4 = Field(..., alias="access_token")
expires: datetime
token_type: Optional[str] = "bearer"
class Config:
allow_population_by_field_name = True
@validator("token")
def hexlify_token(cls, value):
""" UUID hex """
return value.hex
class User(UserBase):
""" """
token: TokenBase = {}
For each field in the model, you can write a custom validator . For example, hexlify_token converts the UUID value to a hex string. It's worth noting that you can use the Field class when you need to override the default behavior of a model field. For example, token: UUID4 = Field (..., alias = "access_token") sets the access_token alias for the token field . To indicate that the field is required, a special value - ... ( ellipsis ) is passed as the first parameter .
Let's add the utils / users.py file , in which we will create the methods needed to write a user to the database:
import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema
def get_random_string(length=12):
""" , """
return "".join(random.choice(string.ascii_letters) for _ in range(length))
def hash_password(password: str, salt: str = None):
""" """
if salt is None:
salt = get_random_string()
enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return enc.hex()
def validate_password(password: str, hashed_password: str):
""" , """
salt, hashed = hashed_password.split("$")
return hash_password(password, salt) == hashed
async def get_user_by_email(email: str):
""" """
query = users_table.select().where(users_table.c.email == email)
return await database.fetch_one(query)
async def get_user_by_token(token: str):
""" """
query = tokens_table.join(users_table).select().where(
and_(
tokens_table.c.token == token,
tokens_table.c.expires > datetime.now()
)
)
return await database.fetch_one(query)
async def create_user_token(user_id: int):
""" user_id """
query = (
tokens_table.insert()
.values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
.returning(tokens_table.c.token, tokens_table.c.expires)
)
return await database.fetch_one(query)
async def create_user(user: user_schema.UserCreate):
""" """
salt = get_random_string()
hashed_password = hash_password(user.password, salt)
query = users_table.insert().values(
email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
)
user_id = await database.execute(query)
token = await create_user_token(user_id)
token_dict = {"token": token["token"], "expires": token["expires"]}
return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}
Let's create a file routers / users.py and add a sign-up route, indicating that it expects a CreateUser model in the request and returns a User model :
from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils
router = APIRouter()
@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
db_user = await users_utils.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return await users_utils.create_user(user=user)
It remains only to connect the routes from the routers / users.py file . To do this, add the following lines to main.py :
from app.routers import users
app.include_router(users.router)
Authentication and access control
Now that we have users in our database, we are ready to set up application authentication. Let's add an endpoint that takes a username and password and returns a token. Update the routers / users.py file to add :
from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
user = await users_utils.get_user_by_email(email=form_data.username)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
if not users_utils.validate_password(
password=form_data.password, hashed_password=user["hashed_password"]
):
raise HTTPException(status_code=400, detail="Incorrect email or password")
return await users_utils.create_user_token(user_id=user["id"])
At the same time, we do not need to describe the request model ourselves, Fastapi provides a special dependency class OAuth2PasswordRequestForm , which makes the route expect two fields username and password.
To restrict access to certain routes for unauthenticated users, we will write a dependency method. It will verify that the provided token belongs to the active user and return the user's details. This will allow us to use user information on all routes that require authentication. Let's create a utils / dependecies.py file :
from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await users_utils.get_user_by_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user["is_active"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return user
Please note that a dependency may in turn depend on another dependency. For example, OAuth2PasswordBearer is a dependency that makes it clear to FastAPI that the current route requires authentication.
To check that everything is working as expected, add the / users / me route , which returns the details of the current user. Add the lines to routers / users.py :
from app.utils.dependencies import get_current_user
@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
return current_user
Now we have the / users / me route, which only authenticated users have access to.
Everything is ready to finally add the ability for users to create and edit publications:
utils / posts.py
from datetime import datetime
from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select
async def create_post(post: post_schema.PostModel, user):
query = (
posts_table.insert()
.values(
title=post.title,
content=post.content,
created_at=datetime.now(),
user_id=user["id"],
)
.returning(
posts_table.c.id,
posts_table.c.title,
posts_table.c.content,
posts_table.c.created_at,
)
)
post = await database.fetch_one(query)
# Convert to dict and add user_name key to it
post = dict(zip(post, post.values()))
post["user_name"] = user["name"]
return post
async def get_post(post_id: int):
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.where(posts_table.c.id == post_id)
)
return await database.fetch_one(query)
async def get_posts(page: int):
max_per_page = 10
offset1 = (page - 1) * max_per_page
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
.limit(max_per_page)
.offset(offset1)
)
return await database.fetch_all(query)
async def get_posts_count():
query = select([func.count()]).select_from(posts_table)
return await database.fetch_val(query)
async def update_post(post_id: int, post: post_schema.PostModel):
query = (
posts_table.update()
.where(posts_table.c.id == post_id)
.values(title=post.title, content=post.content)
)
return await database.execute(query)
routers / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter()
@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
post = await post_utils.create_post(post, current_user)
return post
@router.get("/posts")
async def get_posts(page: int = 1):
total_cout = await post_utils.get_posts_count()
posts = await post_utils.get_posts(page)
return {"total_count": total_cout, "results": posts}
@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
return await post_utils.get_post(post_id)
@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
post = await post_utils.get_post(post_id)
if post["user_id"] != current_user["id"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to modify this post",
)
await post_utils.update_post(post_id=post_id, post=post_data)
return await post_utils.get_post(post_id)
Let's connect new routes by adding to main.py
from app.routers import posts
app.include_router(posts.router)
Testing
We will write tests in pytest :
$ pip install pytest
For testing endpoints, FastAPI provides a special tool TestClient .
Let's write an endpoint test that doesn't require a database connection:
from app.main import app
from fastapi.testclient import TestClient
client = TestClient(app)
def test_health_check():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"Hello": "World"}
As you can see, everything is quite simple. You must initialize TestClient and use it to test HTTP requests.
To test the rest of the endpoints, you need to create a test database. Let's edit the main.py file , adding the test base configuration to it:
from os import environ
import databases
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
TESTING = environ.get("TESTING")
if TESTING:
#
DB_NAME = "async-blogs-temp-for-test"
TEST_SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(SQLALCHEMY_DATABASE_URL)
We are still using the "async-blogs" database for our application. But if the value of the environment variable TESTING is set, then the database "async-blogs-temp-for-test" is used .
To create the "async-blogs-temp-for-test" database automatically when running tests and delete after running them, create a fixture in the tests / conftest.py file :
import os
import pytest
# `os.environ`,
os.environ['TESTING'] = 'True'
from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database
@pytest.fixture(scope="module")
def temp_db():
create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) # alembic
command.upgrade(alembic_cfg, "head") #
try:
yield database.TEST_SQLALCHEMY_DATABASE_URL
finally:
drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
To create and delete the database, we will use the sqlalchemy_utils library .
By using the temp_db fixture in tests, we can test all the endpoints of our application:
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth Vader",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["access_token"] is not None
tests / test_posts.py
import asyncio
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_create_post(temp_db):
user = UserCreate(
email="vader@deathstar.com",
name="Darth",
password="rainbow"
)
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
# Create user and use his token to add new post
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
response = client.post(
"/posts",
json=request_data,
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 201
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_create_post_forbidden_without_token(temp_db):
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
response = client.post("/posts", json=request_data)
assert response.status_code == 401
def test_posts_list(temp_db):
with TestClient(app) as client:
response = client.get("/posts")
assert response.status_code == 200
assert response.json()["total_count"] == 1
assert response.json()["results"][0]["id"] == 1
assert response.json()["results"][0]["title"] == "42"
assert response.json()["results"][0]["content"] == "Don't panic!"
def test_post_detail(temp_db):
post_id = 1
with TestClient(app) as client:
response = client.get(f"/posts/{post_id}")
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_update_post(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
# Create user token to add new post
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.put(
f"/posts/{post_id}",
json=request_data,
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Life? Don't talk to me about life."
def test_update_post_forbidden_without_token(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
response = client.put(f"/posts/{post_id}", json=request_data)
assert response.status_code == 401
tests / test_users.py
import asyncio
import pytest
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["token"] is not None
def test_login(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 200
assert response.json()["token_type"] == "bearer"
assert response.json()["expires"] is not None
assert response.json()["access_token"] is not None
def test_login_with_invalid_password(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 400
assert response.json()["detail"] == "Incorrect email or password"
def test_user_detail(temp_db):
with TestClient(app) as client:
# Create user token to see user info
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
def test_user_detail_forbidden_without_token(temp_db):
with TestClient(app) as client:
response = client.get("/users/me")
assert response.status_code == 401
@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
user = UserCreate(
email="sidious@deathstar.com",
name="Palpatine",
password="unicorn"
)
with TestClient(app) as client:
# Create user and use expired token
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
freezer.move_to("'2015-11-10'")
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 401
PS Sources
That's all, the source repository from the post can be viewed on GitHub .