FastAPI DB(Async)

Haks.·2025년 2월 13일
0

How to use

목록 보기
26/32

비동기 Asynchronous

비교 항목비동기 (Asynchronous)동기 (Synchronous)
실행 방식여러 작업을 동시에 실행 가능한 번에 하나의 작업만 실행
코드 구조async/await 사용일반적인 함수 호출
실행 순서await이 걸린 작업을 기다리는 동안 다른 작업 실행 가능하나의 작업이 끝나야 다음 작업 실행
성능CPU 및 I/O 작업을 병렬로 실행하여 속도 향상하나씩 순차적으로 실행되어 속도가 느릴 수 있음
예제async def fetch_data(): await asyncio.sleep(3)def fetch_data(): time.sleep(3)
사용 사례API 호출, 데이터베이스 쿼리, 웹 크롤링 등간단한 계산, 파일 처리 등
멀티태스킹가능 (asyncio.gather() 활용)불가능 (작업이 끝날 때까지 대기)
리소스 활용효율적 (작업 대기 중에도 다른 작업 수행)비효율적 (작업이 끝날 때까지 대기)
에러 처리try/exceptawait 조합 필요일반적인 try/except 처리 가능

비동기 DB 작업

  • 데이터베이스 요청(ex. 데이터 삽입, 조회, 수정 등)을 처리할 때 I/O 작업(ex. 네트워크
    또는 파일 시스템 접근)이 발생
  • 이 I/O 작업 동안 애플리케이션이 멈추지 않고 다른 작업을 처리할 수 있도록 비차단
    방식을 사용

=> 성능 향상, 동시성 지원 ,자원 절약

비동기 await

  • 비동기 함수 내에서 순차적으로 실행하지만
  • 같은 비동기 함수가 gather로 묵여서 실행되었을때 순차적으로 진행되는걸 지정
  • await 함수가 실행중이면 그 밑의 작업은 진행하지 않음, 같이 실행된 다른 비동기 함수는 동작중

비동기 DB 작업과 SQLAlchemy

  • 비동기 DB 설정
    • create_async_engine
    • AsyncSession

기본설정

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession 
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker 
from sqlalchemy import Column, Integer, String
# 비동기 SoLite 데이터베이스 URL 설정
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# 비동기 SQLAtchemy 엔진 생성
async_engine = create_async_engine (
    DATABASE_URL,
    echo=True, # SOL 쿼리 로그 출력
)
# 비동기 세션 생성
AsyncSessionLocal = sessionmaker (
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)
# Base 클래스 생성
Base = declarative_base()
  • 데이터베이스 초기화
    • 테이블 생성: 동일하게 설정
    • 엔진 초기화: async with 사용
async def init_db():
    async with async_engine.begin() as conn:
        await conn. run_sync (Base.metadata.create_ all)
      
# 데이터베이스 초기화 실행
if _name_ == "_main_":
    import asyncio
    asyncio. run (init_db())

🧑‍💻CRUD 구현

from sqlalchemy import Column, Integer, String, text, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "sqlite+aiosqlite:///./test.db"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True, # 쿼리 로그 출력
)

# 비동기 세션 생성
AsyncSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
    class_=AsyncSession,
)

# Base클래스 생성
Base = declarative_base()

# 데이터베이스 모델 정의
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)

# 데이터베이스 초기화 함수
async def init_db(): # 테이블 생성 함수
    async with async_engine.connect() as conn:
    # async with : 연결을 자동으로 열고 닫는 역할(자원 누수를 방지)
    # async with 블록이 끝나면 자동으로 conn.close()가 호출됨.
    # 즉, async with는
    # try-finally처럼 동작하여 자원을 자동으로 정리함
    # 데이터베이스와 연결하는 코드
    
        await conn.run_sync(Base.metadata.create_all)
        # 모델을 생성하는 함수
        # await conn : 비동기 SQLAlchemey에서 동기 코드를 실행할 떄 사용하는 방식

# 비동기 세션 생성 함수
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session
        # ✔ await은 비동기 작업이 끝날 때까지 기다리는 역할을 함.
        # ✔ yield는 함수의 실행을 멈추고, 나중에 다시 실행을 이어갈 수 있도록 하는 역할을 함.

# CRUD

# CREATE
async def create_user(name: str, email : str):
    async with AsyncSessionLocal() as session:
        new_user = User(name=name, email=email)
        session.add(new_user)
        await session.commit()
        await session.refresh(new_user)
        return new_user
