- poetry add sqlalchemy
# Minimal form: build the engine directly from a literal database URL.
from sqlalchemy import create_engine
engine = create_engine("sqlite:///./test.db")
# Typical form: keep the URL in a constant and pass options by keyword.
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    DATABASE_URL,
    echo=True  # echo=True makes SQLAlchemy log the SQL statements it emits
)
Session
Base
from sqlalchemy.ext.declarative import declarative_base
# Base is the declarative base class that the model classes below inherit from.
Base = declarative_base()
# Create the tables of every model registered on Base.metadata, using `engine`.
Base.metadata.create_all(bind=engine)
@app.on_event("startup")
async def startup():
    """Await the database initialisation coroutine when the app starts."""
    await init_db()
relationship
back_populates= : 양쪽 모델에서 서로 역참조할 속성명을 지정해줘야 함
class User(Base):
    """ORM model for the `users` table; one user owns many orders."""

    __tablename__ = "users"

    # Primary key is a string; a fresh UUID4 is generated when none is given.
    user_id = Column(
        String,
        primary_key=True,
        default=lambda: str(uuid.uuid4()),
    )
    username = Column(String, nullable=False)
    email = Column(String, nullable=False)
    password = Column(String, nullable=False)
    role = Column(String, default="user")
    is_active = Column(Boolean)

    # First argument: the name of the model class to link to (seen from User,
    # that is Order). The target class may not be defined yet when this class
    # body runs, and SQL itself has no notion of this relationship, so
    # SQLAlchemy resolves it later from the class name given as a string.
    # back_populates: names the `user` attribute on Order. Unlike ForeignKey(),
    # relationship() works at the ORM level and therefore takes the attribute
    # name, not a table/column name.
    orders = relationship(
        "Order",
        back_populates="user",
        cascade="all, delete",
    )
class Order(Base):
    """ORM model for the `orders` table; each order belongs to one user."""

    __tablename__ = "orders"

    order_id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False)
    total_price = Column(DECIMAL(65, 2))
    is_paid = Column(Boolean)
    # The callable is passed uncalled so it runs per insert, not once at import.
    created_at = Column(DATETIME, default=datetime.utcnow)

    # ForeignKey() points at the real database table and column
    # ("users.user_id") because this constraint is executed in SQL.
    user_id = Column(ForeignKey("users.user_id"), nullable=False)
    # ORM-level counterpart of User.orders.
    user = relationship("User", back_populates="orders")
# Association table for the many-to-many link between orders and products.
# Both foreign keys together form the composite primary key.
order_products = Table(
    "order_products",
    Base.metadata,
    Column("order_id", ForeignKey("orders.order_id"), primary_key=True),
    Column("product_id", ForeignKey("products.product_id"), primary_key=True),
)
class Order(Base):
    """ORM model for the `orders` table, linked to a user and to many products."""

    __tablename__ = "orders"

    order_id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False)
    total_price = Column(DECIMAL(65, 2))
    is_paid = Column(Boolean)
    # The callable is passed uncalled so it runs per insert, not once at import.
    created_at = Column(DATETIME, default=datetime.utcnow)

    # ForeignKey() points at the real database table and column
    # ("users.user_id") because this constraint is executed in SQL.
    user_id = Column(ForeignKey("users.user_id"), nullable=False)
    user = relationship("User", back_populates="orders")

    # Many-to-many link to Product through the order_products table.
    products = relationship(
        "Product",
        secondary=order_products,
        back_populates="orders",
    )
class Product(Base):
    """ORM model for the `products` table, linked to many orders."""

    __tablename__ = "products"

    product_id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    price = Column(Float, nullable=False)
    discount = Column(Float)
    final_price = Column(Float)

    # Many-to-many link to Order through the order_products table.
    orders = relationship(
        "Order",
        secondary=order_products,
        back_populates="products",
    )
# Django
order = Order.objects.select_related("users").get(order_id=1)
# FastAPI
result = await db.execute(
select(Order).options(joinedload(Order.users)).where(Order.Order_id == Order_id)
)
# Django
user = User.objects.prefetch_related("orders").get(user_id=1)
# FastAPI
result = await db.execute(
select(User).options(selectinload(User.orders)).where(User.user_id == user_id)
)
@orders_router.post("/")
async def create_order(
    order: OrderSchema,
    db: AsyncSession = Depends(get_db),
    user: dict = Depends(get_current_user),
):
    """Create an order for the current user and attach the requested products.

    Args:
        order: Request payload; `product_id`, `total_price` and `is_paid` are read.
        db: Async database session provided by `get_db`.
        user: Current user provided by `get_current_user`; `username` is read.

    Returns:
        The newly created Order, refreshed from the database.

    Raises:
        HTTPException: 400 when none of the requested product ids exist.
    """
    found = await db.execute(
        select(Product).where(Product.product_id.in_(order.product_id))
    )
    products = list(found.scalars().all())
    if not products:
        raise HTTPException(status_code=400, detail="No valid products found")

    # The products are handed to the constructor so the order row and its
    # order_products rows are written in ONE transaction. Committing the order
    # first and extending `new_order.products` afterwards could leave an order
    # without products if the second commit failed, and reading the
    # relationship on an already-persisted object forces a lazy load, which an
    # AsyncSession cannot perform implicitly.
    # NOTE(review): Order.user_id is declared nullable=False but is not set
    # here — confirm how it is populated (the shape of `user` is not visible).
    new_order = Order(
        username=user.username,
        total_price=order.total_price,
        is_paid=order.is_paid,
        products=products,
    )
    db.add(new_order)
    await db.commit()
    await db.refresh(new_order)
    return new_order
