인증 정보를 JSON형식으로 저장하고, 서명을 통해 위변조 방지
주로 클라이언트와 서버간의 인증을 위해 사용
HEADER/PAYLOAD/SIGNATURE
장점
단점
<jWT_TOKEN>
형태로 요청을 보낼 때from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
class Token(BaseModel):
access_token : str
token_type : str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Passlib 라이브러리를 사용해 비밀번호를 안전하게 해싱하고 검증하기위한 설정
# bcrypt 알고리즘을 사용하여 비밀번호 암호화
# deprecated 사용해 이전 해싱 방식이 자동으로 업데이트 되도록 설정
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# OAuth2 인증 방식 중 “Password Flow” 방식을 사용하여 액세스 토큰을 발급하고 인증을 수행
# OAuth2PasswordBearer(tokenUrl="login") → 사용자가 로그인할 때 JWT 토큰을 발급하는 URL을 지정
# FastAPI의 Depends(oauth2_scheme)를 통해 자동으로 요청 헤더에서 토큰을 추출 가능
def hash_password(password: str):
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str):
return pwd_context.verify(plain_password, hashed_password)
SECRET_KEY = "secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict): # JWT 엑세스 토큰을 생성하는 함수
to_encode = data.copy() # data 딕셔너리를 복사하여 변경해도 원본 데이터가 영향을 받지 않도록 보호
expire = datetime.now() + timedelta(minutes = ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# to_encode 딕셔너리에 "exp"(만료 시간) 키 추가
# JWT를 생성할 때 만료 시간이 포함되어야 자동으로 검증 가능
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # JWT를 생성하는 핵심 코드
return encoded_jwt
def get_current_user(token: str = Depends(oauth2_scheme)):
"""
1. 전달된 JWT 토큰을 검증하여 사용자 정보를 가져오는 함수입니다.
2. JWT 토큰이 유효한지 확인하고, 만료 여부를 체크해야 합니다.
3. 토큰에서 사용자 정보를 추출한 후, 데이터베이스(또는 가짜 DB)에서 해당 사용자가 존재하는지 확인해야 합니다.
4. 사용자가 없거나 토큰이 유효하지 않다면, 401 Unauthorized 오류를 반환해야 합니다.
"""
try:
# 1. JWT 토큰 디코딩
# SECRET_KEY와 알고리즘을 사용하여 토큰을 디코딩합니다.
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 2. 사용자 이름 추출
# 토큰 페이로드에서 "sub" (subject), "exp" 필드를 가져옵니다.
username = payload.get("sub")
exp = payload.get("exp")
# 3. 토큰이 유효하지 않거나 sub 필드가 없으면 오류 반환
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
# 4. 사용자 검증
# 데이터베이스(또는 가짜 DB)에서 사용자가 존재하는지 확인합니다.
if username not in fake_users_db:
raise HTTPException(status_code=401, detail="User not found")
# 5. 토큰 만료 시간 검증
# exp시간을 확인합니다.
if exp is None or datetime.utcnow() > datetime.utcfromtimestamp(exp):
raise HTTPException(status_code=401, detail="Token has expired")
# 6. 검증된 사용자 정보 반환
return fake_users_db[username]
except JWTError:
# 7. 토큰 검증 실패 시 오류 반환
raise HTTPException(status_code=401, detail="Invalid token")
# Login API
@app.post("/login")
async def login_user(user: UserLoginRegister, db: AsyncSession= Depends(get_db)):
result = await db.execute(select(User).where(User.username == user.username))
existing_user = result.scalars().first()
if not existing_user or not verify_password(user.password, existing_user.password):
raise HTTPException(
status_code=404,
detail="User not found"
)
access_token = create_access_token(data={"sub": existing_user.username})
return {"access_token": access_token, "token_type": "bearer"}
# Profile API
@app.get("/profile")
def get_profile(current_user: dict = Depends(get_current_user)):
return {"username": current_user["username"]}
from datetime import datetime, timedelta, timezone
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from passlib.context import CryptContext
from sqlalchemy import Column, String, Integer, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
async_engine = create_async_engine(
DATABASE_URL,
echo=True, # 쿼리 로그 출력
)
AsyncSessionLocal = sessionmaker(
bind=async_engine, # 어떤 DB에 연결할지 설정
class_=AsyncSession,
expire_on_commit=False # 세션이 커밋될 때 객체가 사라지지 않도록 설정
# 📌 기본적으로 SQLAlchemy는 세션이 commit()되면 객체를 메모리에서 제거
# 📌 expire_on_commit=False를 설정하면 세션이 종료되더라도 객체를 계속 유지 가능
)
Base = declarative_base()
# Base: 모든 데이터베이스의 부모 역할
# SQLAlchemy ORM(Object Relational Mapper) 모델을 정의하는 기본 클래스를 생성
# 📌 SQLAlchemy에서 ORM을 사용할 때, 모든 모델 클래스는 Base를 상속받아야 함.
# 📌 declarative_base()는 테이블을 정의할 때 필요한 “메타데이터”를 포함하는 베이스 클래스를 제공.
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String)
password = Column(String)
app = FastAPI()
# DB 초기화
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 데이터베이스가 아직 존재하지 않는다면 테이블을 생성
# 이미 존재하는 경우, 기존 테이블을 변경하지 않고 그대로 유지
# 단순히 테이블을 생성하는 역할이며, 기존 데이터에는 영향을 주지 않음
async def get_db():
async with AsyncSessionLocal() as session: # async with 블록을 사용하여 세션이 자동으로 닫히도록 보장
yield session
# 비동기 SQLAlchemy 세션을 생성하고 반환하는 역할
# FastAPI의 Depends()를 통해 엔드포인트에서 데이터베이스 세션을 사용 가능
# 📌 **세션(Session)**은 데이터베이스와의 연결을 관리하는 객체로,
# 쿼리 실행, 데이터 추가, 수정, 삭제 등의 작업을 할 때 사용됨.
class UserRegister(BaseModel):
username : str
password : str
class UserLogin(BaseModel):
username : str
password : str
class Token(BaseModel):
access_token : str
token_type : str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Passlib 라이브러리를 사용해 비밀번호를 안전하게 해싱하고 검증하기위한 설정
# bcrypt 알고리즘을 사용하여 비밀번호 암호화
# deprecated 사용해 이전 해싱 방식이 자동으로 업데이트 되도록 설정
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# OAuth2 인증 방식 중 “Password Flow” 방식을 사용하여 액세스 토큰을 발급하고 인증을 수행
# OAuth2PasswordBearer(tokenUrl="login") → 사용자가 로그인할 때 JWT 토큰을 발급하는 URL을 지정
# FastAPI의 Depends(oauth2_scheme)를 통해 자동으로 요청 헤더에서 토큰을 추출 가능
def hash_password(password: str):
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str):
return pwd_context.verify(plain_password, hashed_password)
SECRET_KEY = "secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict): # JWT 엑세스 토큰을 생성하는 함수
to_encode = data.copy() # data 딕셔너리를 복사하여 변경해도 원본 데이터가 영향을 받지 않도록 보호
expire = datetime.now() + timedelta(minutes = ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# to_encode 딕셔너리에 "exp"(만료 시간) 키 추가
# JWT를 생성할 때 만료 시간이 포함되어야 자동으로 검증 가능
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # JWT를 생성하는 핵심 코드
return encoded_jwt
@app.post("/register")
async def register(user: UserRegister, db : AsyncSession = Depends(get_db)):
new_user = User(username=user.username, password= hash_password(user.password))
db.add(new_user)
await db.commit()
await db.refresh(new_user)
return new_user
@app.post("/login")
async def login_user(user: UserLoginRegister, db: AsyncSession= Depends(get_db)):
result = await db.execute(select(User).where(User.username == user.username))
existing_user = result.scalars().first()
if not existing_user or not verify_password(user.password, existing_user.password):
raise HTTPException(
status_code=404,
detail="User not found"
)
access_token = create_access_token(data={"sub": existing_user.username})
return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
try:
# 1. JWT 토큰 디코딩
# SECRET_KEY와 알고리즘을 사용하여 토큰을 디코딩합니다.
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 2. 사용자 이름 추출
# 토큰 페이로드에서 "sub" (subject), "exp" 필드를 가져옵니다.
username = payload.get("sub")
exp = payload.get("exp")
result = await db.execute(select(User).filter(User.username == username))
user = result.scalars().first()
# 3. 토큰이 유효하지 않거나 sub 필드가 없으면 오류 반환
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
# 4. 사용자 검증
# 데이터베이스(또는 가짜 DB)에서 사용자가 존재하는지 확인합니다.
if not user:
raise HTTPException(status_code=401, detail="User not found")
# 5. 토큰 만료 시간 검증
# exp시간을 확인합니다.
if exp is None or datetime.utcnow() > datetime.utcfromtimestamp(exp):
raise HTTPException(status_code=401, detail="Token has expired")
# 6. 검증된 사용자 정보 반환
return user
except JWTError:
# 7. 토큰 검증 실패 시 오류 반환
raise HTTPException(status_code=401, detail="Invalid token")
@app.get("/profile")
async def get_profile(current_user: dict = Depends(get_current_user)):
return current_user
# 현재 로그인한 사용자의 토큰 만료 시간을 가져오는 GET /token-expiry API
@app.get("/token_expiry")
async def token_expiry(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
exp = payload.get("exp")
if exp is None:
raise HTTPException(status_code=401, detail="Token missing expiration time")
if datetime.utcnow() > datetime.utcfromtimestamp(exp):
raise HTTPException(status_code=404, detail="Expired token")
return {"expire_time": datetime.utcfromtimestamp(exp)}
except ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Expired token")
except JWTError:
raise HTTPException(status_code=404, detail="Invalid token")
# 관리자만 접근 가능한 GET /admin API
@app.get("/admin/")
async def admin(current_user: dict = Depends(get_current_user)):
if current_user.role == "admin":
return {"msg":"welcome admin"}
raise HTTPException(status_code=403, detail="Forbidden")
# 사용자의 프로필을 수정하는 POST /profile API
@app.put("/profile/")
async def put_profile(
update_user: UserLogin,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(User).filter(User.username == current_user.username))
existing_user = result.scalars().first()
if not existing_user:
raise HTTPException(status_code=404, detail="Its not your profile.")
existing_user.username = update_user.username
await db.commit()
await db.refresh(existing_user)
return {"message": "Profile updated successfully", "username": existing_user.username}