FastAPI 인증(JWT)

Haks.·2025년 2월 13일
0

How to use

목록 보기
27/32

인증의 중요성 및 JWT

  • 인증이 중요한 이유?
    • 보안 강화: 사용자 데이터 및 시스템 자원을 보호
    • 권한 관리: 인증을 통해 사용자의 권한을 검증
    • 사용자 경험: 안전한 환경에서 맞춤형 서비스 제공

인증 vs 인가

  • 인증 : 너가 누구냐
  • 인가 : 인증이 됬으면 너가 뭘 할수있냐

JWT(Json Web Token)

  • 인증 정보를 JSON형식으로 저장하고, 서명을 통해 위변조 방지

  • 주로 클라이언트와 서버간의 인증을 위해 사용

  • HEADER/PAYLOAD/SIGNATURE

  • 장점

    • 무상태 인증: 서버가 세션을 유지할 필요 없음
    • 확장성: 다양한 서비스에서 쉽게 통합 가능
  • 단점

    • 크기가 크므로 반복적으로 보내는 경우 성능 이슈 발생
    • 만료되지 않은 토큰이 탈취되면 보안 문제가 발생

인증 디펜던시 (Depends 개념)

  • 인증 디펜던시란?
    • FastAPI의 Depends를 사용하면 특정 기능 (예: DB 연결, 인증 등)을 경로
      함수에서 반복적으로 재사용할 수 있음
  • 인증 디펜던시가 필요한 이유
    • 인증 및 사용자 검증 로직을 코드 중복 없이 모듈화할 수 있음
    • 인증이 필요한 API 엔드포인트에서 간편하게 적용 가능
    • JWT 토큰을 자동으로 검증하고 사용자 정보를 API 경로 함수에 전달할 수 있음

인증 디펜던시의 동작 방식

  • 기본적인 인증 흐름
    1. 클라이언트가 로그인하여 JWT 토큰을 발급받음
    2. 클라이언트는 모든 요청의 Authorization 헤더에 JWT 토큰을 포함하여 보냄
    3. FastAPI는 Depends(get_current_user)를 통해 인증 디펜던시를 실행
    4. JWT 토큰 검증 과정
      a. 토큰이 유효한지 확인: jwt.decode
      b. 만료 여부 확인
      c. 사용자가 존재하는지 데이터베이스에서 확인
    5. 인증 성공 시, 해당 사용자 정보를 경로 함수에 전달
  • JWT 설정: Oauth2PasswordBearer(tokenUrl=“login”)
    • 클라이언트가 Authorization: Bearer <jWT_TOKEN> 형태로 요청을 보낼 때
      추출하는 역할
    • tokenUrl=“login”은 로그인 API 엔드포인트를 지정하는 용도

인증 디펜던시 구현 방법

  • 인증이 필요한 엔드포인트 적용
    • ex) 프로필 조회 API
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

class Token(BaseModel):
    access_token : str
    token_type : str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Passlib 라이브러리를 사용해 비밀번호를 안전하게 해싱하고 검증하기위한 설정
# bcrypt 알고리즘을 사용하여 비밀번호 암호화
# deprecated 사용해 이전 해싱 방식이 자동으로 업데이트 되도록 설정

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# OAuth2 인증 방식 중 “Password Flow” 방식을 사용하여 액세스 토큰을 발급하고 인증을 수행
# OAuth2PasswordBearer(tokenUrl="login") → 사용자가 로그인할 때 JWT 토큰을 발급하는 URL을 지정
# FastAPI의 Depends(oauth2_scheme)를 통해 자동으로 요청 헤더에서 토큰을 추출 가능

def hash_password(password: str):
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

SECRET_KEY = "secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict): # JWT 엑세스 토큰을 생성하는 함수 
    to_encode = data.copy() # data 딕셔너리를 복사하여 변경해도 원본 데이터가 영향을 받지 않도록 보호
    expire = datetime.now() + timedelta(minutes =  ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire}) 
    # to_encode 딕셔너리에 "exp"(만료 시간) 키 추가
    # JWT를 생성할 때 만료 시간이 포함되어야 자동으로 검증 가능
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # JWT를 생성하는 핵심 코드
    return encoded_jwt

