Skip to content

Python 멀티프로세싱: 프로세스, 풀, 큐

개요

Python 멀티프로세싱에 관한 이 포스팅에서는 Python의 멀티프로세싱 모듈을 사용하여 병렬 실행을 위한 프로세스를 생성하고 사용하는 방법을 설명한다. 또한 병렬 실행을 위한 풀과 맵을 사용하는 방법, 프로세스 간 통신을 위한 큐 사용 방법, 예외 처리와 프로세스 종료 방법도 설명한다.

Python은 데이터 분석, 머신 러닝, 웹 개발 등 많은 어플리케이션에 널리 사용되는 프로그래밍 언어이다. 하지만 Python은 동시 실행에 여러 CPU 코어를 사용하는 데 한계가 있다. 이는 여러 스레드가 Python 코드를 동시에 실행하지 못하게 하는 GIL(Global Interpreter Lock) 때문이다. 이는 멀티 코어 CPU를 가지고 있어도 Python 스레드에서는 CPU 능력을 충분히 활용할 수 없다는 것을 의미한다.

이러한 한계를 극복하기 위한 한 방법으로 Python 코드를 독립적으로 동시에 실행할 수 있도록 프로세스를 생성하고 실행할 수 있는 멀티프로세싱을 사용하는 것이다. 프로세스는 자체 메모리 공간과 리소스를 가진 프로그램의 인스턴스이다. 멀티프로세싱을 사용하면 여러 CPU 코어를 활용할 수 있고 계산 집약적인 작업의 속도를 높일 수 있다.

이 포스팅에서는 프로세스를 생성하고 관리하기 위한 높은 수준의 인터페이스를 제공하는 Python에서 멀티프로세싱 모듈을 사용하는 방법을 보일 것이다. 또한 프로세스 통신과 동기화, 풀과 맵, 큐, 예외 처리, 프로세스 종료 등 멀티프로세싱에 유용한 몇개념과 기술을 설명할 것이다.

이 글을 학습하면 다음 작업을 수행할 수 있다.

  • Process 클래스와 startJoin 메서드를 사용하여 프로세스를 만들고 실행한다.
  • 공유 메모리와 잠금 장치를 사용하여 프로세스 통신과 동기화를 수행한다.
  • 여러 입력에 걸쳐 함수를 병렬로 실행하기 위해 풀과 맵을 사용한다.
  • FIFO 원칙을 적용하여 프로세스 간 통신을 위한 큐를 사용한다.
  • terminateclose 메서드를 사용하여 예외를 처리하고 프로세스를 종료한다.

이 포스팅을 이해하려면 다음에 대한 약간의 지식이 필요하다.

  • Python과 데이터 분석에 대한 기본적인 이해
  • 시스템에 설치된 Python 3.x 인터프리터
  • Python 표준 라이브러리의 일부인 multiprocessing 모듈
  • 원하는 코드 편집기 또는 IDE(통합 개발 환경)

Pyton 멀티프로세싱을 공부할 준비가 되었나요? 시작합시다!

멀티프로세싱이란?

이 절을 통하여 멀티프로세싱이 무엇인지, 왜 Python 프로그래밍에 유용한지 이해하게 될 것이다. 프로세스, CPU, 코어, 스레드, GIL, 동시성 등 멀티프로세싱과 관련된 기본 개념과 용어도 설명할 것이다.

멀티프로세싱은 코드를 독립적으로 동시에 실행할 수 있는 프로세스를 여럿 생성하고 실행할 수 있는 프로그래밍 기법이다. 프로세스는 자체 메모리 공간과 리소스를 갖는 프로그램의 인스턴스이다. 각 프로세스는 하나 이상의 스레드를 가질 수 있으며, 이는 프로세스의 동일한 메모리 공간과 리소스를 공유하는 실행의 하위 단위이다. 스레드는 운영 체제가 스케줄링할 수 있는 최소 실행 단위이다.

CPU는 프로그램의 명령어를 실행하는 하드웨어 구성요소이다. CPU는 명령어를 동시에 실행할 수 있는 독립적인 단위인 하나 이상의 코어를 가질 수 있다. 코어는 하나 이상의 스레드를 가질 수 있으며, 이들은 하나의 코어가 여러 명령어를 동시에 실행할 수 있는 하드웨어 기능이다.

Python은 인터프리터 언어로 소스 코드를 바이트코드로 변환한 후 Python 인터프리터를 이용해 실행한다. Python 인터프리터는 단일 프로세스와 단일 스레드에서 실행되는 프로그램이다. 즉 기본적으로 Python은 CPU 코어가 몇 개 있는지에 관계없이 한 번에 하나의 CPU 코어만 사용할 수 있다는 것을 뜻한다.

이 한계는 여러 스레드가 동시에 Python 코드를 실행 못하도록 하는 메커니즘인 GIL(Global Interpreter Lock)에 기인한다. GIL은 주어진 시간에 하나의 스레드만 Python 인터프리터와 Python 객체에 접근할 수 있도록 보장한다. 이를 통해 메모리 관리가 단순해지고 데이터 손상을 피할 수 있지만 Python 프로그램의 성능과 확장성을 떨어뜨린다.