@orders_router.get(
    "/",
    response_model=List[OrderSchema],
    response_model_exclude={"order_id", "is_paid", "created_at"},
)
async def current_user_orders(
    db: AsyncSession = Depends(get_db),
    user: dict = Depends(get_current_user),
):
    """Return the orders whose `username` matches the current user.

    The products of each order are loaded up front with selectinload().
    """
    own_orders_query = (
        select(Order)
        .where(Order.username == user.username)
        .options(selectinload(Order.products))
    )
    result = await db.execute(own_orders_query)
    return result.scalars().all()
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
Base.metadata.create_all(bind=engine)

# A single session is used for the whole walkthrough and is always closed at
# the end; opening a second one would leave the first session unclosed.
session = SessionLocal()
try:
    # Insert
    new_user = User(name="John")
    session.add(new_user)
    session.commit()

    # Query all rows
    users = session.query(User).all()
    for user in users:
        print(user.name)

    # Query with a condition
    user = session.query(User).filter(User.name == "John").first()
    print(user.id, user.name, user.email)

    # Update
    user = session.query(User).filter(User.name == "John").first()
    if user:
        user.email = "new@new.com"
        session.commit()

    # Delete
    if user:
        session.delete(user)
        session.commit()
finally:
    session.close()
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
DATABASE_URL = "sqlite:///./test.db"

# check_same_thread=False lets the SQLite connection be used from threads
# other than the one that created it.
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory: changes are neither auto-committed nor auto-flushed.
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
)

Base = declarative_base()
app = FastAPI()
class User(Base):
    """ORM model for the `users` table: integer id, name and unique email."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)
# Create the tables of every model registered on Base.metadata, using `engine`.
Base.metadata.create_all(bind=engine)
class UserResponse(BaseModel):
    """Pydantic schema carrying a user's id, name and email."""

    id: int
    name: str
    email: str
def get_db():
    """Yield a database session and close it once the caller is done with it."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs even if the consumer raised, so the session is always closed.
        session.close()
@app.get("/users/name")
def read_username(db: Session = Depends(get_db)):
    """Return every user's name, ordered by name and then by id descending."""
    name_rows = (
        db.query(User.name)
        .order_by(User.name, User.id.desc())
        .all()
    )
    # Each row holds a single column; unpack it into a flat list of names.
    return [name for (name,) in name_rows]
@app.get("/users/count")
def read_username(db: Session = Depends(get_db)):
    """Return the number of rows in the users table.

    The count is computed by the database instead of loading and sorting
    every row and measuring the resulting list in Python.
    """
    # NOTE(review): this handler shares its function name with the one
    # registered for /users/name; both routes work, but a distinct name such
    # as `read_user_count` would avoid shadowing the module-level name.
    return db.query(User).count()
@app.get("/users/")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    """Return one page of users ordered by name, then by id descending.

    offset(skip) skips the first `skip` rows; limit(limit) returns at most
    `limit` rows.
    """
    ordered_users = db.query(User).order_by(User.name, User.id.desc())
    page = ordered_users.offset(skip).limit(limit)
    return page.all()
@app.post("/users/add/")
def add_user(user: UserResponse, db: Session = Depends(get_db)):
    """Insert a new user built from the request payload.

    Args:
        user: Payload providing `id`, `name` and `email`.
        db: Database session provided by `get_db`.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 409 when the insert violates a database constraint
            (`id` is the primary key and `email` is declared unique).
    """
    # Imported locally so this handler does not rely on the file's import block.
    from sqlalchemy.exc import IntegrityError

    new_user = User(id=user.id, name=user.name, email=user.email)
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Roll back so the session stays usable, then report the conflict
        # instead of surfacing an unhandled 500.
        db.rollback()
        raise HTTPException(status_code=409, detail="User already exists") from exc
    return {"msg": "success create"}
@app.get("/users/search")
def read_search_user(email: str = None, name: str = None, db: Session = Depends(get_db)):
    """Find one user by exact email, falling back to a partial name match.

    The email lookup is tried first; the name lookup only runs when the email
    was not given or matched nobody. Raises a 404 when neither finds a user.
    """
    # Collect the filters in priority order; the queries themselves run lazily
    # in the loop below, one at a time.
    criteria = []
    if email:
        criteria.append(User.email == email)
    if name:
        criteria.append(User.name.like(f"%{name}%"))

    for criterion in criteria:
        match = db.query(User).filter(criterion).first()
        if match:
            return match

    raise HTTPException(status_code=404, detail="User not found")
@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given id, or respond with 404 if none exists."""
    found = db.query(User).filter_by(id=user_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@app.get("/users/ge/{user_id}")
def read_ge_user(user_id: int, db: Session = Depends(get_db)):
    """Return every user whose id is >= `user_id`; 404 when there are none."""
    matches = db.query(User).filter(User.id >= user_id).all()
    if matches:
        return matches
    raise HTTPException(status_code=404, detail="User not found")