FastAPI Backgroud Tasks

Haks.·2025년 2월 18일
0

How to use

목록 보기
30/32

Background Tasks란

  • 사용자와 직접적인 상호작용 없이 실행되는 작업
  • API 요청을 처리한 후 별도 스레드에서 작업 실행
  • BackgroundTasks.add_tasks()를 사용하여 작업 추가

동기 vs 비동기 vs 백그라운드 작업 차이

구분개념실행 방식장점단점
동기 (Synchronous)요청이 끝날 때까지 다음 코드 실행 안됨순차적으로 코드 실행코드 흐름이 단순함성능이 느림
비동기 (Asynchronous)async/await를 사용하여 다른 작업 실행 가능비동기 I/O, 이벤트 루프 활용동시성 처리 가능코드가 복잡해질 수 있음
백그라운드 작업 (BackgroundTasks)요청 응답 후에도 실행되는 별도 작업FastAPI의 BackgroundTasks 사용응답 속도가 빠름실행이 보장되지 않음
  • 백그라운드 작업 방식: 즉시 응답을 반환후, 실제 작업은 백그라운드에서 실행
    • 실행 예시 : 콘솔에선 5초후에 작업이 실행
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def background_task():
	time.sleep(5)
    print("백그라운드 작업 완료")

@app.get("/")
async def stsart_background_task(background_tasks: BacgroundTasks):
	background_tasks.add_task(background_task)
    return {"msg": "백그라운드 작업 실행 중..."}

Background Tasks가 필요한 이유

  • 이메일 전송: 사용자가 요청하면 즉시 응답을 주고, 실제 이메일 전송은 백그라운드에서 진행
  • 파일 업로드: 파일을 업로드한 후, 처리는 백그라운드에서 수행
  • 로그 기록: API 호출 로그를 백그라운드에서 저장
  • 장기 실행 작업: 데이터 분석, PDF 생성, 크롤링 등
  • 이점
    • 빠른 응답 시간: 사용자는 즉시 응답을 받고 작업은 따로 실행
    • 서버 부하 감소: 여러 요청이 들어와도 메인 스레드가 차단되지 않음
    • 유저 경험 개선: 이메일 전송이나 데이터 저장 작업이 끝날 때까지 기다릴 필요 없음

FastAPI의 Background Tasks 작동 방식

  1. API 요청을 받음
  2. add_task()에 해당 요청 처리
  3. FastAPI는 일단 즉시 응답을 반환
  4. add_task()에 대한 내용이 백그라운드에서 실행됨
  5. 그 외 서버가 다른 요청을 처리하는 동안 작업이 진행됨

Background Tasks 주의점

  • 백그라운드 작업이 실패해도 클라이언트는 모름

    • 작업이 실패해도 응답은 이미 전송됨
    • 해결책: 작업 결과를 데이터베이스에 저장하고 후속 확인 API 제공
  • 서버 재시작 시 작업이 사라짐

    • 서버가 재시작되면 미완료된 백그라운드 작업이 유실됨
    • 해결책: Celery같은 작업 큐 사용 고려
  • 대량의 백그라운드 작업은 성능 저하 가능성

    • 한 번에 너무 많은 작업이 들어오면 서버 성능 저하
    • 해결책: 워커 프로세스를 활용하여 작업 분산

🧑‍💻 예시

  • 이메일 전송 : 즉시 응답 주고, 이메일 전송은 백그라운드에서 진행
from fastapi import FastAPI, BackgroundTasks 
import time
app = FastAPI()

def send email(email: str, message: str) :
	time.sleep(5)
    pirnt(f"이메일 전송 완료: {email}, 내용: {message}")
@app.post("/send-email/")
async def send_email_endpoint(email: str, message: str, background_tasks: BackgroundTasks):
	background_tasks.add_task(send_email, email, message)
    return {"message": "이메일 전송 중..."}
  • 로그 기록 : 요청이 끝난 후 로그가 저장 됨
def write_log(log_message: str):
	with open("log.txt", "a") as log_file: # append 추가모드
    log_file.write(log_message + "\n")
  • 사용자 활동 기록: DB에 사용자 활동을 백그라운드에서 기록
# database.py
from sqlalchemy import Column, Integer, String, create_engine 
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) # sqlite 전용 설정
SessionLocal = sessionmaker(autocommit=False, autoflush=False,bind=engine)
Base = declarative_base()

class UserActivity(Base):
    _tablename_ = "user_activities"
    id = Column (Integer, primary_key=True, index=True)
    username = Column (String, index=True)
    action = Column (String)