이러한 한계를 극복하기 위한 한 방법은 Python 코드를 독립적으로 동시에 실행할 수 있는 여러 프로세스를 생성하고 실행할 수 있는 멀티프로세싱을 이용하는 것이다. 멀티프로세싱을 이용하면 여러 CPU 코어를 활용할 수 있고 연산 집약적 작업의 속도를 높일 수 있다. 예를 들어, 4개의 독립적인 하위 작업으로 나눌 수 있는 작업이 있다면 4개의 프로세스를 생성하고 각 하위 작업을 하나의 프로세스에 할당할 수 있다. 그러면 4개의 CPU 코어에서 4개의 프로세스를 동시에 실행할 수 있어 실행 시간을 4배로 줄일 수 있다.

멀티프로세싱은 특히 데이터 분석, 머신 러닝, 웹 스크래핑 및 많은 양의 데이터를 처리하거나 복잡한 계산을 수행하는 어플리케이션에 유용하다. 또한 멀티프로세싱은 사용자의 입력과 출력을 처리하는 역할을 하는 프로그램의 메인 스레드가 차단되는 것을 피할 수 있도록 도와줄 수 있다. 무거운 작업을 별도의 프로세스에 위임함으로써 메인 스레드의 응답성을 유지하고 사용자 경험을 향상시킬 수 있다.

그러나, 머리프로세싱은 다음과 같은 단점과 과제도 가지고 있다.

  • 프로세스를 생성하고 실행하는 것은 스레드를 생성하고 실행하는 것보다 더 많은 리소스와 오버헤드를 소비한다.
  • 프로세스 간의 통신과 동기화는 스레드 간의 통신보다 더 어렵고 비용이 많이 든다.
  • 프로세스 간에 데이터와 객체를 공유하는 것은 더 복잡하고 직렬화와 병렬화를 필요로 한다.
  • 예외 처리와 종료 프로세스는 스레드 처리보다 더 까다롭고 위험하다.

따라서 프로그램에 필요하고 유용한 경우에만 멀티프로세싱을 사용해야 한다. 다음과 같은 멀티프로세싱에 대한 모범 사례와 기법도 알고 있어야 한다.

  • CPU 코어 수와 작업의 성격에 따라 적절한 프로세스 수를 선택한다.
  • 프로세스 생성과 관리를 위한 높은 수준의 인터페이스를 제공하는 Python multiprocessing 모듈을 사용한다.
  • 공유 메모리, 잠금, 풀, 맵 및 큐 같은 프로세스 통신과 동기화 도구를 사용한다.
  • 종료와 닫기 같은 예외 처리와 프로세스 종료 메서드를 사용한다.

다음 절에서는 이러한 도구와 방법을 사용하는 방법에 대해 자세히 설명할 것이다. Python 멀티프로세싱을 연습하고 마스터하는 데 도움이 될 수 있는 예와 연습 문제도 줄 것이다.

프로세스 생성과 실행 방법

이 절에서는 Python에서 multiprocessing 모듈을 사용하여 프로세스를 생성하고 실행하는 방법을 설명한다. 또한 start와 join 메서드을 사용하여 프로세스의 실행을 제어하는 방법도 보인다.

multiprocerssing 모듈은 프로세스를 생성하고 관리하기 위한 높은 수준의 인터페이스를 제공한다. 사용할 주요 클래스는 Process 클래스로 주어진 인수는 실행할 대상 함수로 단일 프로세스를 나타낸다. 프로세스를 생성하려면 대상 함수와 인수를 프로세스 생성자에게 전달해야 한다. 예를 들어 어떤 숫자를 인수로 가져와 그 제곱을 출력하는 square라는 함수가 있다고 가정하자. 이 함수를 실행하는 프로세스를 다음과 같이 생성할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Define the target function
def square(number):
    print(number ** 2)

# Create a process that executes the square function with 10 as an argument
p = multiprocessing.Process(target=square, args=(10,))

args 매개변수는 대상 함수에 대한 인수를 포함하는 튜플임을 주목하자. 인수가 하나뿐인 경우에는 그 뒤에 쉼표를 추가하여 튜플로 만들어야 한다. 인수가 없으면 args 매개변수를 생략할 수 있다.

일단 프로세스를 만든 다음, start 메서드를 호출하여 프로세스를 시작할 수 있다. 이렇게 하면 대상 함수를 주 프로세스와 병렬로 실행할 새로운 프로세스가 시작된다. 예를 들어 다음과 같이 프로세스 p를 시작할 수 있다.

# Start the process p
p.start()

이는 프로세스 p10을 인수로 하는 square 함수를 실행하기 때문에, 출력으로 100을 인쇄할 것이다. 다만, 출력의 순서는 운영체제에 의한 프로세스의 스케줄링에 따라 달라질 수 있다. 예를 들어, 프로세스 p를 시작한 후 main 프로세스에서 메시지를 인쇄하면 프로세스 p의 출력 전 또는 후에 메시지를 볼 수 있다. 예를 들면, 다음과 같다.

# Start the process p
p.start()
# Print a message in the main process
print("Main process")

다음 중 하나를 인쇄할 것이다.

Main process
100

또는

100
Main process

진행하기 전에 main 프로세스가 프로세스 p가 끝날 때까지 기다리도록 하려면 join 메서드를 사용하면 된다. 이렇게 하면 프로세스가 종료될 때까지 main 프로세스가 블럭된다. 예를 들어,

# Start the process p
p.start()
# Wait for the process p to finish
p.join()
# Print a message in the main process
print("Main process")

항상 다음과 같이 출력된다.

