Multi-Threading / Multi-Processing
오늘의 주제는 멀티스레딩과 멀티프로세싱이다. 그간 프로젝트를 진행하면서 아름아름 알아가고 사용해보았지만, 다시 한번 진득하게 공부해보고 싶어 주제로 선택하게 되었다.
멀티스레딩(Multithreading) 은 하나의 프로세스 내부에서 여러 스레드를 생성해 동시에 실행하는 방식으로, 프로세스의 메모리 공간을 공유하면서 작업을 병렬로 처리할 수 있다.
멀티프로세싱(Multiprocessing) 은 여러 프로세스를 생성해 독립적으로 실행하는 방식으로, 각 프로세스는 별도의 메모리 공간을 사용한다.
threading
모듈을 사용하여 구현multiprocessing
모듈을 사용하여 구현스레드와 프로세스는 모두 병렬 처리를 수행하기 위한 실행 단위이지만, 메모리 사용 방식과 독립성에서 큰 차이가 있다.
특징 | 스레드 | 프로세스 |
---|---|---|
독립성 | 동일한 프로세스 내부에서 실행, 메모리 공유 | 별도의 메모리 공간 사용, 완전히 독립적 |
메모리 사용 | 스택만 독립적으로 사용, 나머지는 공유 | 각각 독립된 메모리 공간 할당 |
통신 | 간단하며 빠름 (메모리 공유) | 느림 (IPC 기법 필요, 예: Queue, Pipe) |
운영체제 리소스 | 가벼움 (한 프로세스 내에서 관리) | 무겁고 리소스 소모가 큼 |
응용 사례 | I/O 바운드 작업 | CPU 바운드 작업 |
Python에서의 제약 | GIL(Global Interpreter Lock)로 인해 성능 제한 | 프로세스 간 독립 실행으로 GIL의 제약 없음 |
병렬 처리는 현대 CPU 구조에서 지원하는 멀티코어 아키텍처 덕분에 가능하다.
Python의 multiprocessing
을 사용한 간단한 병렬 처리 예제. 4개의 프로세스가 동시에 실행되어 작업 시간이 병렬로 분배되며, 총 작업 시간이 단축된다.
from multiprocessing import Pool
import time
def square(n):
time.sleep(1) # 작업 지연 시뮬레이션
return n * n
if __name__ == "__main__":
with Pool(4) as pool: # 4개의 프로세스 생성
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
GIL(Global Interpreter Lock) 은 Python 인터프리터에서 동시에 실행되는 스레드가 하나의 바이트코드만 실행할 수 있도록 제한하는 메커니즘이다. Python의 기본 구현인 CPython에서 주로 사용되며, 메모리 관리와 데이터 무결성을 보장하기 위해 설계되었다.
GIL로 인한 동시 실행 제한
CPU 바운드 작업에서의 비효율성
I/O 바운드 작업에서의 효율성
멀티스레딩은 Python의 threading
모듈을 사용해 구현한다.
import threading
import time
def worker(task_id):
print(f"Task {task_id} started")
time.sleep(2) # 작업 지연
print(f"Task {task_id} finished")
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
Task 0 started
Task 1 started
Task 2 started
Task 0 finished
Task 1 finished
Task 2 finished
I/O 바운드 작업
배경 작업 처리
경량 작업
병렬 처리 성능 제한
데이터 경쟁과 데드락
with lock
구문을 사용하여 하나의 스레드만 counter
변수에 접근 가능하도록 보장import threading
lock = threading.Lock()
def safe_increment(counter):
with lock: # 자원을 안전하게 보호
counter[0] += 1
counter = [0]
threads = [threading.Thread(target=safe_increment, args=(counter,)) for _ in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(counter[0]) # 출력: 10
threading
모듈 소개Python의 threading
모듈은 멀티스레딩 작업을 수행할 수 있도록 설계된 표준 라이브러리이다. 이 모듈을 사용하면 여러 스레드를 생성하고 관리하며, 동시 실행 작업을 처리할 수 있다.
Thread
클래스: 스레드를 생성하고 실행하는 데 사용start()
, join()
, is_alive()
Lock
클래스: 스레드 간 데이터 충돌을 방지하기 위한 잠금 메커니즘 제공Condition
클래스: 스레드 간 신호 및 상태를 관리하는 데 사용current_thread()
함수: 현재 실행 중인 스레드 객체 반환Python의 threading.Thread
클래스를 사용하여 스레드를 생성하고 실행하는 방법은 다음과 같다.
import threading
import time
def print_numbers():
for i in range(5):
print(f"Number: {i}")
time.sleep(1)
# 스레드 생성
thread = threading.Thread(target=print_numbers)
# 스레드 시작
thread.start()
# 메인 스레드 작업
print("Main thread is running.")
# 스레드가 종료될 때까지 대기
thread.join()
print("Thread finished.")
Main thread is running.
Number: 0
Number: 1
Number: 2
Number: 3
Number: 4
Thread finished.
thread.start()
메서드는 스레드를 시작하고, thread.join()
은 해당 스레드가 종료될 때까지 기다린다.class PrintNumbersThread(threading.Thread):
def run(self):
for i in range(5):
print(f"Class-based Number: {i}")
time.sleep(1)
thread = PrintNumbersThread()
thread.start()
thread.join()
Thread
클래스를 상속받아 사용자 정의 스레드를 생성run
메서드에 작업 내용을 정의하여, 스레드의 주요 동작을 구현한다.멀티스레딩 환경에서는 공유 자원에 여러 스레드가 동시에 접근하면서 데이터 충돌 또는 데드락(Deadlock) 이 발생할 수 있다. 이를 방지하기 위해 Lock
객체를 사용한다.
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
for _ in range(100000):
with lock: # 공유 자원 보호
counter += 1
threads = [threading.Thread(target=increment) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Final counter value: {counter}")
Final counter value: 200000
with lock
을 사용하면 하나의 스레드만 counter
에 접근 가능RLock(Reentrant Lock)
을 사용해 데드락 문제를 완화할 수 있다.rlock = threading.RLock()
def task():
with rlock:
with rlock: # 동일한 스레드에서 재진입 가능
print("Task executed safely.")
thread = threading.Thread(target=task)
thread.start()
thread.join()
Producer-Consumer 패턴 은 한쪽에서 데이터를 생성(Producer)하고, 다른 쪽에서 이를 소비(Consumer)하는 멀티스레딩 작업에서 자주 사용되는 패턴이다. Python에서는 queue.Queue
를 사용해 간단히 구현할 수 있다.
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
item = f"Item {i}"
q.put(item) # 아이템 생성
print(f"Produced: {item}")
time.sleep(1)
def consumer():
while True:
item = q.get() # 아이템 소비
if item is None: # 종료 신호
break
print(f"Consumed: {item}")
q.task_done()
# 스레드 생성
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
q.put(None) # 종료 신호 전달
consumer_thread.join()
Produced: Item 0
Consumed: Item 0
Produced: Item 1
Consumed: Item 1
Produced: Item 2
Consumed: Item 2
...
queue.Queue
를 사용해 생산자와 소비자 간 데이터 교환을 안전하게 관리task_done
과 join
을 사용해 작업 완료를 추적스레드 풀 은 일정한 수의 스레드를 미리 생성하여 작업을 분산 처리하는 방식이다. Python에서는 concurrent.futures.ThreadPoolExecutor
를 사용한다.
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
print(f"Task {n} started.")
time.sleep(2)
print(f"Task {n} finished.")
return n * 2
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
results = [future.result() for future in futures]
print(f"Results: {results}")
Task 0 started.
Task 1 started.
Task 2 started.
Task 0 finished.
Task 1 finished.
Task 2 finished.
...
Results: [0, 2, 4, 6, 8]
ThreadPoolExecutor
는 최대 3개의 스레드를 생성하여 작업을 병렬로 실행future.result()
를 통해 수집 가능멀티프로세싱(Multiprocessing)은 여러 프로세스를 생성해 병렬로 실행하는 기법으로, 각 프로세스는 독립적인 메모리 공간을 사용한다. 이는 멀티스레딩과는 다른 메모리 관리 방식으로, 다음과 같은 특징과 장점을 가진다.
독립된 메모리 공간
안전성 강화:
GIL(Global Interpreter Lock) 회피
데이터 보호
프로세스 충돌의 국지화
multiprocessing
모듈 개요Python의 multiprocessing
모듈은 멀티프로세싱 작업을 쉽게 구현할 수 있도록 설계된 표준 라이브러리이다.
Process
클래스
start()
, join()
, is_alive()
Pool
클래스
IPC(Inter-Process Communication)
Lock 및 Semaphore
from multiprocessing import Process
def worker(task_id):
print(f"Task {task_id} is running.")
if __name__ == "__main__":
processes = [Process(target=worker, args=(i,)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
Task 0 is running.
Task 1 is running.
Task 2 is running.
Task 3 is running.
Task 4 is running.
Process
객체를 사용하여 독립적인 프로세스를 생성멀티프로세싱 환경에서 데이터 공유가 필요한 경우, Python은 IPC(Inter-Process Communication) 를 지원한다. 주로 Queue
, Pipe
, Manager
를 사용하여 프로세스 간 데이터를 교환한다.
multiprocessing.Queue
는 프로세스 간 데이터를 안전하게 전달하기 위한 FIFO(First-In-First-Out) 큐이다.
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
q.put(f"Data {i}")
print(f"Produced: Data {i}")
def consumer(q):
while not q.empty():
data = q.get()
print(f"Consumed: {data}")
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p1.join() # Producer 완료 후 Consumer 시작
p2.start()
p2.join()
Produced: Data 0
Produced: Data 1
Produced: Data 2
Produced: Data 3
Produced: Data 4
Consumed: Data 0
Consumed: Data 1
Consumed: Data 2
Consumed: Data 3
Consumed: Data 4
Queue
는 프로세스 간 데이터를 안전하게 전달하며, 데이터 소비 후 큐에서 제거multiprocessing.Pipe
는 두 프로세스 간 양방향 데이터 통신을 지원하는 간단한 방법이다.
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from sender!")
conn.close()
def receiver(conn):
message = conn.recv()
print(f"Received: {message}")
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
Received: Hello from sender!
Pipe
는 간단한 데이터 전송에 적합하며, 양방향 통신도 가능하다.multiprocessing.Manager
는 여러 프로세스 간 데이터 구조(예: 리스트, 딕셔너리)를 공유할 수 있도록 지원한다.
from multiprocessing import Process, Manager
def worker(shared_list):
shared_list.append("Task Done")
if __name__ == "__main__":
with Manager() as manager:
shared_list = manager.list() # 공유 리스트 생성
processes = [Process(target=worker, args=(shared_list,)) for _ in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"Shared List: {list(shared_list)}")
Shared List: ['Task Done', 'Task Done', 'Task Done']
Manager
는 동기화된 데이터 구조를 제공하여, 여러 프로세스 간 안전하게 데이터 공유 가능기법 | 특징 | 적합한 상황 |
---|---|---|
Queue | FIFO 방식, 데이터 안전성 보장 | 다수의 프로세스가 데이터를 교환할 때 |
Pipe | 양방향 통신, 간단한 데이터 전송 | 두 프로세스 간 데이터 교환이 필요한 경우 |
Manager | 동기화된 데이터 구조 제공 | 복잡한 데이터 구조를 여러 프로세스가 공유할 때 |
멀티프로세싱의 기본은 단일 작업을 병렬로 처리하여 성능을 극대화하는 것이다. 예를 들어, 수학적 계산 작업을 여러 프로세스에 분배하면 작업 시간이 크게 단축된다.
from multiprocessing import Process
def square(n):
print(f"Processing {n}: Result = {n * n}")
if __name__ == "__main__":
numbers = [1, 2, 3, 4]
processes = []
for number in numbers:
process = Process(target=square, args=(number,))
processes.append(process)
process.start()
for process in processes:
process.join()
Processing 1: Result = 1
Processing 2: Result = 4
Processing 3: Result = 9
Processing 4: Result = 16
워커 프로세스는 반복적인 작업을 병렬로 처리할 때 유용하다. 워커는 주어진 작업을 병렬로 실행하며, 작업의 효율성을 극대화한다.
from multiprocessing import Process, Queue
def worker(task_queue):
while not task_queue.empty():
task = task_queue.get()
print(f"Processed: {task.upper()}")
if __name__ == "__main__":
tasks = ["apple", "banana", "cherry", "date"]
task_queue = Queue()
for task in tasks:
task_queue.put(task)
processes = [Process(target=worker, args=(task_queue,)) for _ in range(2)]
for process in processes:
process.start()
for process in processes:
process.join()
Processed: APPLE
Processed: BANANA
Processed: CHERRY
Processed: DATE
멀티프로세싱 환경에서 데이터를 공유하려면 Shared Memory 또는 multiprocessing.Manager
를 사용해야 한다.
from multiprocessing import Process, Manager
def worker(shared_list):
for i in range(5):
shared_list.append(i)
print(f"Appended {i}")
if __name__ == "__main__":
with Manager() as manager:
shared_list = manager.list()
processes = [Process(target=worker, args=(shared_list,)) for _ in range(3)]
for process in processes:
process.start()
for process in processes:
process.join()
print(f"Shared List: {list(shared_list)}")
Appended 0
Appended 1
Appended 2
Appended 3
Appended 4
...
Shared List: [0, 1, 2, ..., 4, 0, 1, 2, ..., 4]
Manager.list()
를 사용하여 프로세스 간 데이터 공유를 안전하게 구현Map-Reduce 패턴은 병렬 처리를 통해 대량의 데이터를 분산 처리하고, 결과를 집계하는 데 사용된다.
from multiprocessing import Pool
def square(n):
return n * n
if __name__ == "__main__":
numbers = range(10)
with Pool(processes=4) as pool:
results = pool.map(square, numbers)
print(f"Squared Results: {results}")
print(f"Sum of Squares: {sum(results)}")
Squared Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Sum of Squares: 285
Pool.map
메서드를 사용하여 입력 데이터를 병렬로 처리프로세스 풀은 여러 작업을 병렬로 실행하는 데 유용한 구조로, 작업 수에 따라 프로세스를 동적으로 관리한다.
from multiprocessing import Pool
def word_count(text):
return len(text.split())
if __name__ == "__main__":
texts = ["Hello World", "Python is great", "Multiprocessing is powerful"]
with Pool(3) as pool:
results = pool.map(word_count, texts)
print(f"Word Counts: {results}")
Word Counts: [2, 3, 3]
Pool
은 최대 3개의 프로세스를 생성하여 작업을 병렬로 처리from multiprocessing import Pool
def task(n):
return n * 2
if __name__ == "__main__":
with Pool(3) as pool:
results = [pool.apply_async(task, args=(i,)) for i in range(5)]
outputs = [result.get() for result in results]
print(f"Results: {outputs}")
Results: [0, 2, 4, 6, 8]
apply_async
메서드를 사용하여 작업을 비동기로 처리.get()
메서드를 호출해 결과를 가져온다.멀티스레딩과 멀티프로세싱의 선택은 작업의 특성에 따라 달라진다. 작업은 크게 I/O 바운드(I/O-Bound) 와 CPU 바운드(CPU-Bound) 로 나뉜다.
I/O 바운드 작업은 입출력 연산 (파일 읽기/쓰기, 네트워크 요청 등)에 의해 처리 속도가 제한되는 작업이다.
특징
예시
import threading
import time
def io_task(task_id):
print(f"Task {task_id} started.")
time.sleep(2) # I/O 대기 시간 시뮬레이션
print(f"Task {task_id} finished.")
threads = [threading.Thread(target=io_task, args=(i,)) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
CPU 바운드 작업은 계산 연산 (복잡한 수학 계산, 데이터 처리 등)이 주를 이루는 작업이다.
특징
예시
from multiprocessing import Pool
import math
def compute_factorial(n):
return math.factorial(n)
if __name__ == "__main__":
numbers = [100000, 200000, 300000, 400000]
with Pool(4) as pool:
results = pool.map(compute_factorial, numbers)
print("Factorials computed.")
I/O 바운드와 CPU 바운드 작업에서 멀티스레딩과 멀티프로세싱의 성능을 비교해 보자.
import threading
import multiprocessing
import time
def io_task(task_id):
time.sleep(2)
def cpu_task(n):
total = 0
for i in range(n):
total += i
def benchmark(task, worker_count, args, use_multiprocessing):
start_time = time.time()
workers = []
if use_multiprocessing:
for _ in range(worker_count):
p = multiprocessing.Process(target=task, args=args)
workers.append(p)
p.start()
else:
for _ in range(worker_count):
t = threading.Thread(target=task, args=args)
workers.append(t)
t.start()
for worker in workers:
worker.join()
end_time = time.time()
print(f"Time taken: {end_time - start_time:.2f} seconds")
if __name__ == "__main__":
print("I/O Bound - Multithreading")
benchmark(io_task, 5, (0,), False)
print("I/O Bound - Multiprocessing")
benchmark(io_task, 5, (0,), True)
print("CPU Bound - Multithreading")
benchmark(cpu_task, 5, (10**6,), False)
print("CPU Bound - Multiprocessing")
benchmark(cpu_task, 5, (10**6,), True)
I/O Bound - Multithreading
Time taken: 2.00 seconds
I/O Bound - Multiprocessing
Time taken: 2.10 seconds
CPU Bound - Multithreading
Time taken: 8.50 seconds
CPU Bound - Multiprocessing
Time taken: 2.20 seconds
작업 유형 | 적합한 방법 | 예시 |
---|---|---|
I/O 바운드 | 멀티스레딩 | 웹 크롤링, 비동기 파일 처리 |
CPU 바운드 | 멀티프로세싱 | 머신러닝, 대규모 데이터 분석 |
독립적 작업 | 멀티프로세싱 | 비디오 렌더링, 병렬 데이터 처리 |
메모리 공유 필요 | 멀티스레딩 | 이벤트 처리, 상태 동기화 |
비동기 프로그래밍(Asynchronous Programming) 과 멀티스레딩(Multithreading) 은 병렬 처리와 유사한 목표(작업 시간 단축, 효율성 향상)를 가지고 있지만, 동작 원리와 적용 방식은 매우 다르다.
멀티스레딩은 여러 스레드를 생성하여 동시에 작업을 수행한다. 각 스레드는 독립적으로 실행되며, CPU와 메모리 리소스를 공유한다.
비동기 프로그래밍은 작업을 스케줄링하여 한 작업이 대기 상태에 있을 때 다른 작업을 실행한다. 단일 스레드에서 실행되며, 문맥 전환 없이 효율적으로 작업을 처리한다.
특징 | 멀티스레딩 | 비동기 프로그래밍 |
---|---|---|
동작 방식 | 여러 스레드에서 병렬로 작업을 실행 | 단일 스레드에서 이벤트 루프를 통해 작업 처리 |
동시 실행 여부 | 진정한 동시 실행 가능 | 동시 실행되지 않음 (작업을 협력적으로 전환) |
문맥 전환 | 스레드 간 문맥 전환으로 오버헤드 발생 | 문맥 전환 없이 이벤트 루프에서 처리 |
적합한 작업 | CPU 바운드 및 I/O 바운드 작업 | I/O 바운드 작업 (예: 네트워크 요청) |
복잡성 | 상태 동기화와 잠금 문제로 구현이 복잡 | 상대적으로 단순한 코드 구현 |
Python에서의 GIL 영향 | GIL로 인해 멀티스레딩 성능 제한 | GIL 영향을 받지 않음 (I/O 바운드 작업에서 효율적) |
asyncio
와의 차이점asyncio
는 Python에서 비동기 프로그래밍을 구현하기 위한 표준 라이브러리이다. 이는 멀티스레딩이나 멀티프로세싱과는 다른 방식으로 동시성을 처리한다.
단일 스레드, 단일 프로세스
asyncio
는 단일 스레드, 단일 프로세스에서 이벤트 루프를 사용해 동시 작업을 처리한다.비동기 함수
async
및 await
키워드를 사용하여 비동기 작업을 정의하고 실행한다.콜백 기반 처리
asyncio
와 멀티스레딩의 차이특징 | asyncio | 멀티스레딩 |
---|---|---|
동작 구조 | 이벤트 루프를 사용하여 작업 스케줄링 | 여러 스레드가 병렬로 작업 수행 |
사용 자원 | 단일 스레드, 메모리 사용량 적음 | 여러 스레드, 메모리 사용량 많음 |
복잡성 | 상대적으로 간단, async /await 사용 | 잠금(Lock) 및 동기화 코드 필요 |
적합한 작업 | I/O 대기 작업 (예: HTTP 요청) | 병렬로 실행할 CPU 집약 작업 |
성능 | GIL 영향을 받지 않음 (I/O 바운드 작업) | GIL로 인해 성능 제한 |
asyncio
를 사용한 비동기 작업import asyncio
async def download_file(file_id):
print(f"Downloading file {file_id}...")
await asyncio.sleep(2) # I/O 대기 시뮬레이션
print(f"File {file_id} downloaded.")
async def main():
tasks = [download_file(i) for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
import threading
import time
def download_file(file_id):
print(f"Downloading file {file_id}...")
time.sleep(2) # I/O 대기 시뮬레이션
print(f"File {file_id} downloaded.")
threads = [threading.Thread(target=download_file, args=(i,)) for i in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
asyncio
는 코드가 직관적이고 간결하며, 단일 스레드에서 실행되므로 메모리 사용량이 적다.작업의 특성에 따라 멀티스레딩, 멀티프로세싱, 비동기 프로그래밍 중 적합한 기술을 선택해야 한다.
asyncio
또는 멀티스레딩asyncio
asyncio
시나리오 | 적합한 기술 | 이유 |
---|---|---|
웹 크롤링 | asyncio | 많은 네트워크 요청을 효율적으로 처리 가능 |
이미지 처리 | 멀티프로세싱 | CPU 집약 작업으로, GIL을 우회하여 병렬 처리 |
파일 다운로드 | 멀티스레딩 | I/O 대기 시간이 많으며, 상태 동기화가 필요 없. |
대규모 데이터 분석 | 멀티프로세싱 | 다중 코어를 활용하여 계산 작업 병렬화 |
실시간 이벤트 처리 | asyncio | 이벤트 루프를 사용하여 응답성과 효율성 극대화 |
Deadlock은 두 개 이상의 스레드 또는 프로세스가 서로 자원을 점유하고 있으며, 서로의 자원이 해제되기를 기다리며 무한 대기에 빠지는 상태를 의미한다.
Race Condition은 두 개 이상의 스레드가 동일한 자원에 동시에 접근하려고 하여, 결과가 실행 순서에 따라 달라지는 문제를 의미한다.
import threading
counter = 0
def increment():
global counter
for _ in range(1000000):
counter += 1
threads = [threading.Thread(target=increment) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Final Counter: {counter}")
Final Counter: 1689537 (예상값 2000000과 다름)
threading.Lock
을 사용하여 공유 자원을 보호import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
for _ in range(1000000):
with lock: # Lock으로 자원 보호
counter += 1
threads = [threading.Thread(target=increment) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Final Counter: {counter}") # 출력: 2000000
멀티프로세싱은 각 프로세스가 독립적인 메모리 공간을 사용하지만, 프로세스 간 데이터를 공유할 경우 적절한 검증과 보호가 필요하다.
from multiprocessing import Process, Queue
def worker(queue):
while not queue.empty():
data = queue.get()
print(f"Processing: {data}")
if __name__ == "__main__":
q = Queue()
for i in range(5):
q.put(i)
processes = [Process(target=worker, args=(q,)) for _ in range(2)]
for p in processes:
p.start()
for p in processes:
p.join()
Lock과 RLock 사용
상태 동기화
Condition
또는 Semaphore
를 사용하여 작업 흐름을 제어Deadlock 방지
데이터 검증
독립된 자원 관리
관리자 객체 사용
multiprocessing.Manager
를 활용해 공유 데이터 구조를 동기화PyLint
나 ThreadSanitizer
와 같은 정적 분석 도구를 사용해 스레드/프로세스 오류 감지ETL(Extract, Transform, Load) 작업은 대규모 데이터를 추출, 변환, 적재하는 과정에서 멀티스레딩과 멀티프로세싱의 활용도가 매우 높다.
대형 금융 기관은 일일 거래 데이터를 분석하기 위해 ETL 작업을 수행한다. 이러한 작업에서는 대규모 데이터베이스에서 데이터를 추출하고, 여러 포맷으로 저장된 데이터를 정규화한 후, 분석 시스템에 적재한다.
from multiprocessing import Pool
import pandas as pd
def process_csv(file_name):
data = pd.read_csv(file_name)
# 데이터 정규화 작업 수행
data['processed'] = data['value'] * 2
data.to_csv(f"processed_{file_name}", index=False)
if __name__ == "__main__":
files = ["data1.csv", "data2.csv", "data3.csv"]
with Pool(processes=3) as pool:
pool.map(process_csv, files)
웹 크롤링(Web Crawling) 은 대규모 웹 데이터를 수집하는 작업에서 멀티스레딩과 비동기 처리가 필수적이다.
검색 엔진은 웹 페이지의 내용을 크롤링하고 색인화하기 위해 수천 개의 URL을 동시 처리해야 한다. 여기서 멀티스레딩은 네트워크 요청을 병렬로 실행하여 처리 속도를 크게 향상시킨다.
import threading
import requests
def fetch_url(url):
response = requests.get(url)
print(f"Fetched {url} with status {response.status_code}")
if __name__ == "__main__":
urls = ["https://example.com", "https://example.org", "https://example.net"]
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
머신러닝 모델 학습 은 대규모 데이터를 반복적으로 처리하고 복잡한 연산을 수행하기 때문에 CPU 또는 GPU 기반 병렬 처리가 필수적이다.
컴퓨터 비전 프로젝트에서 이미지 데이터셋을 학습하는 경우, 멀티프로세싱을 통해 데이터 로딩과 학습 단계를 병렬화하여 학습 속도를 개선한다.
from multiprocessing import Pool
import cv2
def preprocess_image(file_name):
image = cv2.imread(file_name)
processed = cv2.resize(image, (128, 128))
cv2.imwrite(f"processed_{file_name}", processed)
if __name__ == "__main__":
image_files = ["img1.jpg", "img2.jpg", "img3.jpg"]
with Pool(processes=3) as pool:
pool.map(preprocess_image, image_files)
네트워크 서버와 클라이언트 는 여러 요청을 동시에 처리해야 하기 때문에 멀티스레딩과 멀티프로세싱이 많이 사용된다.
대규모 사용자 기반의 채팅 애플리케이션은 사용자의 메시지 전송과 수신을 실시간으로 처리해야 한다. 이를 위해 멀티스레딩 또는 비동기 I/O가 주로 사용된다.
import socket
import threading
def handle_client(client_socket):
while True:
message = client_socket.recv(1024).decode()
if not message:
break
print(f"Received: {message}")
client_socket.send(f"Echo: {message}".encode())
client_socket.close()
if __name__ == "__main__":
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 9999))
server.listen(5)
print("Server listening on port 9999.")
while True:
client, addr = server.accept()
print(f"Accepted connection from {addr}")
client_handler = threading.Thread(target=handle_client, args=(client,))
client_handler.start()
작업 스케줄링과 로드 밸런싱은 멀티스레딩 및 멀티프로세싱의 효율성을 극대화하기 위한 핵심 전략이다. 작업 스케줄링은 작업을 프로세스나 스레드에 적절히 분배하여 병목현상을 방지하며, 로드 밸런싱은 시스템 리소스(CPU, 메모리)의 사용을 균등하게 유지한다.
Static Scheduling: 작업을 시작하기 전에 고정된 방식으로 작업을 스레드 또는 프로세스에 할당
Dynamic Scheduling: 작업 실행 중에 작업을 동적으로 분배
from multiprocessing import Pool
import random
def process_task(task_id):
work = random.randint(1, 5)
print(f"Task {task_id} processing for {work} seconds")
time.sleep(work)
return f"Task {task_id} completed"
if __name__ == "__main__":
tasks = range(10)
with Pool(processes=4) as pool:
results = pool.map(process_task, tasks)
print(results)
하이브리드 접근은 멀티스레딩과 멀티프로세싱의 장점을 결합하여, 복잡한 애플리케이션의 성능을 극대화한다.
from multiprocessing import Process, Queue
import threading
import requests
def fetch_data(url, results):
response = requests.get(url)
results.append(response.text[:100]) # 첫 100자만 저장
def process_urls(queue):
while not queue.empty():
url = queue.get()
results = []
threads = [threading.Thread(target=fetch_data, args=(url, results)) for _ in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Results for {url}: {results}")
if __name__ == "__main__":
url_queue = Queue()
urls = ["https://example.com"] * 5
for url in urls:
url_queue.put(url)
processes = [Process(target=process_urls, args=(url_queue,)) for _ in range(2)]
for process in processes:
process.start()
for process in processes:
process.join()
Python은 GIL로 인해 고성능 병렬 처리가 제한적이지만, C 확장을 통해 병렬 처리 성능을 대폭 향상시킬 수 있다.
Cython
ctypes
C++ 확장(Pybind11)
# Cython 파일: parallel_sum.pyx
def parallel_sum(n):
cdef int i, total = 0
for i in range(n):
total += i
return total
from multiprocessing import Pool
from parallel_sum import parallel_sum
if __name__ == "__main__":
numbers = [10**6, 10**7, 10**8]
with Pool(3) as pool:
results = pool.map(parallel_sum, numbers)
print(results)
Python GIL(Global Interpreter Lock):
복잡성:
디버깅 난이도:
높은 메모리 사용량
프로세스 간 통신(IPC) 오버헤드
- 프로세스 간 데이터를 공유하거나 전달할 때 Queue나 Pipe를 사용하는데, 이 과정에서 성능 저하가 발생한다.
프로세스 생성 비용
Python의 GIL 문제를 해결하려는 다양한 대안들이 존재하며, 이들 중 일부는 GIL의 제약을 완화하거나 완전히 우회한다.
PyPy는 JIT(Just-In-Time) 컴파일러를 사용하여 Python 코드의 실행 속도를 크게 향상시키며, GIL 문제를 완화한다.
특징
한계
Jython은 Python 코드를 Java Virtual Machine(JVM)에서 실행할 수 있도록 변환한다.
특징
한계
Cython은 Python 코드를 C로 변환하여 컴파일함으로써 GIL 문제를 우회할 수 있다.
특징
한계
Python의 미래 버전(PEP 554)에서는 서브 인터프리터를 통해 각 인터프리터가 GIL을 독립적으로 관리하도록 계획 중이다.
특징
병렬화 프레임워크와 라이브러리
GPU 기반 병렬 처리
Serverless Computing
GIL 제거 노력:
하드웨어 발전과의 융합:
분산 시스템의 대중화:
장장 이틀에 걸쳐 작성한 주제가 드디어 끝이 났다. 일단 오늘까지 작성한 포스팅을 기점으로 2주간 재정비 및 파이널 프로젝트 준비로 데일리 CS를 잠시 쉬어갈 생각인데, 마무리 포스팅으로서 나름 무게감 있는 주제가 된 것 같아 기쁘다.
다만 주제가 좀 넓었던 만큼 양도 따라 늘어, 정말 이 주제가 깊숙히 체화되었는지는 잘 모르겠다. 가능하다면 추후 토이프로젝트를 만들어서라도 활용해보며 감을 잡아보고 싶다.