def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    1. 전달된 JWT 토큰을 검증하여 사용자 정보를 가져오는 함수입니다.
    2. JWT 토큰이 유효한지 확인하고, 만료 여부를 체크해야 합니다.
    3. 토큰에서 사용자 정보를 추출한 후, 데이터베이스(또는 가짜 DB)에서 해당 사용자가 존재하는지 확인해야 합니다.
    4. 사용자가 없거나 토큰이 유효하지 않다면, 401 Unauthorized 오류를 반환해야 합니다.
    """
    try:
        # 1. JWT 토큰 디코딩
        # SECRET_KEY와 알고리즘을 사용하여 토큰을 디코딩합니다.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

        # 2. 사용자 이름 추출
        # 토큰 페이로드에서 "sub" (subject), "exp" 필드를 가져옵니다.
        username = payload.get("sub")
        exp = payload.get("exp")

        # 3. 토큰이 유효하지 않거나 sub 필드가 없으면 오류 반환
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token")

        # 4. 사용자 검증
        # 데이터베이스(또는 가짜 DB)에서 사용자가 존재하는지 확인합니다.
        if username not in fake_users_db:
            raise HTTPException(status_code=401, detail="User not found")
        
        # 5. 토큰 만료 시간 검증
        # exp시간을 확인합니다.
        if exp is None or datetime.utcnow() > datetime.utcfromtimestamp(exp):
            raise HTTPException(status_code=401, detail="Token has expired")

        # 6. 검증된 사용자 정보 반환
        return fake_users_db[username]

    except JWTError:
        # 7. 토큰 검증 실패 시 오류 반환
        raise HTTPException(status_code=401, detail="Invalid token")

# Login API
@app.post("/login")
async def login_user(user: UserLoginRegister, db: AsyncSession= Depends(get_db)):
    result = await db.execute(select(User).where(User.username == user.username))
    existing_user = result.scalars().first()
    if not existing_user or not verify_password(user.password, existing_user.password):
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )
    access_token = create_access_token(data={"sub": existing_user.username})
    return {"access_token": access_token, "token_type": "bearer"}

# Profile API
@app.get("/profile")
def get_profile(current_user: dict = Depends(get_current_user)):
    return {"username": current_user["username"]}

인증 디펜던시 장점

  • 코드 재사용성 증가
    • Depends(get_current_user)를 활용하면 API마다 중복된 인증 코드 작성이
      불필요
    • 여러 API에서 일관된 방식으로 인증 적용 가능
  • 보안성 강화
    • JWT 토큰을 중앙에서 검증: 일괄적인 보안 적용 가능
    • 모든 요청마다 get_current_user를 호출하여 인증을 검증
  • 유지보수 용이
    • 인증 방식이 변경되더라도, get_current_user만 수정하면 전체 API에 반영됨
    • 관리가 쉽고, 확장성이 높음

🧑‍💻 DB에 연결한 모든 코드

from datetime import datetime, timedelta, timezone

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError

from pydantic import BaseModel
from passlib.context import CryptContext
from sqlalchemy import Column, String, Integer, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base



DATABASE_URL = "sqlite+aiosqlite:///./test.db"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True, # 쿼리 로그 출력
)

AsyncSessionLocal = sessionmaker(
    bind=async_engine, # 어떤 DB에 연결할지 설정
    class_=AsyncSession, 
    expire_on_commit=False # 세션이 커밋될 때 객체가 사라지지 않도록 설정
    # 📌 기본적으로 SQLAlchemy는 세션이 commit()되면 객체를 메모리에서 제거
    # 📌 expire_on_commit=False를 설정하면 세션이 종료되더라도 객체를 계속 유지 가능
)

Base = declarative_base()
# Base: 모든 데이터베이스의 부모 역할
# SQLAlchemy ORM(Object Relational Mapper) 모델을 정의하는 기본 클래스를 생성
# 📌 SQLAlchemy에서 ORM을 사용할 때, 모든 모델 클래스는 Base를 상속받아야 함.
# 📌 declarative_base()는 테이블을 정의할 때 필요한 “메타데이터”를 포함하는 베이스 클래스를 제공.


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String)
    password = Column(String)

app = FastAPI()

# DB 초기화
async def init_db():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
		# 데이터베이스가 아직 존재하지 않는다면 테이블을 생성
		# 이미 존재하는 경우, 기존 테이블을 변경하지 않고 그대로 유지
		# 단순히 테이블을 생성하는 역할이며, 기존 데이터에는 영향을 주지 않음

async def get_db():
    async with AsyncSessionLocal() as session: # async with 블록을 사용하여 세션이 자동으로 닫히도록 보장
        yield session
        # 비동기 SQLAlchemy 세션을 생성하고 반환하는 역할
        # FastAPI의 Depends()를 통해 엔드포인트에서 데이터베이스 세션을 사용 가능

        # 📌 **세션(Session)**은 데이터베이스와의 연결을 관리하는 객체로,
        #  쿼리 실행, 데이터 추가, 수정, 삭제 등의 작업을 할 때 사용됨.

class UserRegister(BaseModel):
    username : str
    password : str

class UserLogin(BaseModel):
    username : str
    password : str

class Token(BaseModel):
    access_token : str
    token_type : str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Passlib 라이브러리를 사용해 비밀번호를 안전하게 해싱하고 검증하기위한 설정
# bcrypt 알고리즘을 사용하여 비밀번호 암호화
# deprecated 사용해 이전 해싱 방식이 자동으로 업데이트 되도록 설정

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# OAuth2 인증 방식 중 “Password Flow” 방식을 사용하여 액세스 토큰을 발급하고 인증을 수행
# OAuth2PasswordBearer(tokenUrl="login") → 사용자가 로그인할 때 JWT 토큰을 발급하는 URL을 지정
# FastAPI의 Depends(oauth2_scheme)를 통해 자동으로 요청 헤더에서 토큰을 추출 가능

def hash_password(password: str):
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

SECRET_KEY = "secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict): # JWT 엑세스 토큰을 생성하는 함수 
    to_encode = data.copy() # data 딕셔너리를 복사하여 변경해도 원본 데이터가 영향을 받지 않도록 보호
    expire = datetime.now() + timedelta(minutes =  ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire}) 
    # to_encode 딕셔너리에 "exp"(만료 시간) 키 추가
    # JWT를 생성할 때 만료 시간이 포함되어야 자동으로 검증 가능
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # JWT를 생성하는 핵심 코드
    return encoded_jwt

@app.post("/register")
async def register(user: UserRegister, db : AsyncSession = Depends(get_db)):
    new_user = User(username=user.username, password= hash_password(user.password))
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user)

    return new_user

@app.post("/login")
async def login_user(user: UserLoginRegister, db: AsyncSession= Depends(get_db)):
    result = await db.execute(select(User).where(User.username == user.username))
    existing_user = result.scalars().first()
    if not existing_user or not verify_password(user.password, existing_user.password):
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )
    access_token = create_access_token(data={"sub": existing_user.username})
    return {"access_token": access_token, "token_type": "bearer"}

async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
    try:
        # 1. JWT 토큰 디코딩
        # SECRET_KEY와 알고리즘을 사용하여 토큰을 디코딩합니다.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

        # 2. 사용자 이름 추출
        # 토큰 페이로드에서 "sub" (subject), "exp" 필드를 가져옵니다.
        username = payload.get("sub")
        exp = payload.get("exp")

        result = await db.execute(select(User).filter(User.username == username))
        user = result.scalars().first()

        # 3. 토큰이 유효하지 않거나 sub 필드가 없으면 오류 반환
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token")

        # 4. 사용자 검증
        # 데이터베이스(또는 가짜 DB)에서 사용자가 존재하는지 확인합니다.
        if not user:
            raise HTTPException(status_code=401, detail="User not found")

        # 5. 토큰 만료 시간 검증
        # exp시간을 확인합니다.
        if exp is None or datetime.utcnow() > datetime.utcfromtimestamp(exp):
            raise HTTPException(status_code=401, detail="Token has expired")

        # 6. 검증된 사용자 정보 반환
        return user

    except JWTError:
        # 7. 토큰 검증 실패 시 오류 반환
        raise HTTPException(status_code=401, detail="Invalid token")

@app.get("/profile")
async def get_profile(current_user: dict = Depends(get_current_user)):
    return current_user
# 현재 로그인한 사용자의 토큰 만료 시간을 가져오는 GET /token-expiry API
@app.get("/token_expiry")
async def token_expiry(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        exp = payload.get("exp")
        if exp is None:
            raise HTTPException(status_code=401, detail="Token missing expiration time")

        if datetime.utcnow() > datetime.utcfromtimestamp(exp):
            raise HTTPException(status_code=404, detail="Expired token")

        return {"expire_time": datetime.utcfromtimestamp(exp)}
    except ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Expired token")
    except JWTError:
        raise HTTPException(status_code=404, detail="Invalid token")

# 관리자만 접근 가능한 GET /admin API
@app.get("/admin/")
async def admin(current_user: dict = Depends(get_current_user)):
    if current_user.role == "admin":
        return {"msg":"welcome admin"}
    raise HTTPException(status_code=403, detail="Forbidden")

# 사용자의 프로필을 수정하는 POST /profile API
@app.put("/profile/")
async def put_profile(
        update_user: UserLogin,
        current_user: dict = Depends(get_current_user),
        db: AsyncSession = Depends(get_db)
):
    result = await db.execute(select(User).filter(User.username == current_user.username))
    existing_user = result.scalars().first()

    if not existing_user:
        raise HTTPException(status_code=404, detail="Its not your profile.")
    existing_user.username = update_user.username
    await db.commit()
    await db.refresh(existing_user)

    return {"message": "Profile updated successfully", "username": existing_user.username}

0개의 댓글

관련 채용 정보