100
Main process

join 메서드는 프로세스들의 실행을 동기화하고 자식 프로세스들 전에 main 프로세스가 종료되지 않도록 하는 데 유용하다. main 프로세스가 프로세스 p가 종료될 때까지 기다릴 최대 초수를 지정하는 join 메서드에 타임아웃 인수를 전달할 수도 있다. 타임아웃이 만료되면, 프로세스 p가 종료될 때까지 기다리지 않고 main 프로세스가 재개될 것이다.

여러 개의 프로세스 클래스 인스턴스를 생성하고 이들 각각에 대해 start 메서드와 join 메서드를 호출하여 여러 프로세스를 생성하고 실행할 수 있다. 예를 들어 다른 인수를 사용하여 square 함수를 실행하는 프로세스 4개를 생성하려고 한다고 가정해 보겠다. 다음과 같이 할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Define the target function
def square(number):
    print(number ** 2)

# Create four processes that execute the square function with different arguments
p1 = multiprocessing.Process(target=square, args=(1,))
p2 = multiprocessing.Process(target=square, args=(2,))
p3 = multiprocessing.Process(target=square, args=(3,))
p4 = multiprocessing.Process(target=square, args=(4,))
# Start the four processes
p1.start()
p2.start()
p3.start()
p4.start()
# Wait for the four processes to finish
p1.join()
p2.join()
p3.join()
p4.join()
# Print a message in the main process
print("Main process")

그러면 1, 2, 3, 4의 제곱이 출력되고 이어서 Main process라는 메시지가 출력된다. 다만, 출력의 순서는 운영체제에 의한 프로세스의 스케줄링에 따라 달라질 수 있습다. 예를 들어, 다음과 같이 출력될 수 있다.

1
4
9
16
Main process

또는

4
9
1
16
Main process

또는 다른 순서도 가능하다. 출력이 특정한 순서에 있는지 확인하려면 프로세스 통신과 동기화 도구를 사용해야 하며, 다음 절에서 다룰 것이다.

이 절에서는 Python에서 multiprocessing 모듈을 사용하여 프로세스를 만들고 실행하는 방법을 보였다. start와 join 메서드를 사용하여 프로세스의 실행을 제어하는 방법도 보였다. 다음 절에서는 공유 메모리와 잠금 같은 프로세스 통신과 동기화 도구를 사용하는 방법을 설명할 것이다.

프로세스 통신과 동기화 방법

이 절에서는 공유 메모리와 잠금 같은 프로세스 통신과 동기화 도구를 사용하는 방법을 설명한다. 또한 Value와 Array 클래스를 사용하여 프로세스 간에 데이터를 생성하고 공유하는 방법과 Lock 클래스를 사용하여 데이터 손상을 방지하고 데이터의 일관성을 보장하는 방법도 설명한다.

프로세스 통신과 동기화는 여러 프로세스 간의 실행과 데이터 교환을 조정할 수 있기 때문에 멀티프로세싱의 중요한 측면이다. 프로세스 통신은 프로세스 간의 데이터와 메시지의 전송을 의미하며, 프로세스 동기화는 프로세스의 실행 순서와 타이밍을 제어하는 것을 의미한다.

프로세스 간에 통신하고 동기화하는 한 가지 방법은 여러 프로세스에 의해 액세스되고 수정될 수 있는 메모리 영역인 공유 메모리를 사용하는 것이다. 공유 메모리를 사용하면 프로세스 간에 데이터와 객체를 직렬화와 병렬화 없이 공유할 수 있지만 비용이 많이 들고 복잡할 수 있다. 그러나 공유 메모리는 여러 프로세스가 동시에 동일한 데이터를 읽거나 쓰려고 시도할 수 있으므로 데이터 손상과 불일치의 위험도 초래한다. 이를 방지하기 위해서는 한 번에 하나의 프로세스만 공유 자원을 액세스할 수 있도록 보장하는 메커니즘인 잠금 장치를 사용해야 한다.

Python의 multoprocessing 모듈은 공유 메모리와 잠금을 생성하고 사용하는 클래스와 메서드를 제공한다. 사용할 주요 클래스는 각각 공유 값과 배열을 나타내는 ValueArray 클래스와 프로세스에 의해 획득하고 해제할 수 있는 lock 객체를 나타내는 Lock 클래스이다.

Value 클래스를 사용하면 특정 유형의 공유 값을 생성하고 주어진 값으로 초기화할 수 있다. 여러 프로세스가 Value 객체를 액세스하고 수정할 수 있지만 한 번에 한 프로세스만 액세스할 수 있도록 하려면 lock을 사용해야 한다. 예를 들어 공유 정수 값을 생성하고 0으로 초기화하려고 하면 다음과 같이 코딩할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a shared integer value and initialize it with 0
# The first argument is the type code, which can be 'i' for integer, 'f' for float, etc.
# The second argument is the initial value
# The third argument is an optional lock object, which is True by default
shared_value = multiprocessing.Value('i', 0)

