FastAPI DB(SQLAlchemy)

Haks.·2025년 2월 13일
0

Study

목록 보기
58/65

SQLAlchemy

  • poetry add sqlalchemy
  • Engine
    • 데이터베이스와의 연결을 관리하는 핵심 요소
    • 데이터베이스 URL을 통해 연결 설정
from sqlalchemy import create_engine
engine = create_engine("sqlite:///./test.db")

# 보통 
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
	DATABASE_URL,
    echo=True
)
  • Session

    • 데이터베이스와의 작업(삽입, 조회, 수정, 삭제)을 관리하며,트랜잭션을 제어
    • 여러 작업을 하나의 트랜잭션으로 묶어 처리 가능
  • Base

    • 테이블 정의를 위한 클래스 기반
    • 모든 테이블 클래스는 Base를 상속받아 정의
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
  • SQLAlchemy 기본 문법
    • 데이터베이스 생성
Base.metadata.create_all(bind=engine)
  • 함수선언시 실행방법
@app.on_event("startup")
async def startup():
	await init_db()

ForeginKey, 역참조

  • relationship : 역참조할 명 서로 지정해줘야함
    • 첫번쨰 인자 :
      • SQL에선 없는 타입이라 SQLAlchemy 내에서 찾아야함
      • 해당 모델 클래스명 입력
    • 두번째인자 back_populates=
      • 반대편 필드의 ForeginKey로 지정 된 것을 지정
      • 어떤 필드와 연결될건지 지정

1:N 관계

class User(Base):
    __tablename__ = 'users'

    user_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    username = Column(String, nullable=False)
    email = Column(String, nullable=False)
    password = Column(String, nullable=False)
    role = Column(String, default = "user")
    is_active = Column(Boolean)

    # 첫번째인자: 연결할 모델 클래스명, 유저에서 찾는거니까,
    # 생성시에 역참조할거는 아직 DB에 없으니까 함수내에서 찾아야함 그래서 클래스명 자체를 쓰는거임 Order
    # sql 에선 실재로 이런관계가 없으니 sqlalchemy 자체에서 찾아야함 그래서 클래스명으로 사용
    # 두번째인자: Order테이블의 user를 참조한것
    # relationship()은 ForeignKey()와 다르게 SQLAlchemy ORM 내부에서 참조할 필드명을 사용해야 하기 때문에 user를 써야 함
    orders = relationship("Order", back_populates="user", cascade="all, delete")


class Order(Base):
    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False)
    total_price = Column(DECIMAL(65, 2))
    is_paid = Column(Boolean)
    created_at = Column(DATETIME, default=datetime.utcnow)

    # 실제 users의 DB테이블 이름을 가르킴, 이건 SQL내에서 실행되야 하는거니까

    user_id = Column(ForeignKey('users.user_id'),nullable=False)
    user = relationship("User", back_populates="orders")

N:N 관계 : secondary 에 두 테이블을 연결할 모델이 필요함

order_products = Table(
    "order_products",
    Base.metadata,
    Column("order_id", ForeignKey("orders.order_id"), primary_key=True),
    Column("product_id", ForeignKey("products.product_id"), primary_key=True)
)

class Order(Base):
    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False)
    total_price = Column(DECIMAL(65, 2))
    is_paid = Column(Boolean)
    created_at = Column(DATETIME, default=datetime.utcnow)

    # 실제 users의 DB테이블 이름을 가르킴, 이건 SQL내에서 실행되야 하는거니까

    user_id = Column(ForeignKey('users.user_id'),nullable=False)
    user = relationship("User", back_populates="orders")
    
    # Product 과 참조
    products = relationship("Product", secondary=order_products, back_populates="orders")


class Product(Base):
    __tablename__ = 'products'

    product_id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    price = Column(Float, nullable=False)
    discount = Column(Float)
    final_price = Column(Float)
    
    # Order와 참조
    orders = relationship("Order", secondary=order_products, back_populates="products")
  • FastAPI : joinedload() = Django : select_related(), JOIN
    • JOIN을 사용하여 한 번의 쿼리로 데이터를 가져옴 (즉시 로드)
    • 1:N 관계에서 N이 많으면 중복 데이터가 많아지고 비효율적
    • User 1개에 대한 모든 주문 내역을 한 번에 가져올 때 적합
    • 관리자가 특정 사용자 주문 내역을 보고 싶을 때
# Django
order = Order.objects.select_related("users").get(order_id=1)
# FastAPI
result = await db.execute(
        select(Order).options(joinedload(Order.users)).where(Order.Order_id == Order_id)
    )
  • FastAPI : selectinload() = Django : prefetch_related(), 전부 가져옴 한번에
    • IN 서브쿼리를 사용하여 여러 개의 데이터를 한 번에 로드 (즉시 로드)
    • 1:N 관계에서 N이 많을 경우 JOIN보다 효율적
    • User 목록을 가져오면서 각 사용자의 주문도 같이 로드할 때 적합
    • 여러 개의 User에 대해 Order 데이터를 한꺼번에 가져올 때 사용
