- 사용자와 직접적인 상호작용 없이 실행되는 작업
- API 요청을 처리한 후 별도 스레드에서 작업 실행
BackgroundTasks.add_tasks()
를 사용하여 작업 추가
구분 | 개념 | 실행 방식 | 장점 | 단점 |
---|---|---|---|---|
동기 (Synchronous) | 요청이 끝날 때까지 다음 코드 실행 안됨 | 순차적으로 코드 실행 | 코드 흐름이 단순함 | 성능이 느림 |
비동기 (Asynchronous) | async/await 를 사용하여 다른 작업 실행 가능 | 비동기 I/O, 이벤트 루프 활용 | 동시성 처리 가능 | 코드가 복잡해질 수 있음 |
백그라운드 작업 (BackgroundTasks) | 요청 응답 후에도 실행되는 별도 작업 | FastAPI의 BackgroundTasks 사용 | 응답 속도가 빠름 | 실행이 보장되지 않음 |
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def background_task():
time.sleep(5)
print("백그라운드 작업 완료")
@app.get("/")
async def stsart_background_task(background_tasks: BacgroundTasks):
background_tasks.add_task(background_task)
return {"msg": "백그라운드 작업 실행 중..."}
- 이메일 전송: 사용자가 요청하면 즉시 응답을 주고, 실제 이메일 전송은 백그라운드에서 진행
- 파일 업로드: 파일을 업로드한 후, 처리는 백그라운드에서 수행
- 로그 기록: API 호출 로그를 백그라운드에서 저장
- 장기 실행 작업: 데이터 분석, PDF 생성, 크롤링 등
백그라운드 작업이 실패해도 클라이언트는 모름
서버 재시작 시 작업이 사라짐
대량의 백그라운드 작업은 성능 저하 가능성
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def send email(email: str, message: str) :
time.sleep(5)
pirnt(f"이메일 전송 완료: {email}, 내용: {message}")
@app.post("/send-email/")
async def send_email_endpoint(email: str, message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(send_email, email, message)
return {"message": "이메일 전송 중..."}
def write_log(log_message: str):
with open("log.txt", "a") as log_file: # append 추가모드
log_file.write(log_message + "\n")
# database.py
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) # sqlite 전용 설정
SessionLocal = sessionmaker(autocommit=False, autoflush=False,bind=engine)
Base = declarative_base()
class UserActivity(Base):
_tablename_ = "user_activities"
id = Column (Integer, primary_key=True, index=True)
username = Column (String, index=True)
action = Column (String)
Base.metadata.create_all (bind=engine)
def get_db() :
db = SessionLocal()
try:
yield db
finally:
db.close()
# main.py
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from database import get_db, UserActivity
app = FastAPI()
def save_activity(username: str, action: str, db: Session):
new_activity = UserActivity(username=username, action = action)
db.add(new_activity)
db.commit()
@app.post("/track-activity/")
async def track_activity(username: str, action: str, background_tasks: BackgroundTasks, db: Session=Depends(get_db)):
background_tasks.add_task(save_activity, username, action, db)
# database.py
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, create_engine
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class FileRecord(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
filename = Column(String, index=True)
status = Column(String)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# main.py
from fastapi import BackgroundTasks, Depends, FastAPI, File, UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session
import os
from database import get_db, FileRecord
app = FastAPI()
UPLOAD_DIR = "uploads"
# 지정된 경로의 폴더를 생성하는 함수
# 해당폴더가 이미 존재해도 에러발생 x
os.makedirs(UPLOAD_DIR, exist_ok=True)
# os.path.join(UPLOAD_DIR, filename)
# UPLOAD_DIR 디렉토리 안에 filename 파일을 저장할 경로를 만듭니다
# os.path.join()을 사용하면 OS에 따라 적절한 경로 구분자를 자동으로 적용합니다.
# "wb" 모드는 바이너리 쓰기 모드
# 텍스트 파일이 아닌 이미지, 동영상, PDF 같은 바이너리 파일을 저장할 때 사용
def save_file(filename: str, content: bytes):
with open(os.path.join(UPLOAD_DIR, filename), "wb") as f:
f.write(content)
print(f"파일 저장 완료: {filename}")
# ValueError: I/O operation on closed file. 발생
# 로컬에서 파일으 업로드하기에 경로를 명시적으로 지정을 안하면 인식을 못함, 아래 비추
# def save_fiel(file: UploadFile):
# with open(f"uploads/{file.filename}", "wb") as f:
# f.write(file.file.read())
# pritn(f"파일 저장 완료: {file.filename}")
def log_file_upload(filename: str, db: Session):
new_record = FileRecord(filename=filename, status="Saved")
db.add(new_record)
db.commit()
print(f"로그 저장 완료: {filename}")
@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...),
background_tasks: BacgroundTasks = BackgroundTasks(),
db: Session = Depends(get_db)):
file_contet = await file.read() # 파일을 미리 읽어서 백그라운드 작업으로 전달
background_tasks.add_task(save_file,file.filename, file_content)
bacgroound_tasks.add_task(log_file_upload, file.filename, db)
return {"msg" : " 파일 업로드 중 ..."}
LargeBinary
)from fastapi import FastAPI, UploadFile, File, HTTPException
from sqlalchemy import Column, Integer, LargeBinary, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
app = FastAPI()
Base = declarative_base()
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine)
class FileData(Base):
__tablename__ = "files"
id = Column(Integer, primary_key=True, autoincrement=True)
data = Column(LargeBinary)
Base.metadata.create_all(bind=engine)
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
db: Session = SessionLocal()
file_content = await file.read()
new_file = FileData(data=file_content)
db.add(new_file)
db.commit()
db.refresh(new_file)
db.close()
return {"file_id": new_file.id}
@app.get("/download/{file_id}")
async def download_file(file_id: int):
db: Session = SessionLocal()
file_data = db.query(FileData).filter(FileData.id == file_id).first()
db.close()
if not file_data:
raise HTTPException(status_code=404, detail="File not found")
return {"file_content": file_data.data} # 실제 서비스에서는 파일을 반환하는 방식 사용
new_record = FileRecord(filename=filename, status="Saved", file = content)
media_type ="png/image"
를 넣어준다@app.get("/image/{image_id}")
async def get_image(image_id: int):
db: Session = SessionLocal()
image_data = db.query(ImageData).filter(ImageData.id == image_id).first()
db.close()
if not image_data:
raise HTTPException(status_code=404, detail="Image not found")
return Response(content=image_data.data, media_type="image/png")