Array 클래스를 사용하면 특정 타입과 크기의 공유 array를 생성하고 주어진 시퀀스로 초기화할 수 있다. 여러 프로세스는 Array 객체를 액세스하고 수정할 수 있지만 한 번에 한 프로세스만 액세스할 수 있도록 하려면 lock 장치를 사용해야 한다. 예를 들어 4개의 정수로 구성된 공유 Array 객체를 생성하고 [1, 2, 3, 4]로 초기화하려고 하면 다음과 같이 프로그램할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a shared array of four integers and initialize it with [1, 2, 3, 4]
# The first argument is the type code, which can be 'i' for integer, 'f' for float, etc.
# The second argument is the initial sequence
# The third argument is an optional lock object, which is True by default
shared_array = multiprocessing.Array('i', [1, 2, 3, 4])

Lock 클래스를 사용하면 프로세스가 획득하고 해제할 수 있는 Lock 객체를 만들 수 있다. lock 객체는 lockedunlocked 상태의 두 가지 상태가 있다. 프로세스가 Lock을 획득하면 상태가 locked로 변경되고 다른 프로세스가 lock을 획득하지 못하도록 한다. 프로세스가 lock을 해제하면 상태가 unlocked로 변경되고 다른 프로세스가 lock을 획득할 수 있도록 한다. 프로세스들의 lock에 대한 동시 액세스로부터 공유 자원을 보호하는 데 사용될 수 있다. 예를 들어 lock 객체를 생성하고 이를 사용하여 이전에 생성한 공유 값과 array를 보호하고자 한다면 다음과 같이 코딩할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a shared integer value and initialize it with 0
shared_value = multiprocessing.Value('i', 0)
# Create a shared array of four integers and initialize it with [1, 2, 3, 4]
shared_array = multiprocessing.Array('i', [1, 2, 3, 4])
# Create a lock object
lock = multiprocessing.Lock()
# Define a target function that increments the shared value and array by 1
def increment():
    # Acquire the lock
    lock.acquire()
    # Increment the shared value by 1
    shared_value.value += 1
    # Increment each element of the shared array by 1
    for i in range(len(shared_array)):
        shared_array[i] += 1
    # Release the lock
    lock.release()

lock을 획득하고 해제함으로써 공유 값과 배열을 한 번에 하나의 프로세스만 접근하고 수정할 수 있도록 하였다. 이를 통해 데이터의 손상과 불일치를 방지하고 최종 결과가 정확한지 확인할 수 있다. 예를 들어 증분 함수를 실행하는 프로세스 4개를 생성하고 실행한다고 가정하자. 다음과 같이 할 수 있다.

# Create four processes that execute the increment function
p1 = multiprocessing.Process(target=increment)
p2 = multiprocessing.Process(target=increment)
p3 = multiprocessing.Process(target=increment)
p4 = multiprocessing.Process(target=increment)
# Start the four processes
p1.start()
p2.start()
p3.start()
p4.start()
# Wait for the four processes to finish
p1.join()
p2.join()
p3.join()
p4.join()
# Print the final value of the shared value and array
print(shared_value.value)
print(shared_array[:])

아래와 같이 출렫한다.

4
[5, 6, 7, 8]

보시다피 공유 값과 배열이 예상대로 4번 1씩 증가했다. 잠금을 해제하면 프로세스가 서로 간섭하고 데이터 손상과 불일치를 유발할 수 있기 때문에 다른 잘못된 결과를 얻을 수 있다.

이 절에서는 공유 메모리와 잠금 같은 프로세스 통신과 동기화 도구를 사용하는 방법을 설명하였다. Value와 Array 클래스를 사용하여 프로세스 간에 데이터를 생성하고 공유하는 방법, Lock 클래스를 사용하여 데이터 손상을 방지하고 데이터의 일관성을 보장하는 방법도 보였다. 다음 절에서는 여러 입력에 걸쳐 함수를 병렬로 실행할 때 풀과 매핑을 사용하는 방법을 설명할 것이다.

병렬 실행을 위한 풀과 맵 사용 방법

이 절에서는 여러 입력에 걸쳐 함수를 병렬로 실행하기 위해 풀(pool)과 맵(map)을 사용하는 방법을 설명한다. 또한 Pool 클래스와 프로세스 풀을 생성하고 관리하는 맵을 사용하는 방법과 을 이들 사이에 작업을 분배하기 위한 map과 apply 메서드 사용법을 설명한다.

풀과 맵은 프로세스의 생성과 관리, 그리고 그들 사이의 작업 분배를 단순화하고 자동화할 수 있기 때문에 멀티프로세싱를 위한 유용한 도구이다. 풀과 맵은 특히 데이터 분석, 머신 러닝, 웹 스크래핑, 그리고 함수를 많은 입력 또는 데이터 포인트에 적용하는 것을 수반하는 다른 응용 프로그램에 유용하다.

Pool 클래스를 사용하면 태스크를 병렬로 실행할 수 있는 프로세스 풀을 만들 수 있다. 프로세스 풀은 여러 태스크를 매번 생성하고 파괴할 필요 없이 여러 태스크에 재사용할 수 있는 프로세스 모음이다. 프로세스 풀은 프로세스 간의 태스크 할당과 스케줄링 뿐만아니라 결과 수집과 반환을 관리할 수도 있다.

프로세스 풀을 만들려면 프로세스 수를 풀 생성자에게 전달해야 한다. 이 인수를 생략하면 풀은 시스템에서 사용할 수 있는 CPU 코어 수를 사용한다. 예를 들어, 4개의 프로세스 풀을 생성하고자 한다고 가정하면 다음과 같이 할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a pool of four processes
pool = multiprocessing.Pool(4)

