brute_force with redis,django

Haks.·2025년 3월 30일
0

Study

목록 보기
63/65

Brute Force

브루트포스(Brute Force) 공격을 방어 하기위해
로그인 실패 횟수를 기억하고, 일정 횟수이상 실패하면 일시 차단하는 기능이 필요

django-axes

Django에서 로그인 시도 추적&차단을 해주는 오픈소스 패키지

기본 특징은

항목설명
자동 로그인 시도 기록DB에 로그인 실패 내역 저장
차단 정책 제공특정 IP, 계정 등 차단 기능
관리자 화면 지원차단 해제, 기록 조회 가능
DB 기반시도 기록은 DB에 저장됨(보통 RDB)

Redis + 커트서마이징 방식이 필요한 이유

이유설명
성능Redis는 메모리 기반이라 DB보다 발고, 요청 많은 시스템에서 유리함
확장성마이크로서비스, 서버리스 환경에서 공유 캐시로 활용 가능
가볍게 쓸 수 있음django-axes보다 의존성 적고, 내가 원하는 정책만 넣을 수 있음
유연한 정책 구현 가능IP + 이메일 기준, 로그인 성공 시 초기화, 제한 시간 자유 설정 등 완전 제어 가능
로그인 외에도 확장 가능이메일 인증, OTP 시도 제한 등 여러 곳에 활용 가능

Django 로그인 시도 제한 방식 비교: django-axes vs Redis + 커스터마이징

항목django-axesRedis + 커스터마이징
저장소DBRedis (메모리 기반)
성능낮음 (DB 쓰기 부하)높음 (빠른 응답, 경량 처리)
설정 난이도쉬움 (설치 + 세팅만 하면 됨)중간 (직접 코드 구현 필요)
커스터마이징제한적 (정해진 정책만 설정 가능)자유롭게 구현 가능 (IP, 이메일, TTL 등)
적용 유연성로그인 시도 제한에 국한됨로그인, OTP, 이메일 인증 등 다양하게 확장 가능
관리자 UIDjango Admin에 내장없음 (원하면 직접 구현 가능)
보안 정책 제어일부 가능 (설정으로 조절)완전한 제어 가능 (개발자 마음대로)
추가 기능자동 차단 해제, 알림 등 제공직접 구현해야 함

💻 커스터 마이징 redis + django 구현 예

import os
import redis
from rest_framework.exceptions import PermissionDenied

if os.getenv("DOCKER_ENV", "false").lower() =="true":
    REDIS_HOST = os.getenv("REDIS_HOST", "redis")
else:
    REDIS_HOST = "localhost"

r = redis.Redis(host=REDIS_HOST, port=6379, db=0, decode_responses=True)

def get_login_attempt_key(key):
    return f"login_attempt_{key}"

def check_login_attempt_key(key, limit=5, block_time = 300):
    redis_key = get_login_attempt_key(key)
    attempts = r.get(redis_key)

    if attempts and int(attempts) >= limit:
        raise PermissionDenied(
            detail="로그인 시도 횟수를 초과했습니다. 잠시후 다시 시도하세요",
            code="Too_much_attempts"
        )
    pipe = r.pipeline()
    pipe.incr(redis_key)
    pipe.expire(redis_key, block_time)
    pipe.execute()

def reset_login_attempt(key):
    redis_key = get_login_attempt_key(key)
    r.delete(redis_key)

Redis pipline() 메서드 정리

메서드설명
set(key, value)키에 값 저장
get(key)키의 값 가져오기
incr(key)키의 값을 1 증가
decr(key)키의 값을 1 감소
expire(key, seconds)키의 만료 시간 설정 (초 단위)
delete(key)키 삭제
exists(key)키 존재 여부 확인 (1 또는 0 반환)
ttl(key)키의 TTL(남은 시간) 확인 (초 단위)
hset(name, key, value)해시(hash)에 값 저장
hget(name, key)해시에서 값 가져오기
sadd(key, value)Set 자료형에 값 추가
smembers(key)Set 전체 값 조회
  • redis.Redis(host="host", port=6379, db = 0, decode=response=True) decode => 기본적으로 UTF-8 로 디코딩해서 사용하기 위해

Login API 적용

class UserLoginView(APIView):
    @swagger_auto_schema(
        security=[{"Bearer": []}],
        request_body=UserLoginSerializer,
        responses={
            200: UserLoginSerializer,
            400: openapi.Response(
                description=(
                    "잘못된 요청 시 응답\n"
                    "- `code`:`missmatch` , 이메일 또는 비밀번호 불일치.\n"
                ),
            ),
            403: openapi.Response(
                description=(
                    "- `code`:`not_verified`, 인증되지 않은 이메일\n"
                    "- `code`:`Too_much_attempts`, 로그인 시도횟수 5회 초과 실패 5분 간 불가"
                )
            ),
        },
    )
    def post(self, request):
        email = request.data.get("email")
        password = request.data.get("password")

        check_login_attempt_key(email)

        user = authenticate(email=email, password=password)
        if not user:
            return Response(
                {
                    "error": "이메일 또는 비밀번호가 올바르지 않습니다.",
                    "code": "missmatch",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        if not user.email_verified:
            return Response(
                {
                    "error": "이메일 인증이 완료되지 않았습니다. 이메일을 확인해주세요.",
                    "code": "not_verified",
                },
                status=status.HTTP_403_FORBIDDEN,
            )

        refresh = RefreshToken.for_user(user)
        access_token = str(refresh.access_token)

        store_access_token(user.id, access_token, 3600)
        store_refresh_token(user.id, str(refresh), 86400)

        # activity log 추가 = 로그인
        ActivityLog.objects.create(
            user_id=user,
            action="LOGIN",
            ip_address=get_client_ip(request),
        )

        reset_login_attempt(email)

        return Response(
            {
                "access_token": access_token,
                "refresh_token": str(refresh),
                "token_type": "Bearer",
                "expires_in": 3600,
            },

0개의 댓글

관련 채용 정보