# READ
async def read_user():
    async with AsyncSessionLocal() as session:
        result = await session.execute(text("SELECT * FROM users"))
        result2 = await session.execute(select(User))
        users = result.fetchall()
        users2 = result2.scalars().all()
        return users2
    # users = await session.query(User).all() # 비동기에서 사용 불가

async def read_user_by_email(email: str):
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            text("SELECT * FROM users WHERE email = :email"), {"email": email}  # ✅ 바인딩 적용
        )
        result2 = await session.execute(select(User).filter(User.email == email))
        user = result.fetchone()
        user2 = result2.scalars().first()
        return user2

# PUT
async def update_user(user_id: int, name: str, email: str):
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            text("SELECT * FROM users WHERE id = :user_id"),{"user_id": user_id} 
        )
        result2 = await session.execute(select(User).filter(User.id == user_id))
        user3 = await session.get(User, user_id)
        
        user = result.fetchone()
        user2 = result2.scalars().first()
        
        user2.name = name
        user2.email = email
        if not user:
            return None
        
        await session.commit()
        await session.refresh(user2)
        
        return user2

# DELETE
async def delete_user(user_id :int):
    async with AsyncSessionLocal() as session:
        # result = await session.execute(
        #     text("DELETE FROM users WHERE id = :user_id"),{"user_id": user_id} 
        # )
        
        user = await session.get(User, user_id)
        if not user:
            return None
        await session.delete(user)
        await session.commit()

await의 정확한 동작

  • await 함수 전체가 아니라 특정 비동기 작업을 기다린다!!!
    • await를 붙이면 해당 비동기 작업이 끝날때까지 실행을 멈추고 기다린다
    • 같은 비동기 함수 안의 다른코드들은 병렬적으로 실행될 수 있음

🧑‍💻 FastAPI 기반 CRUD 구현

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

DATABASE_URL = "sqlite+aiosqlite:///./test.db"

async_engine = create_async_engine(
    DATABASE_URL,
    echo = True
)

AsyncSessionLocal = sessionmaker(
    bind = async_engine,
    expire_on_commit = False,
    class_=AsyncSession
)

Base = declarative_base()

app = FastAPI()

class Animal(Base):
    __tablename__ = "animals"

    id = Column(Integer, primary_key=True, index=True)
    species = Column(String, index=True)
    age = Column(Integer, index=True)
    name = Column(String, index=True)

async def init_db():
        async with async_engine.connect() as conn:
            await conn.run_sync(Base.metadata.create_all)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# 등록된 모든 동물 목록을 반환
@app.get("/animals/")
async def get_all_animals(db: AsyncSession = Depends(get_db)):
    results = await db.execute(select(Animal))
    animals = results.scalars().all()
    return animals

# 특정 동물 종의 세부 정보 조회
@app.get("/animals/by-species")
async def get_animals_by_species(species: str=None, db: AsyncSession = Depends(get_db)):
    if species:
        result = await db.execute(select(Animal).filter(Animal.species==species))
        animals = result.scalars().all()
        if not animals :
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
        return animals
    return []


# 특정 나이 이상의 동물 조회
@app.get("/animals/by-min-age")
async def get_animals_by_min_age(min_age:int=None, db: AsyncSession = Depends(get_db)):
    if min_age:
        results = await db.execute(select(Animal).filter(Animal.age >= min_age))
        animals = results.scalars().all()
        if not animals:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
        return animals
    return []

# 특정 문자열의 이름을 가진 동물 검색
@app.get("/animals/search-by-name")
async def get_animals_by_name(name_contains: str=None, db: AsyncSession= Depends(get_db)):
    if name_contains:
        results = await db.execute(select(Animal).filter(Animal.name.like(f"%{name_contains}%")))
        animals = results.scalars().all()
        if not animals:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
        return animals
    return []

# 동물 목록을 나이순(오름차순)으로 정렬
@app.get("/animals/age_asc")
async def get_animals_by_asc(db : AsyncSession = Depends(get_db)):
    results = await db.execute(select(Animal).order_by(Animal.age))
    animals = results.scalars().all()
    return animals

# 특정 id의 동물 정보 조회
@app.get("/animals/{animal_id}")
async def get_animal_by_id(animal_id: int, db: AsyncSession = Depends(get_db)):
    # results = await db.execute(select(Animal).filter(Animal.id == animal_id))
    # animal = results.scalars().first()
    animal = await db.get(Animal, animal_id)
    if not animal:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return animal

0개의 댓글

관련 채용 정보