일단 프로세스 풀을 만든 다음에는 map과 appy 메석드을 사용하여 그들 사이에 작업을 분배할 수 있다. 맵 메소드를 사용하면 리스트나 튜플과 같은 반복 가능한 입력에 함수를 apply하여 그 결과 리스트를 반환할 수 있다. map 메소드는 반복 가능한 청크를 청크로 나누고 각 청크를 풀의 프로세스에 할당한다. map 메소드는 또한 모든 결과가 준비될 때까지 main 프로세스를 블럭한다. 예를 들어, 어떤 숫자를 인수로 받고 그 제곱을 반환하는 square이라는 함수가 있다고 가정하자. map 메소드를 사용하여 다음과 같이 숫자 리스트에 이 함수를 apply 할 수 있다.

# Define the target function
def square(number):
    return number ** 2

# Create a list of numbers
numbers = [1, 2, 3, 4, 5]
# Use the map method to apply the square function to the list of numbers
# The map method will return a list of results
results = pool.map(square, numbers)
# Print the results
print(results)

출력은 아래와 같다.

[1, 4, 9, 16, 25]

보다시피 map 메서드는 숫자 리스트의 각 요소에 제곱 함수를 적용하고 결과 리스트를 반환한다. map 메서드는 수작업으로 생성하고 관리할 필요 없이 풀 내의 4개 프로세스 간에 작업을 분산시킨다.

apply 메소드는 함수를 하나의 입력에 적용하고 하나의 결과를 반환할 수 있도록 한다. apply 메소드는 풀에 있는 프로세스에 작업을 할당하고, 결과가 준비될 때까지 main 프로세스를 블럭한다. 예를 들어, apply 메소드를 사용하여 수에 제곱 함수를 적용하고자 한다고 가정하자. 이것을 다음과 같이 할 수 있다.

# Use the apply method to apply the square function to a single number
# The apply method will return a single result
result = pool.apply(square, (10,))
# Print the result
print(result)

출력은 아래와 같다.

100

보다시피, apply 메서드는 10square 함수를 적용하고 결과를 반환하였다. apply 메서드는 풀에 있는 프로세스에 태스크를 할당하고 결과가 준비될 때까지 main 프로세스를 블럭하였다.

map과 apply 메서드는 여러 입력에 걸쳐 함수를 병렬로 실행하는 데 유용하지만 다음과 같은 제한과 단점이 있다.

  • 모든 결과가 준비될 때까지 main 프로세스를 블럭하므로 프로그램의 응답성과 성능이 저하될 수 있다.
  • 키워드 인수(keyword argument)를 대상 함수에 전달하지 않고 위치 인수(positional argument)만 전달할 수 없다.
  • 대상 함수나 프로세스에서 발생할 수 있는 예외와 오류를 처리할 수 없다.

이러한 제한과 단점을 극복하기 위해 map과 apply 메스드의 비동기 버전인 map_async과 apply_async 메서드를 사용하할 수 있다. 이러한 메서드는 나중에 사용할 수 있는 결과의 프록시인 AsyncResult 객체를 반환한다. AsyncResult 객체에는 다음과 같은 몇 가지 메서드와 속성이 있다.

  • get: 이 메서드는 결과가 준비될 때까지 main 프로세스를 블럭한 다음 반환한다. 이 메서드는 main 프로세스가 결과를 기다릴 최대 시간(초)을 지정하는 타임아웃 인수를 전달할 수도 있다. 타임아웃이 만료되면 main 프로세스는 타임아웃 오류 예외를 발생시킨다.
  • ready: 이 메서드는 결과가 준비되면 True를 반환하고 그렇지 않으면 False를 반환한다.
  • wait: 이 메서드는 결과가 준비되거나 시간 초과가 만료될 때까지 main 프로세스를 블럭시틴다.
  • successful: 이 메서드는 결과가 준비되고 작업이 성공적으로 완료된 경우 True를 반환하고 그렇지 않으면 False를 반환한다. 결과가 준비된 후에만 이 메서드를 호출할 수 있으며 그렇지 않으면 AssertionError 예외가 발생한다.

예를 들어, square 함수를 숫자 리스트에 적용하기 위해 map_async 메서드를 사용한다고 가정하면 다음과 같이 사용할 수 있다.

# Use the map_async method to apply the square function to a list of numbers
# The map_async method will return an AsyncResult object
async_result = pool.map_async(square, numbers)
# Print the status of the async result
print(async_result.ready())
# Wait for the async result to be ready
async_result.wait()
# Print the status of the async result
print(async_result.ready())
# Get the result of the async result
results = async_result.get()
# Print the results
print(results)

출력은 아래와 같다.

False
True
[1, 4, 9, 16, 25]

보다시피 map_async 메서드는 AsyncResult 객체를 반환하며, 이를 통해 상태를 확인하고 작업 결과를 가져올 수 있다. map_async 메서드도 모든 결과가 준비될 때까지 main 프로세스를 블럭하지 않아 프로그램의 응답성과 성능이 향상되었다.

마찬가지로 apply_async 메서드를 사용하여 square 함수를 수 하나에 적용할 수 있으며 다음과 같다.

# Use the apply_async method to apply the square function to a single number
# The apply_async method will return an AsyncResult object
async_result = pool.apply_async(square, (10,))
# Print the status of the async result
print(async_result.ready())
# Wait for the async result to be ready
async_result.wait()
# Print the status of the async result
print(async_result.ready())
# Get the result of the async result
result = async_result.get()
# Print the result
print(result)

