
# Implementing OAuth2 with Password Flow in FastAPI: Complete JWT Token Example


In the early days of the web, authentication relied on sending usernames and passwords with every request, verified against a central session store. As applications spread across mobile apps, SPAs, and microservices, this central state became a bottleneck. Tokens offered a way forward: self-contained, signed claims that prove identity without repeated credential submission or server lookups.

OAuth2's resource owner password credentials grant (often called password flow) lets trusted clients exchange user credentials directly for access tokens. We'll implement this in FastAPI using JWT bearer tokens, a common choice for API-only backends serving mobile apps or SPAs, where browser redirects complicate other flows.

## Why Consider Password Flow in FastAPI?

Before we implement it, let's weigh the trade-offs of the main OAuth2 flows. Password flow exchanges user credentials directly for tokens. That suits trusted first-party clients such as your own mobile app, but it is less secure for third parties because the client handles the user's password.

| Flow | Typical Use Cases | Key Trade-offs |
|------|-------------------|----------------|
| Password (Resource Owner) | Mobile apps, SPAs with trusted backend | Direct, no redirects; credentials sent to backend (avoid for untrusted clients) |
| Authorization Code (+PKCE) | Browser-based web apps | More secure; requires redirects |
| Client Credentials | Server-to-server (no user) | Simple; no user context |

FastAPI provides `OAuth2PasswordBearer` and `OAuth2PasswordRequestForm`, both of which integrate with the automatic OpenAPI docs. Although password flow is not recommended for public clients, it is straightforward for internal APIs.

Password flow fits best when you control both the client and the server and want to avoid redirect complexity.

## Prerequisites & Install

```bash
pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
# Or pwdlib (newer): pip install "pwdlib[argon2]"
```

Python 3.10+ is required (the models use the `str | None` union syntax).

## Building Step by Step

We'll construct `main.py` progressively, explaining each part as we go. Run it with `uvicorn main:app --reload` once complete.

### 1. Imports, Config, and Fake DB

Start with the basics: imports, the signing secret (generate one with `openssl rand -hex 32`), token settings, and a fake user DB for the demo. The stored hash is the bcrypt hash of the password "secret".

```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Generate: openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1Zfbx3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # "secret"
        "disabled": False,
    }
}
```

### 2. Pydantic Models

These models describe tokens and users. `UserInDB` adds `hashed_password` for verification and is never returned to clients.

```python
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str
```
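The hardcoded `SECRET_KEY` in step 1 is fine for a demo, but a deployed app would normally read it from configuration. A minimal sketch, assuming an environment variable named `JWT_SECRET_KEY` (a name invented for this example) and using `secrets.token_hex` as a Python counterpart to `openssl rand -hex 32`:

```python
import os
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return random hex, comparable to `openssl rand -hex 32` (64 hex chars)."""
    return secrets.token_hex(num_bytes)


def load_secret_key(env_var: str = "JWT_SECRET_KEY") -> str:
    """Read the signing key from the environment and fail loudly if it is weak or absent.

    The variable name and the 64-character minimum are assumptions of this sketch.
    """
    key = os.environ.get(env_var, "")
    if len(key) < 64:
        raise RuntimeError(f"{env_var} must be set to at least 64 hex characters")
    return key
```

In `main.py` this would replace the literal assignment with something like `SECRET_KEY = load_secret_key()`, so the app refuses to start without a key instead of silently signing tokens with a known value.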

### 3. Password Hashing, Token Helpers, and Dependencies

The helpers below verify passwords, look up users, and sign tokens. `get_current_user` decodes the bearer token and loads the matching user; `get_current_active_user` additionally rejects disabled accounts.

```python
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# tokenUrl is the relative path of the login endpoint defined below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    # Returns None when the username is unknown.
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        # Fall back to a 15-minute lifetime when none is given.
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user. Please re-activate.")
    return current_user
```
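`verify_password` delegates to passlib, which hides what a salted hash check actually involves. As a rough, standard-library-only illustration, the sketch below uses PBKDF2-HMAC-SHA256 instead of the bcrypt scheme configured above. The storage format and the names `pbkdf2_hash` and `pbkdf2_verify` are invented for this example and are not compatible with passlib's hashes.

```python
import hashlib
import hmac
import secrets


def pbkdf2_hash(password: str, iterations: int = 600_000) -> str:
    """Hash a password with a fresh random salt.

    Output format (hypothetical): scheme$iterations$salt_hex$digest_hex
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def pbkdf2_verify(plain_password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
        rounds = int(iterations)
    except ValueError:
        # Wrong number of fields, bad hex, or a non-numeric iteration count.
        return False
    if scheme != "pbkdf2_sha256":
        return False
    candidate = hashlib.pbkdf2_hmac("sha256", plain_password.encode("utf-8"), salt, rounds)
    return hmac.compare_digest(candidate, expected)
```

Because the salt is random, hashing the same password twice yields different strings, which is why verification has to re-derive the digest from the stored salt rather than compare hashes directly. passlib's `CryptContext.verify` performs the equivalent steps for bcrypt.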

### 4. Token and User Endpoints

`/token` checks the submitted form credentials and returns a signed access token. The two `/users/me/` routes require a valid token for an active user.

```python
@app.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return [{"item_id": "Foo", "owner": current_user.username}]
```

### Full main.py Reference

The complete app is the snippets above concatenated, in order, into a single `main.py`.

## Test with Curl

```bash
# Get token
curl -X 'POST' \
  'http://localhost:8000/token' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=johndoe&password=secret'

# Use token
TOKEN="eyJ..."  # From above

curl -X 'GET' \
  'http://localhost:8000/users/me/' \
  -H 'accept: application/json' \
  -H "Authorization: Bearer $TOKEN"
```
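The same two requests can be issued from Python without extra dependencies. The sketch below builds them with `urllib` and mirrors the curl calls: a form-encoded POST to `/token`, then a GET with the `Authorization: Bearer` header. The helper names (`build_token_request`, `build_me_request`, `fetch_json`) are invented for this example, and the base URL is assumed to be the local dev server.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """POST form-encoded credentials to /token, as the first curl call does."""
    body = urllib.parse.urlencode({"username": username, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_me_request(base_url: str, access_token: str) -> urllib.request.Request:
    """GET /users/me/ with the bearer token, as the second curl call does."""
    return urllib.request.Request(
        f"{base_url}/users/me/",
        headers={
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
        },
        method="GET",
    )


def fetch_json(request: urllib.request.Request):
    """Send the request and parse the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

With the server running, `fetch_json(build_token_request("http://localhost:8000", "johndoe", "secret"))["access_token"]` yields the token, which can then be passed to `build_me_request`. Note that the credentials travel as form fields rather than JSON, which is what `OAuth2PasswordRequestForm` expects.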

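Swagger UI: open http://localhost:8000/docs and click "Authorize". The dialog submits your username and password to `/token` and attaches the returned token to subsequent requests.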

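## Production Considerations

Replace the fake DB with a real database (for example via SQLAlchemy or asyncpg), and hash passwords at registration; never store them in plain text.

Add refresh tokens: a longer-lived credential plus an endpoint that issues new access tokens without a fresh login.

Scopes: declare them on the scheme, for example `OAuth2PasswordBearer(tokenUrl="token", scopes={"me": "Read information about the current user."})`, where each key is a scope name and each value is its description, then validate the granted scopes in your dependencies.

Always serve over HTTPS (for example behind a Traefik or nginx proxy) and rate-limit `/token` (for example with slowapi).

Alternatives: Authlib for fuller OAuth support, or a dedicated identity provider such as Auth0 or Keycloak for complex flows.

Edge cases: token revocation (a denylist or short expiry), disabled users (handled by `get_current_active_user`), and brute-force attempts (CAPTCHA or rate limiting).

Testing: write pytest tests for the endpoints, and consider Locust for load testing.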
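To illustrate the rate-limiting point for `/token`, here is a minimal in-memory sliding-window limiter using only the standard library. It is a sketch under stated assumptions, not what slowapi does internally: the class name `SlidingWindowLimiter`, the defaults of 5 attempts per 60 seconds, and keying by client IP or username are all choices made for this example. State lives in one process, so it would not hold across multiple workers.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `max_attempts` calls per key within `window_seconds`."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0, clock=time.monotonic):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so the limiter can be tested without sleeping
        self._attempts: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Record an attempt for `key` and report whether it is within the limit."""
        now = self._clock()
        attempts = self._attempts[key]
        # Drop timestamps that have aged out of the window.
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(now)
        return True
```

In the login route, one possible use is to call `limiter.allow(form_data.username)` (or the client address) before `authenticate_user` and raise an `HTTPException` with status 429 when it returns `False`. Rejected attempts are not recorded, so a blocked client regains access as soon as its oldest recorded attempt leaves the window.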
