FastAPI WebSocket

Haks.·2025년 2월 18일
0

How to use

목록 보기
31/32

웹 소켓(Web Socket)

  • 클라이언트와 서버 간 양방향 통신을 지원하는 프로토콜
  • HTTP 요청과 다르게 한 번 연결되면 지속적으로 데이터 송수신 가능
    • 클라이언트가 요청하지 않아도 서버가 데이터를 보낼 수 있음 (Push
      방식 지원)
  • ws:// 또는 wss://(보안) 방식 활용(web socket security)
  • 실시간 통신

필요 이유

= 기존 HTTP 방식의 문제점

  • 실시간성 부족
    • HTTP 요청-응답 방식은 클라이언트가 요청해야만 서버가 응답 가능
    • 실시간 알림, 채팅 등에는 비효율적
  • 풀링(Pulling)과 긴 연결(Long Polling)의 한계
    • 풀링: 클라이언트가 일정 시간마다 서버에 요청 (과부하 문제)
    • 긴 연결: 클라이언트가 연결을 유지하며 서버 응답을 기다림 (비효율적)
특징HTTPWebSocket
연결 방식요청-응답 방식지속적인 연결 유지
통신 방식클라이언트가 요청하면 서버가 응답서버와 클라이언트가 자유롭게 송수신 가능
활용 사례REST API, 정적 페이지 요청실시간 채팅, 알림 시스템, 주식 데이터 스트리밍

WebSocket 프로토콜 구조

pip3 install 'uvicorn[standard]'

  • Handshake
    • HTTP 요청을 통해 WebSocket 연결 수립 (이 때 Upgrade: websocket헤더 사용)
    • 기존 HTTP 요청을 WebSOcket통신으로 전환하는 과정
    • 클라이언트와 서버간에 WebSocket 연결을 수립하기 위해 수행하는 초기 단계를 의미
    1. 클라이언트가 서버에 WebSocket 요청을 보냄 (Upgrade 요청
    2. 서버가 WebSocket 연결을 허용하면 응답 (101 Switching Protocols)
    3. WebSocket 연결이 수립되고, 지속적인 데이터 통신 가능
  • 메시지 송수신 (Message Exchange)
    • 텍스트 또는 바이너리 메시지 교환
  • 연결 종료(Connection Close)
    • 클라이언트 또는 서버가 연결 종료

🧑‍💻 서버 구현

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws") # 보편적으로 /ws로 url을 지정
async def websocket_endpoint(websocket: WebSocket): # 실시간으로 여러 요청을 처리하기에 비동기가 적합
	await websocket.accept() # WebSocket 연결 수락
    while True: 서버를 게속 유지하기위해 (돌리기 위해)
    	data = await websocket.receive_text() # 클라이언트 메세지 수신
        await websocket.send_text(f"서버 응답: {data}") # 클라이언트에게 응답
  • postman 설정



WebSocket 서버 구현 - Disconnect 1000

  • Starlette.websockets.WebSocketDisconnect: (1000, ‘’)
    • 에러코드 1000: WebSocket이 정상적으로 종료됨
    • 클라이언트(예: Postman, 브라우저)가 WebSocket 연결을 닫으면
      서버에서 발생하는 오류
  • 해결 방법: try-except을 추가하여 예외 처리
    • fastapi에서 WebSocketDisconnect을 import 후, except 처리

from fastapi import FastAPI, WebSocket, WebSocketDisconnect


app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"서버 응답 : {data}")
    except WebSocketDisconnect:
        print("클라이언트 연결 종료")

WebSocket을 활용한 실시간 기능

  • 대표적인 서비스 사례
    • 실시간 채팅: 카카오톡, 슬랙, 디스코드 등
    • 실시간 알림 시스템: SNS 알림, 주문 처리 상태 알림 등
    • 실시간 주식 데이터 스트리밍: 증권 거래소 API
    • 실시간 협업 기능: Google Docs같은 실시간 편집

실시간 알림 시스템 구현

  • 기본 개념
    • 클라이언트(Web, Mobile 등)가 서버와 WebSocket 연결을 맺음
    • 서버가 특정 이벤트(예: 주문 완료, 메시지 도착, 좋아요 등) 감지
    • 서버가 연결된 모든 클라이언트에게 즉시 알림 전송
  • 시스템 구성도
  • 여러 클라이언트가 WebSocket을 통해 서버에 연결될 수 있도록 구성
    • ConnectionManager 클래스를 만들어 연결 관리
    • 사용자가 연결을 끊으면 자동으로 제거됨
class ConnectionManager:
    """WebSocket 연결관리"""
    def __init__(self): # 연결된 사용자 리스트
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """ 클라이언트가 websocket 연결을 요청하면 리스트에 추가 """
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """ 모든 연결된 클라이언트에게 메세지 전송 """
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()


# 특정 요청을 감지하면 서버가 사용자에게 전달
  • 특정 사용자에게만 1:1 알림 보내기

from typing import Dict, List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect


app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
    
    async def connect(self, websocket: WebSocket, username: str):
        """ 특정 사용자의 WebSocket 연결 관리 """
        await websocket.accept()
        self.active_connections[username] = websocket
    
    def disconnect(self, username: str):
        """ 사용자가 연결을 끊으면 제거 """
        if username in self.active_connections:
            del self.active_connections[username]
    
    async def send_private_message(self, username: str, message: str):
        """ 특정 사용자에게 메세지 전송 """
        if username in self.active_connections:
            await self.active_connections[username].send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{username}")
async def websocket_endpoint(username: str,websocket: WebSocket):
    """ 클라이언트가 /ws 경로로 WebSocket 연결 요청 """
    await manager.connect(websocket, username)
    
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_private_message(username, f"📢 개인 메시지: {data}")
    except WebSocketDisconnect:
        manager.disconnect(username)

@app.post("/send-message/{username}")
async def send_message(username: str, message: str):
    """ 특정 사용자에게 메시지를 전송하는 API (Postman 테스트용) """
    await manager.send_private_message(username, message)
    return {"message": f"📢 {username}에게 메시지 전송 완료!"}
  • postman

WebSocket 보안 강화

  • 보안 프로토콜 적용 (wss://)
  • DDos 공격 방어
    • 디도스: 고의로 접속량을 폭주시켜 서버를 마비시키는 것
    • IP 기반 연결 제한
    • Rate Limiting: 일정 시간 내 특정 요청 횟수 초과 시 차단
    • Cloudflare, AWS WAF 같은 방화벽 솔루션 활용

WebSocket 성능 최적화

  • WebSocket Keep-Alive 설정
    • 기본적으로 장시간 연결을 유지하므로 서버 부하 발생 가능
    • Ping/Pong 메시지를 사용하여 비활성 연결 종료
    • 특정 시간마다 클라이언트와 서버 간 메시지 송수신
while Ture:
	await websocket.send_text("ping")
    await asyncio.sleep(30) # 30초마다 ping 전송
  • 로드 밸런싱 Load Balancing
    • 하나의 서버가 많은 요청을 처리하면 서버 과부하 발생
    • 여러 개의 서버를 사용하여 트래픽을 분산
    • 클라이언트 요청을 여러 서버 중 하나로 자동으로 분배

0개의 댓글

관련 채용 정보