출력은 아래와 같다.

False
True
100

보다시피 apply_async 메서드는 AsyncResult 객체를 반환하며, 이를 통해 상태를 확인하고 작업 결과를 가져올 수 있다. apply_async 메서드도 결과가 준비될 때까지 main 프로세스를 차단하지 않아 프로그램의 응답성과 성능이 향상되었다.

map_async 메서드와 apply_async 메서드를 사용하면 키워드 인수를 대상 함수에 전달할 수 있을 뿐만 아니라 결과가 준비되면 실행되는 콜백 함수도 사용할 수 있다. 콜백 함수는 결과를 인수로 수신하여 결과를 인쇄, 저장 또는 처리하는 등의 작업을 수행할 수 있다. 예를 들어 map_async 메서드를 사용하여 square 함수를 숫자 리스트에 적용하고 사용할 지수를 지정하는 키워드 인수를 전달하고자 한다고 가정하자. 결과를 출력하는 콜백 함수도 사용하고자 한다. 다음과 같이 코딩할 수 있다.

from multiprocessing import Pool

# Define the target function
def square(number, exponent=2):
    return number ** exponent

# Define the callback function
def print_result(result):
    print(result)

# List of numbers
numbers = [1, 2, 3, 4, 5]

# Create a pool of processes
pool = Pool()

# Use the map_async method to apply the square function to a list of numbers
# Pass a keyword argument that specifies the exponent to use
# Pass a callback function that prints the result
async_result = pool.map_async(square, numbers, kwds={'exponent': 3}, callback=print_result)

# Close the pool to prevent any more tasks from being submitted
pool.close()

# Wait for all tasks to complete
pool.join()

# Output:
# 1
# 8
# 27
# 64
# 125

프로세스간 통신을 위한 큐 사용 방법

이 절에서는 프로세스 간 통신을 위해 큐를 사용하는 방법을 설명한다. 또한 Queue 클래스를 사용하는 방법과 프로세스 간 데이터를 저장하고 검색할 수 있는 Queue 객체를 생성하고 사용하는 방법도 설명한다.

queue는 FIFO(선입선출) 방식으로 프로세스 간에 데이터와 메시지를 교환할 수 있기 때문에 프로세스 통신과 동기화를 위한 또 다른 도구이다. queue는 데이터를 삽입하는 순서대로 저장하고 검색할 수 있는 데이터 구조를 말한다. queue에는 front과 rear의 두 끝이 있다. 데이터는 rear에 삽입하고 front에서 제거할 수 있다. queue는 FIFO의 원리를 따르는데, 이는 처음 삽입한 데이터가 처음 제거되는 데이터임을 의미한다.

Python의 multiprocessing 모듈은 Queue 클래스를 제공하는데, Queue 클래스는 여러 프로세스가 공유하고 액세스할 수 있는 queue 오브젝트를 나타낸다. Queue 클래스는 다음과 같은 메서드와 속성을 가지고 있어 queue 오브젝트를 생성하고 사용할 수 있다.

  • put: 이 방법을 사용하면 queue의 rear에 데이터를 삽입할 수 있다. 또한 이 방법에 타임아웃 인수를 전달할 수 있는데, 이 인수는 프로세스가 데이터를 삽입하기 위해 기다릴 최대 시간(초)을 지정한다. 타임아웃이 만료되면 프로세스는 Full 예외를 발생시킨다.
  • get: 이 메서드를 사용하면 queue의 front에서 데이터를 제거하고 반환할 수 있다. 이 메서드에 타임아웃 인수를 전달할 수도 있는데, 이 인수는 프로세스가 데이터를 제거할 때까지 기다릴 최대 시간(초)을 지정합니다. 타임아웃이 만료되면 프로세스는 Empty 예외를 발생시킨다.
  • empty: 이 메서드는 queue가 비어 있으면 True, 그렇지 않으면 False를 반환한다.
  • full: 이 메서드는 queue가 가득 차면 True, 그렇지 않으면 False를 반환한다.
  • qsize: 이 메서드는 큐의 대략적인 크기를 반환한다. 이 메서드는 메서드가 실행되는 동안 큐가 변경될 수 있으므로 신뢰할 수 없고 정확하지 않을 수 있다.

queue 객체를 만들려면 Queue 생성자를 호출해야 한다. 또한 Queue 생성자에게 maxsize 인수를 전달할 수 있으며, 이 인수는 queue가 갖을 수 있는 최대 항목 수를 지정한다. 이 인수를 생략하면 queue의 크기는 무한대가 된다. 예를 들어, 최대 크기가 10인 queue 객체를 생성하고자 한다면 다음과 같이 수행할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a queue object with a maximum size of 10
queue = multiprocessing.Queue(10)

queue 객체를 만들고 나면 putget 메소드를 사용하여 queue에 데이터를 삽입하고 제거할 수 있다. 예를 들어, 1, 23을 큐의 rear에 삽입한 후 queue의 front에서 제거하고 인쇄하고자 할 때, 이 작업을 다음과 같이 수행할 수 있다.

# Insert the numbers 1, 2, and 3 at the rear of the queue
queue.put(1)
queue.put(2)
queue.put(3)
# Remove and print the numbers from the front of the queue
print(queue.get())
print(queue.get())
print(queue.get())

출력은 아래와 같다.