Base.metadata.create_all (bind=engine)

def get_db() :
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# main.py
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session 
from database import get_db, UserActivity
app = FastAPI()

def save_activity(username: str, action: str, db: Session):
	new_activity = UserActivity(username=username, action = action)
    db.add(new_activity)
    db.commit()

@app.post("/track-activity/")
async def track_activity(username: str, action: str, background_tasks: BackgroundTasks, db: Session=Depends(get_db)):
	background_tasks.add_task(save_activity, username, action, db)
  • backtracking 예시(파일 업로드)
# database.py
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, create_engine


DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

class FileRecord(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    filename = Column(String, index=True)
    status = Column(String)
    
Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# main.py

from fastapi import BackgroundTasks, Depends, FastAPI, File, UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session
import os

from database import get_db, FileRecord


app = FastAPI()

UPLOAD_DIR = "uploads"
# 지정된 경로의 폴더를 생성하는 함수
# 해당폴더가 이미 존재해도 에러발생 x
os.makedirs(UPLOAD_DIR, exist_ok=True)

# os.path.join(UPLOAD_DIR, filename)
# UPLOAD_DIR 디렉토리 안에 filename 파일을 저장할 경로를 만듭니다
# os.path.join()을 사용하면 OS에 따라 적절한 경로 구분자를 자동으로 적용합니다.

# "wb" 모드는 바이너리 쓰기 모드
# 텍스트 파일이 아닌 이미지, 동영상, PDF 같은 바이너리 파일을 저장할 때 사용
def save_file(filename: str, content: bytes):
	with open(os.path.join(UPLOAD_DIR, filename), "wb") as f:
    	f.write(content)
    print(f"파일 저장 완료: {filename}")

# ValueError: I/O operation on closed file. 발생
# 로컬에서 파일으 업로드하기에 경로를 명시적으로 지정을 안하면 인식을 못함, 아래 비추
# def save_fiel(file: UploadFile):
#     with open(f"uploads/{file.filename}", "wb") as f:
#         f.write(file.file.read())
#     pritn(f"파일 저장 완료: {file.filename}")

def log_file_upload(filename: str, db: Session):
	new_record = FileRecord(filename=filename, status="Saved")
    db.add(new_record)
    db.commit()
    print(f"로그 저장 완료: {filename}")

@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...),
				background_tasks: BacgroundTasks = BackgroundTasks(),
                db: Session = Depends(get_db)):
	file_contet = await file.read() # 파일을 미리 읽어서 백그라운드 작업으로 전달
    background_tasks.add_task(save_file,file.filename, file_content)
    bacgroound_tasks.add_task(log_file_upload, file.filename, db)
    
    return {"msg" : " 파일 업로드 중 ..."}
  • DB에 넣으려면, 테이블에 추가 (LargeBinary)
from fastapi import FastAPI, UploadFile, File, HTTPException
from sqlalchemy import Column, Integer, LargeBinary, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

app = FastAPI()

Base = declarative_base()
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine)

class FileData(Base):
    __tablename__ = "files"
    id = Column(Integer, primary_key=True, autoincrement=True)
    data = Column(LargeBinary)

Base.metadata.create_all(bind=engine)

@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    db: Session = SessionLocal()
    file_content = await file.read()
    
    new_file = FileData(data=file_content)
    db.add(new_file)
    db.commit()
    db.refresh(new_file)
    db.close()

    return {"file_id": new_file.id}

@app.get("/download/{file_id}")
async def download_file(file_id: int):
    db: Session = SessionLocal()
    file_data = db.query(FileData).filter(FileData.id == file_id).first()
    db.close()
    
    if not file_data:
        raise HTTPException(status_code=404, detail="File not found")
    
    return {"file_content": file_data.data}  # 실제 서비스에서는 파일을 반환하는 방식 사용
  • api 코드는 동일함 save 할때 그냥 contet를 추가적으로 넣으면됨
new_record = FileRecord(filename=filename, status="Saved", file = content)
  • DB에서 이미지 불러오기, 똑같이 가지고 오면되지만 return값에, media_type ="png/image"를 넣어준다
@app.get("/image/{image_id}")
async def get_image(image_id: int):
    db: Session = SessionLocal()
    image_data = db.query(ImageData).filter(ImageData.id == image_id).first()
    db.close()
    
    if not image_data:
        raise HTTPException(status_code=404, detail="Image not found")
    
    return Response(content=image_data.data, media_type="image/png")

0개의 댓글

관련 채용 정보