# Django
user = User.objects.prefetch_related("orders").get(user_id=1)
# FastAPI
result = await db.execute(
        select(User).options(selectinload(User.orders)).where(User.user_id == user_id)
    )
  • API
@orders_router.post("/")
async def create_order(
        order: OrderSchema,
        db: AsyncSession=Depends(get_db),
        user : dict = Depends(get_current_user)
    ):
    products = await db.execute(select(Product).where(Product.product_id.in_(order.product_id)))
    products = products.scalars().all()

    if not products:
        raise HTTPException(status_code=400, detail="No valid products found")

    new_order = Order(
        username = user.username,
        total_price = order.total_price,
        is_paid = order.is_paid
    )

    db.add(new_order)
    await db.commit()
    await db.refresh(new_order)
    # 주문이 없는 관계에서 바로 추가하면 주문이 없는 상태이기에 에러가 발생 그래서 따로 생성해야함
    # Many-to-Many 관계(relationship(secondary=order_products))에서 .extend()를 호출하면,
    # SQLAlchemy ORM이 내부적으로 변경을 감지하고 트랜잭션에 포함함
    # 이 트랜잭션을 DB에 반영하려면 반드시 db.commit()을 호출해야 함
    new_order.products.extend(products)
    await db.commit()
    await db.refresh(new_order)
    return new_order


@orders_router.get(
    "/",
    response_model=List[OrderSchema],
    response_model_exclude={
        "order_id",
        "is_paid",
        "created_at"
    }
)
async def current_user_orders(db: AsyncSession = Depends(get_db), user: dict = Depends(get_current_user)):
    orders = await db.execute(
        select(Order)
        .options(selectinload(Order.products))
        .where(Order.username == user.username)
    )
    orders = orders.scalars().all()

    return orders

🧑‍💻 총 코드

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker (bind=engine)
session = SessionLocal()

Base = declarative_base()
Base.metadata.create_all(bind=engine)

🧑‍💻 CRUD

# 데이터 삽입
session = SessionLocal()
new_user = User(name="John")
session.add(new_user)
session.commit()

# 조회
users = session.query(User).all()
for user in users:
    print(user.name)
# 조건조회
user = session.query(User).filter(User.name =='John').first()
print(user.id, user.name, user.email)

# 수정
user = session.query(User).filter(User.name =='John').first()
if user:
    user.email = "new@new.com"
    session.commit()
# 삭제
if user:
    session.delete(user)
    session.commit()

🧑‍💻 실습

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False}, # 멀티 쓰레드 활성화
)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

Base = declarative_base()

app = FastAPI()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True,index=True)

Base.metadata.create_all(bind=engine)

class UserResponse(BaseModel):
    id : int
    name: str
    email: str

def get_db(): # 데이터베이스 연결
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users/name")
def read_username(db:Session = Depends(get_db)):
    users = db.query(User.name).order_by(User.name, User.id.desc()).all()
    return [user[0] for user in users]

@app.get("/users/count")
def read_username(db:Session = Depends(get_db)):
    users = db.query(User.name).order_by(User.name, User.id.desc()).all()
    return len(users)
@app.get("/users/")
def read_users(skip: int = 0, limit: int = 10,db: Session = Depends(get_db)):
    users = db.query(User).order_by(User.name, User.id.desc()).offset(skip).limit(limit).all()
    return users
# offset(skip): skip 개수만큼 건너뛰고 데이터를 가져옴.
# limit(limit): 최대 limit 개수만큼 데이터만 가져옴.
@app.post("/users/add/")
def add_user(user : UserResponse, db : Session = Depends(get_db)):
    new_user = User(id=user.id, name=user.name, email=user.email)
    db.add(new_user)
    db.commit()
    return {
        "msg" : "success create"
    }

@app.get("/users/search")
def read_search_user(email : str= None, name: str =None, db: Session = Depends(get_db)):
    if email:
        user = db.query(User).filter(User.email == email).first()
        if user:
            return user
    if name:
        user = db.query(User).filter(User.name.like(f"%{name}%")).first()
        if user:
            return user
    raise HTTPException(status_code=404, detail="User not found")



@app.get("/users/{user_id}")
def read_user(user_id: int, db : Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.get("/users/ge/{user_id}")
def read_ge_user(user_id : int, db : Session = Depends(get_db)):
    user = db.query(User).filter(User.id >= user_id).all()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

0개의 댓글

관련 채용 정보