1
2
3

보다시피 queue FIFO 원칙을 따랐고, 가장 먼저 삽입된 데이터는 가장 먼저 제거된 데이터이다.

queue 객체를 사용하여 queue에 데이터를 삽입하고 제거함으로써 프로세스 간에 통신하고 동기화할 수 있다. 예를 들어, 1, 23을 queue의 rear에 삽입하는 프로세스와 이를 queue의 front쪽에서 제거하고 인쇄하는 두 프로세스를 만들고자 한다고 가정하자. 이 작업을 다음과 같이 수행할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Create a queue object with a maximum size of 10
queue = multiprocessing.Queue(10)
# Define a target function that inserts the numbers 1, 2, and 3 at the rear of the queue
def insert():
    queue.put(1)
    queue.put(2)
    queue.put(3)
# Define a target function that removes and prints the numbers from the front of the queue
def remove():
    print(queue.get())
    print(queue.get())
    print(queue.get())
# Create two processes that execute the insert and remove functions
p1 = multiprocessing.Process(target=insert)
p2 = multiprocessing.Process(target=remove)
# Start the two processes
p1.start()
p2.start()
# Wait for the two processes to finish
p1.join()
p2.join()

출력은 아래와 같다.

1
2
3

보시다시피, queuequeue에서 데이터를 삽입하고 제거함으로써 두 프로세스가 서로 통신하고 동기화할 수 있도록 한다.

이 절에서는 프로세스 간 통신을 위해 큐를 사용하는 방법을 설명하였다. 또한 Queue 클래스를 사용하는 방법과 프로세스 간 데이터를 저장하고 검색할 수 있는 queue 객체를 생성하고 사용하는 방법도 보였다. 다음 절에서는 yerminate와 close 메서드를 사용하여 예외를 처리하고 프로세스를 종료하는 방법도 설명할 것이다.

예외 처리와 프로세스 종료 방법

이 절에서는 terminate와 close 메서드를 사용하여 예외와 종료 프로세스를 처리하는 방법을 설명한다. 또한 try와 except 블록을 사용하여 대상 함수나 프로세스에서 발생할 수 있는 오류를 파악하고 처리하는 방법도 설명한다.

예외와 종료는 대상 함수나 프로세스에서 발생할 수 있는 예기치 못한 상황과 오류를 처리할 수 있게 해주기 때문에 멀티프로세싱의 중요한 측면이다. 예외는 정상적인 실행 흐름을 방해하고 오류 메시지를 발생시키는 이벤트이다. 종료은 프로세스를 중지하고 종료하는 동작이다.

예외를 처리하고 프로세스를 종료하는 방법 중 하나는 terminate와 close 메서드를 사용하는 것이다. terminate 메서드를 사용하면 프로세스가 태스크를 완료할 때까지 기다리지 않고 프로세스를 즉시 중지하고 종료시킬 수 있다. terminate 메서드를 사용하면 프로세스 또는 프로세스 풀에 새로운 태스크가 제출되는 것을 방지하고 프로세스 또는 풀을 종료하기 전에 기존 태스크가 완료될 때까지 기다릴 수 있다.

terminate 메서드를 사용하려면 프로세스 객체에 대해 호출해야 한다. 이렇게 하면 프로세스의 상태에 관계없이 즉시 중지되고 종료된다. 예를 들어 무한 루프를 실행하는 프로세스를 생성하여 실행한 다음 5초 후에 종료시키려고 하면, 다음과 같이 할 수 있다.

# Import the multiprocessing and time modules
import multiprocessing
import time

# Define a target function that executes an infinite loop
def infinite_loop():
    while True:
        print("Looping")

# Create a process that executes the infinite_loop function
p = multiprocessing.Process(target=infinite_loop)
# Start the process
p.start()
# Wait for 5 seconds
time.sleep(5)
# Terminate the process
p.terminate()
# Print a message in the main process
print("Process terminated")

출력은 아래와 같다.

Looping
Looping
Looping
...
Process terminated

보다시피, terminate 메서드는 작업이 끝날 때까지 기다리지 않고 프로세스 p를 중지하고 즉시 종료시켰다. terminate 메소드는 고착되거나 응답하지 않거나 원하지 않는 프로세스를 종료시키는 데 유용하다. 그러나 terminate 메소드는 다음과 같은 단점과 위험도 가지고 있다.

  • 프로세스가 리소스를 정리하고 해제하는 것을 허용하지 않으며, 이는 메모리 유출과 데드락을 유발할 수 있다.
  • 프로세스가 데이터 손실과 불일치를 유발할 수 있는 결과 또는 오류 메시지를 반환하는 것을 허용하지 않는다.
  • 프로세스가 종료 신호를 무시하거나 차단할 수 있기 때문에 프로세스가 종료되는 것을 보장하지 않는다.

따라서terminate 메서드는 필요하고 불가피한 경우, 결과를 알고 있는 경우에만 사용해야 한다. terminate 메서드 호출 이후에도 join 메서드를 사용하여 프로세스가 종료되었는지 확인하고 그 자원을 확보해야 한다.

close 메서드를 사용하려면 pool 객체에 대해 호출해야 한다. 이렇게 하면 새 작업이 풀에 제출되는 것을 방지하고 기존 작업이 완료될 때까지 기다렸다가 풀을 종료한다. 예를 들어, 4 프로세스로 구성된 풀을 만들고 map 메서드를 사용하여 함수를 숫자 리스트에 적용한 다음 풀을 닫는다고 가정하자. 다음과 같이 이를 코딩할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Define the target function
def square(number):
    return number ** 2

# Create a list of numbers
numbers = [1, 2, 3, 4, 5]
# Create a pool of four processes
pool = multiprocessing.Pool(4)
# Use the map method to apply the square function to the list of numbers
# The map method will return a list of results
results = pool.map(square, numbers)
# Print the results
print(results)
# Close the pool
pool.close()
# Print a message in the main process
print("Pool closed")

출력은 아래와 같다.

[1, 4, 9, 16, 25]
Pool closed

보다시피, close 메소드는 pool에 새로운 태스크가 제출되는 것을 막고, pool을 종료하기 전에 기존 태스크가 완료될 때까지 기다렸다. close 메소드는 프로세스 풀을 정상적으로 종료하고 모든 결과가 반환되도록 하는 데 유용하다. 그러나 close 메소드는 다음과 같은 몇 가지 제한과 단점도 가지고 있다:

  • 새로운 작업을 호출한 후 풀에 제출할 수 없으므로 프로그램의 유연성과 기능이 저하될 수 있다.
  • 풀을 즉시 종료할 수 없으므로 main 프로세스가 오래 걸리거나 기다릴 수 있다.
  • 풀 또는 프로세스에서 발생할 수 있는 예외 또는 오류를 처리할 수 없으며, 이로 인해 프로그램이 충돌하거나 예기치 않게 동작할 수 있다.

따라서 풀에 새로운 작업을 제출할 필요가 없다고 확신할 때, 그리고 잠재적인 오류와 지연을 알고 있을 때만 close 메소드를 사용해야 한다. 또한 close 메소드 이후에 join 메서드를 사용하여 풀이 실제로 종료되었는지 확인하고 풀의 자원을 확보해야 한다.

예외를 처리하고 프로세스를 종료하는 또 다른 방법은 대상 함수나 프로세스에서 발생할 수 있는 오류를 잡고 처리할 수 있는 Python 문인 try와 except 블럭을 사용하는 것이다. try 블록에는 예외를 발생시킬 수 있는 코드가 포함되어 있고 except 블록에는 예외를 발생시킬 경우 실행될 코드가 포함되어 있다. 예를 들어 try와 except 블럭을 사용하여 대상 함수에서 발생할 수 있는 ZeroDivisionError 예외를 잡고 처리하려고 한다aus 다음과 같이 작업할 수 있다.

# Import the multiprocessing module
import multiprocessing

# Define the target function
def divide(number):
    try:
        # Try to divide the number by zero
        result = number / 0
        return result
    except ZeroDivisionError:
        # Catch and handle the ZeroDivisionError exception
        print("Cannot divide by zero")
        return None

# Create a list of numbers
numbers = [1, 2, 3, 4, 5]
# Create a pool of four processes
pool = multiprocessing.Pool(4)
# Use the map method to apply the divide function to the list of numbers
# The map method will return a list of results
results = pool.map(divide, numbers)
# Print the results
print(results)
# Close the pool
pool.close()
# Print a message in the main process
print("Pool closed")

출력은 아래와 같다.

Cannot divide by zero
Cannot divide by zero
Cannot divide by zero
Cannot divide by zero
Cannot divide by zero
[None, None, None, None, None]
Pool closed

보다시피 tryexcept 블록은 대상 함수에서 발생한 ZeroDivisionError 예외를 잡아 처리하고 결과적으로 None을 반환했습니다. try와 except 블록은 대상 함수 또는 프로세스에서 발생할 수 있는 예외와 오류를 처리하고 프로그램이 충돌하거나 예기치 않게 동작하는 것을 방지하는 데 유용합하. 그러나 try와 except 블록도 다음과 같은 제한과 단점이 있다.

  • 일부 예외와 오류가 multiprocessing 모듈 또는 운영 시스템에 의해 제기될 수 있고, 대상 함수에서 발생하지 않을 수 있기 때문에 발생할 수 있는 모든 가능한 예외와 오류를 잡지 못할 수 있다.
  • 일부 예외와 오류는 로깅, 재시도 또는 종료 같은 상이한 액션 또는 응답을 요구할 수 있기 때문에, 원하는 방식으로 예외와 오류를 처리하지 않을 수 있다.
  • 일부 예외와 오류가 풀 또는 main 프로세스가 아닌 프로세스에 의해 포착되고 처리될 수 있기 때문에, 예외와 오류를 main 프로세스에 전파하지 않을 수 있다.

따라서 대상 함수 또는 프로세스에서 발생할 수 있는 예외와 오류를 파악하고 처리할 수 있다고 확신할 때, 그리고 잠재적인 결과와 부작용을 알고 있을 때에만 try와 except 블록을 사용해야 한다.

이 절에서는 terminate와 close 메서드를 사용하여 예외와 종료 프로세스를 처리하는 방법을 설명하였다. 또한 try와 except 블록을 사용하여 대상 함수나 프로세스에서 발생할 수 있는 오류를 파악하고 처리하는 방법도 보였다.

다음 절에서는 포스팅을 마무리하고 독자에게 몇 가지 추가 리소스와 연습문제를 제공할 것이다.

코드 블럭 테스트 필요함

요약

다음 절에서는 포스팅을 마무리할 것.