Published on: Nov 12, 2022
Super fast Python (Part-3): Multi-processing
This is the third post in the series on Python performance and optimization. The series covers built-in libraries, low-level code conversions, and alternative Python implementations that speed up Python. The posts in this series are:
- (Part-1): Why Python is slow?
- (Part-2): Good practices to write fast Python code
- (Part-3): Multi-processing in Python (this post)
- (Part-4): Use Cython to get speed as fast as C
- (Part-5): Use Numba to speed up Python Functions and Numeric calculations
Problems with GIL
In the previous post, Why Python is slow?, we discussed the problems with the Global Interpreter Lock (GIL). The GIL is part of CPython's design and keeps the interpreter thread-safe. But it allows only one thread to execute Python bytecode at a time, so multi-threading cannot speed up CPU-bound Python code.
This confines multi-threading to cases where the GIL is released while a thread waits for something external to complete and return a result or status, such as I/O operations and network requests. For better concurrency in such I/O-bound cases, you can also use coroutines with asyncio.
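As a rough illustration of the I/O-bound case, the sketch below runs several blocking waits in threads. The names fake_io and timed_threaded_wait are made up for this example, and sleep() only stands in for a real I/O call.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def fake_io(delay):
    # sleep() stands in for a blocking I/O call; the GIL is released while waiting
    sleep(delay)
    return delay


def timed_threaded_wait(n_tasks=4, delay=0.2):
    """Run n_tasks blocking waits in threads; return (results, elapsed seconds)."""
    start = perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as executor:
        results = list(executor.map(fake_io, [delay] * n_tasks))
    return results, perf_counter() - start


if __name__ == "__main__":
    results, elapsed = timed_threaded_wait()
    print(f"{len(results)} waits of 0.2s finished in {elapsed:.2f}s")
```

Because the waits overlap, the total time should be close to a single wait rather than the sum of all of them. A CPU-bound function in place of fake_io would not see this benefit.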
So, even if the system has multiple cores and supports thousands of threads, multi-threading is not suitable for CPU-intensive tasks. How, then, do we do parallel computing in Python? The answer is multi-processing. Multi-processing creates multiple sub-processes, each with its own interpreter, its own GIL, and an independent memory space. You can create more processes than there are cores, but for CPU-bound work there is usually no benefit beyond one process per core. The GIL is not a problem across processes because each process runs bytecode in its own interpreter, and the operating system schedules the processes on the available cores. But processes come with overheads: spawning or forking a process with its own memory space is slower than creating a thread, and because processes are isolated from each other, inter-process communication is slow.
Also, external libraries like Numpy can release the GIL and compute faster. Most Numpy computations run in compiled C code that does not need the interpreter, so Numpy can release the GIL while that code runs.
The multiprocessing module in Python offers a variety of APIs for multiprocessing. In this post, we discuss the multiprocessing.Pool class, which takes any number of tasks and executes them in parallel by distributing them among multiple worker processes.
concurrent.futures.ProcessPoolExecutor provides a higher-level interface on top of multiprocessing and offers additional functions to control the process pool.
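For comparison with the Pool examples later in this post, here is a minimal sketch of the same idea with ProcessPoolExecutor. The helper names square and parallel_squares are illustrative only.

```python
from concurrent.futures import ProcessPoolExecutor


def square(n):
    return n * n


def parallel_squares(values, workers=2):
    # executor.map returns results in input order, like Pool.map
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(square, values))


if __name__ == "__main__":
    print(parallel_squares(range(10)))
```

The worker function is defined at module level so that it can be pickled and sent to the worker processes.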
Multiprocessing Pool
Why and what is a pool?
Imagine we have an 8-core CPU and, for every task, we create a process, allocate it to a core, and run all of them in parallel. If the number of tasks is at most the number of cores, this works smoothly. If we increase the number of tasks to, say, 100 and create one process per task, the system has to manage the lifecycle of every process and make sure that none of them hangs or deadlocks. With more processes than the system can handle, we risk a system hang or crash.
A pool is a strategy in parallel computing that allocates only a fixed number of workers (typically one per CPU core) to run any number of tasks. It protects the system from being overwhelmed by a very large number of processes. We create at most n workers for a system with n cores and distribute the tasks among them to run in parallel. The main difference is that in normal execution we create a separate process for every task, whereas with a pool we first create a fixed set of processes and then distribute the tasks among them.
If we have 100 tasks and a pool with 5 processes, the 100 tasks are distributed over the pool, roughly 20 tasks per process.
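The 100-tasks, 5-workers scenario can be sketched as follows. The helpers tag_with_pid and tasks_per_worker are hypothetical names used only for this illustration.

```python
from collections import Counter
from multiprocessing import Pool
from os import getpid


def tag_with_pid(task_id):
    # report which worker process handled this task
    return task_id, getpid()


def tasks_per_worker(n_tasks=100, n_workers=5):
    """Run n_tasks on a pool of n_workers and count the tasks handled per worker."""
    with Pool(processes=n_workers) as pool:
        handled = pool.map(tag_with_pid, range(n_tasks))
    return Counter(pid for _, pid in handled)


if __name__ == "__main__":
    for pid, count in tasks_per_worker().items():
        print(f"worker {pid} handled {count} tasks")
```

No more than 5 distinct process IDs should appear, however many tasks are submitted. The exact split per worker depends on scheduling and is not guaranteed to be 20 each.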
Consider a function that takes around 1-second to complete. And we have to call the function for 10 different input parameters.
The following example shows fun_1() as a function that takes a parameter a, sleeps for 1 second, and then returns the square of a.
Why prefer time.perf_counter() over time.time()? time.time() is not monotonic: the system clock can be adjusted, for example by synchronization with a time server, so the difference between two readings can be wrong. time.perf_counter() is a monotonic, high-resolution clock. Its reference point is undefined, so only the difference between two calls is meaningful.
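A small timing helper built on perf_counter() might look like the sketch below. The name time_call is made up for this example.

```python
from time import perf_counter, sleep


def time_call(func, *args):
    """Return (result, elapsed_seconds) for a single call of func(*args)."""
    start = perf_counter()
    result = func(*args)
    # only the difference between two perf_counter() readings is meaningful
    return result, perf_counter() - start


if __name__ == "__main__":
    _, elapsed = time_call(sleep, 0.1)
    print(f"sleep(0.1) took {elapsed:.3f}s")
```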
Now, call the function 10 times by passing it the elements of range(10). Without parallel processing, the total time will be around 10 seconds, because every call takes 1 second.
```python
from os import getpid
from time import perf_counter, sleep
from multiprocessing import Pool, cpu_count


# function that sleeps for 1 second
def fun_1(a):
    print('WORKER ID:', getpid())
    sleep(1)
    return a * a


# the guard keeps worker processes from re-running this block under "spawn"
if __name__ == '__main__':
    # call fun_1 for elements in range(10)

    # without multiprocessing
    atime = perf_counter()
    print('without multiprocessing')

    res = [fun_1(i) for i in range(10)]

    print(f"time taken without multiprocessing: {perf_counter()-atime}\n")

    # with multiprocessing
    atime = perf_counter()
    print('with multiprocessing')

    with Pool(processes=cpu_count()) as pool:
        res = pool.map(fun_1, range(10))

    print(f"time taken with multiprocessing: {perf_counter()-atime}")
```
Output
```
without multiprocessing
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
WORKER ID: 32093
time taken without multiprocessing: 10.019481873001496

with multiprocessing
WORKER ID: 33097
WORKER ID: 33099
WORKER ID: 33098
WORKER ID: 33100
WORKER ID: 33101
WORKER ID: 33103
WORKER ID: 33102
WORKER ID: 33096
WORKER ID: 33098
WORKER ID: 33101
time taken with multiprocessing: 2.077159455002402
```
In the above code, we applied parallel computing by passing tasks (calls to fun_1()) to the multiprocessing pool.
The Pool class constructor takes the following arguments and returns the pool object:
- processes: number of workers; the default is the number of CPUs reported by the system. We can check the available cores by calling multiprocessing.cpu_count() (mine is 8)
- initializer: initializer function that will be called by every worker when it starts
- initargs: arguments to the initializer function above
- maxtasksperchild: max. no.of tasks a worker should execute before being replaced by another worker process. This behavior releases the workers that are using system resources for a very long time
We discuss initializer and initargs in the later section on sharing data between processes.
The pool object has several methods, such as apply, map, starmap, and imap, that serve different purposes in parallel computing. In the above snippet, we used Pool.map(), which works like the built-in map(): it takes a function and an iterable and calls the function with each element of the iterable as its argument. The function must accept exactly one argument. Pool.map() is a blocking call: it splits the tasks into chunks, distributes the chunks among the workers, and returns only when all results are ready.
Pool.map() takes the following arguments:
- func: the function to be executed parallelly that takes one argument
- iterable: an iterable whose elements are passed as arguments to the function above
- chunksize: the approximate number of tasks sent to a worker at a time
Depending on the nature of the tasks, different chunksize values schedule the tasks differently and can change performance.
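To make chunking concrete, the sketch below mirrors the heuristic CPython's Pool.map() uses when chunksize is left unset. This is an implementation detail of multiprocessing/pool.py that may change between versions; default_chunksize and split_into_chunks are illustrative helpers, not part of the multiprocessing API.

```python
def default_chunksize(n_tasks, n_workers):
    """Chunk size CPython's Pool.map picks when chunksize is None (implementation detail)."""
    chunksize, extra = divmod(n_tasks, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def split_into_chunks(items, chunksize):
    """Split items into consecutive lists of at most chunksize elements."""
    items = list(items)
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


if __name__ == "__main__":
    size = default_chunksize(10, 8)
    print(size, split_into_chunks(range(10), size))
```

For 10 tasks and 8 workers the heuristic yields a chunk size of 1, so every task is dispatched on its own. For many small tasks, a larger explicit chunksize reduces the communication overhead per task.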
On Linux, multiprocessing has traditionally created processes using fork, while on Windows, and on macOS since Python 3.8, it starts them using spawn. When the start method is not fork, there are additional restrictions to be aware of, such as protecting the entry point of the program by creating processes inside if __name__ == '__main__':, because spawn starts a fresh Python interpreter for each process and re-imports the main module.
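As we have 8 cores available and 10 tasks at hand, Pool.map() with the default chunksize hands the tasks out one at a time: the 8 workers each take one task, and the first two workers to finish pick up the remaining two. This is why worker IDs 33098 and 33101 appear twice in the output above.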
The total time taken with multiprocessing is about 2 seconds, which is roughly 5x faster than without multiprocessing (10 seconds).
For large iterables, use Pool.imap() with a chunksize for better efficiency.
For functions with multiple parameters, use Pool.starmap(), or Pool.map() with partial functions.
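The three options mentioned above can be sketched together. The function power and the wrapper demo are made-up names for this illustration.

```python
from functools import partial
from multiprocessing import Pool


def power(base, exponent):
    return base ** exponent


def demo(workers=2):
    with Pool(processes=workers) as pool:
        # starmap unpacks each tuple into positional arguments
        via_starmap = pool.starmap(power, [(2, 3), (3, 2), (4, 2)])
        # partial fixes exponent=2, so map only has to pass the remaining argument
        via_partial = pool.map(partial(power, exponent=2), [1, 2, 3])
        # imap yields results lazily and in order; chunksize batches the tasks
        via_imap = list(pool.imap(partial(power, exponent=2), range(1000), chunksize=100))
    return via_starmap, via_partial, via_imap


if __name__ == "__main__":
    starmapped, mapped, imapped = demo()
    print(starmapped, mapped, imapped[:5])
```

Unlike map(), imap() does not convert the whole iterable to a list up front, which is why it suits large or lazily generated inputs.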
Process vs Pool
Use Process when there is a small number of tasks and each task takes a long time. Use a Pool when there is a large number of tasks and each task takes a short time.
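For the "few long tasks" case, a minimal sketch with multiprocessing.Process and a Queue for the results could look like this. The names long_task and run_long_tasks, and the sum-of-squares workload, are placeholders.

```python
from multiprocessing import Process, Queue


def long_task(name, n, results):
    # stand-in for a long-running, CPU-heavy job
    results.put((name, sum(i * i for i in range(n))))


def run_long_tasks(jobs):
    """Start one process per (name, n) job and collect the results by name."""
    results = Queue()
    procs = [Process(target=long_task, args=(name, n, results)) for name, n in jobs]
    for p in procs:
        p.start()
    # read the results before join() so no child blocks on a full queue
    collected = dict(results.get() for _ in procs)
    for p in procs:
        p.join()
    return collected


if __name__ == "__main__":
    print(run_long_tasks([("a", 1_000_000), ("b", 2_000_000)]))
```

Here each job gets a dedicated process, which is reasonable for a handful of jobs but does not scale to hundreds of them the way a pool does.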
Sharing data between processes
It is strongly recommended not to share data between processes unless it is really needed.
Share data with global variables (copy-on-write)
Processes created using multiprocessing run independently in their own memory space. They don't have access to the parent process's local data, but they inherit the parent's global data (only when the fork start method is used).
Since child processes get a snapshot of the parent process's global data, we can utilize this behavior in a way such that we can make sharable data available to child processes by making the data global.
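A minimal sketch of this snapshot behaviour is shown below. It assumes a POSIX system where the fork start method is available and requests it explicitly; LOOKUP, lookup, and parallel_lookup are hypothetical names.

```python
import multiprocessing as mp

# global data; forked workers see whatever it holds at fork time
LOOKUP = {}


def lookup(key):
    return LOOKUP[key]


def parallel_lookup(table, keys, workers=2):
    # fill the global in the parent *before* the workers are forked
    LOOKUP.clear()
    LOOKUP.update(table)
    # assumption: "fork" is available (Linux and other POSIX systems)
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=workers) as pool:
        return pool.map(lookup, keys)


if __name__ == "__main__":
    print(parallel_lookup({"a": 1, "b": 2}, ["b", "a", "b"]))
```

Only the keys travel to the workers as task arguments. The table itself is never pickled, because each worker already has a snapshot of it.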
For the following example, we share an integer x and a list of integers a with the workers. Each task receives a number e from 0..n-1 (n = 100000), computes the sum of x*i over every element i of a, and multiplies that sum by e.
```python
from multiprocessing import Pool, cpu_count


# function to be applied for each element
def sum_product(e):
    w_sum = sum(x * i for i in a)
    return w_sum * e


# pool initializer function: runs once in every worker when it starts
def pool_initializer(X, A):
    global x, a
    x = X
    a = A


n = 100000
X = 3
A = [2, 4, 6, 8, 10, 12]

with Pool(processes=cpu_count(), initializer=pool_initializer, initargs=(X, A)) as pool:
    res = pool.map(sum_product, range(n))
```
In the above snippet, we initialized the pool with the pool_initializer function. This function is called once in every worker when it starts, and any data made global inside it is available to the tasks that run in that worker.
Strictly speaking, we don't need the initializer function to share data here: we could define the data as globals at the top level, and all child processes would get a snapshot of them. But there are other cases where the initializer is useful, such as reading a file and sharing its data or opening a database connection, which may depend on parameters that we pass through the pool's initargs.
Sharing data through global variables and copy-on-write is only useful for read-only data. If a child process changes the value of a global variable, the change is not reflected in the parent process.
```python
from multiprocessing import Pool, cpu_count
from random import randint


def random_increment(i):
    rand_n = randint(0, i)
    a[rand_n] += 1
    print(f'child {i}: {a}')


n = 10
a = list(range(n))

with Pool(processes=cpu_count()) as pool:
    pool.map(random_increment, range(n))

print(f'parent {a}')
```
In the above program, we initialized a global variable a, which is a list. We applied the function random_increment() to n tasks. random_increment() takes the number i and increments the element of a at a random index between 0 and i (inclusive).
As a is global, we might expect the list to change in the parent process too. It does not, because each child process starts from a snapshot of the parent's data and has its own memory.
Look at the output of the above program below to check that, in the parent process, the value of a has not changed.
Output
```
child 2: [1, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 1: [1, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 3: [0, 2, 2, 3, 4, 5, 6, 7, 8, 9]
child 8: [1, 1, 3, 3, 4, 5, 6, 7, 8, 9]
child 7: [1, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 6: [0, 1, 2, 3, 5, 5, 6, 7, 8, 9]
child 0: [1, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 4: [0, 2, 2, 3, 4, 5, 6, 7, 8, 9]
child 9: [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]
child 5: [0, 1, 2, 4, 4, 5, 6, 7, 8, 9]
parent [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
This is because data sharing here follows a mechanism called copy-on-write: instead of copying the global data into new memory for each child, the OS lets the child read the parent's memory as long as the child does not modify it. As we have seen in the above example, when a child process tries to change the data, the OS first copies the affected memory and the child then writes its changes to the copy. So the child's changes are not reflected in the parent, because the two now use different memory locations. The same holds in the other direction: after forking, changes made in the parent process are not reflected in the child.
Share data using shared memory
To avoid problems like the above, multiprocessing provides mechanisms to share data. To exchange arbitrary data between child processes, one would otherwise use mechanisms such as sockets or shared files. But to share simple values or arrays, multiprocessing provides shared ctypes objects, which let processes share data safely.
With multiprocessing.sharedctypes, we can allocate ctypes objects from shared memory that child processes can inherit. ctypes are primitive C-compatible data types, and the ctypes module provides wrappers around them for use in Python. The shared memory here is simply the underlying memory buffer, and we will discuss later how to use this fact to share large Numpy arrays.
Multiprocessing provides two kinds of shared ctypes objects: synchronized ones, which come with a lock, and raw ones, which do not.
To modify data from several processes safely, use
- multiprocessing.Value: to share a single value such as an integer or a float
- multiprocessing.Array: to share an array of values of the same C data type
To share data that is read-only, or that you synchronize yourself, use
- multiprocessing.RawValue: to share a single value
- multiprocessing.RawArray: to share an array of values
The main difference between the raw types and the normal types is that the latter wrap the data with a lock to provide process-safe access. Raw objects can still be written to, but without any automatic synchronization.
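As a small sketch of the raw variant, the example below lets every task write only to its own slot of a RawArray, so no lock is needed. The names init_worker, store_square, and squares_in_shared_memory are illustrative.

```python
from multiprocessing import Pool, RawArray


def init_worker(shared):
    # make the shared array visible to the tasks running in this worker
    global out
    out = shared


def store_square(i):
    # each task writes only its own slot, so no synchronization is required
    out[i] = i * i


def squares_in_shared_memory(n=8, workers=2):
    shared = RawArray("i", n)  # n zero-initialized C ints, no lock attached
    with Pool(processes=workers, initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(store_square, range(n))
    return list(shared)


if __name__ == "__main__":
    print(squares_in_shared_memory())
```

If several tasks could write to the same slot, the synchronized Array (or an explicit lock) would be the safer choice.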
```python
from multiprocessing import Array, Pool, cpu_count
from random import randint


def random_increment(i):
    rand_n = randint(0, i)
    a[rand_n] += 1

    print(f'child {i}: {list(a)}')


# runs once in every worker and exposes the shared array as a global
def initializer_func(A):
    global a
    a = A


n = 10
A = Array('i', range(n))

with Pool(processes=cpu_count(), initializer=initializer_func, initargs=(A,)) as pool:
    pool.map(random_increment, range(n))

print(f'parent: {list(A)}')
```
In the above code, we passed a multiprocessing.Array instead of a normal list. The Array is initialized with a data type code ('i', a C int) and an iterable. We can also initialize the Array by defining its size first and assigning the values later.
```python
A = Array('i', 10)        # ten zero-initialized C ints
A[:] = list(range(10))    # fill in place; `A = range(10)` would only rebind the name A
```
With Array, we can now see that the changes made by the child processes to the shared array are reflected in the parent process. Array also provides automatic synchronization through a lock managed by multiprocessing, so that only one process accesses the data at a time. Note that a[rand_n] += 1 is a read followed by a write, and each of them takes the lock separately; to make the whole increment atomic, wrap it in with a.get_lock():.
Output
```
child 3: [2, 1, 3, 4, 4, 5, 6, 7, 8, 9]
child 7: [2, 1, 3, 5, 5, 7, 6, 7, 8, 9]
child 5: [2, 1, 3, 5, 5, 5, 6, 7, 8, 9]
child 2: [2, 1, 3, 3, 4, 5, 6, 7, 8, 9]
child 0: [1, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 1: [2, 1, 2, 3, 4, 5, 6, 7, 8, 9]
child 4: [2, 1, 3, 5, 4, 5, 6, 7, 8, 9]
child 8: [2, 1, 3, 5, 5, 8, 6, 7, 8, 9]
child 9: [2, 1, 3, 5, 5, 8, 6, 8, 8, 9]
child 6: [2, 1, 3, 5, 5, 6, 6, 7, 8, 9]
parent: [2, 1, 3, 5, 5, 8, 6, 8, 8, 9]
```
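The same idea works for a single shared number with multiprocessing.Value. The sketch below is a hypothetical counter example, not code from this post. It holds the lock across the whole increment, because += is a read followed by a write.

```python
from multiprocessing import Pool, Value


def init_worker(shared_counter):
    # make the shared value visible to the tasks running in this worker
    global counter
    counter = shared_counter


def add_one(_):
    # hold the lock across both the read and the write of the increment
    with counter.get_lock():
        counter.value += 1


def count_in_parallel(n_tasks=200, workers=4):
    shared_counter = Value("i", 0)  # a C int in shared memory, guarded by a lock
    with Pool(processes=workers, initializer=init_worker, initargs=(shared_counter,)) as pool:
        pool.map(add_one, range(n_tasks))
    return shared_counter.value


if __name__ == "__main__":
    print(count_in_parallel())
```

Without the explicit get_lock() block, two workers could read the same old value and one of the increments would be lost.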
Share large Numpy arrays between child processes
For each task, the data we pass as arguments is pickled so that it can be sent from one process to another. Pickling large amounts of data for every task costs a lot of memory and time. So it is better to share large data between child processes than to pass it to each task individually.
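To get a feeling for this cost, the sketch below measures how many bytes the standard pickle module produces for an argument. The helper pickled_size is a made-up name, and the exact numbers depend on the pickle protocol in use.

```python
import pickle


def pickled_size(obj):
    """Approximate number of bytes needed to send obj to a worker as an argument."""
    return len(pickle.dumps(obj))


if __name__ == "__main__":
    small = list(range(10))
    large = list(range(1_000_000))
    print(f"small: {pickled_size(small)} bytes, large: {pickled_size(large)} bytes")
```

Every task that receives the large list as an argument pays this serialization cost again, on top of the memory for the copy in the worker.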
As we have seen above, copy-on-write sharing is good for read-only data, and to work with data that changes inside the child processes we have to use a shared array. In this section, we discuss sharing large Numpy arrays using a shared array.
Shared arrays only support primitive C data types and are one-dimensional. So how do we share Numpy arrays, which are multidimensional in general, when shared ctypes do not directly support them? We can do it with the workaround explained below.
The shared ctypes objects we get are allocated from shared memory and expose it through the buffer protocol. The buffer protocol is a framework in Python that lets objects share their underlying data with other objects. Any Python object implemented with the C API can export a set of functions called the buffer interface. Through this interface, an object can expose its data to other objects directly, without copying.
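The no-copy behaviour of the buffer protocol can be seen with built-in types alone. In this sketch a memoryview writes straight into the memory owned by a bytearray; fill_through_view is an illustrative name.

```python
def fill_through_view(size=4, value=7):
    buf = bytearray(size)    # owns the memory
    view = memoryview(buf)   # shares it through the buffer protocol, no copy
    for i in range(size):
        view[i] = value      # writes land directly in buf
    return buf, view


if __name__ == "__main__":
    buf, view = fill_through_view()
    print(buf, view.obj is buf)
```

np.frombuffer relies on the same protocol: it builds a Numpy array on top of an existing buffer instead of copying it.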
Shared ctypes objects are just wrappers around C data types whose memory is allocated from shared memory. This is why changes made in a child process are reflected in the parent process: both work on the same underlying buffer rather than on copies.
Consider the following example, where we first create a large Numpy array of shape (1000, 1000) and then, using the process pool, assign the row number to every element of each row. As a shared array does not support multiple dimensions or custom data types, a common approach is to pass the Numpy array to every worker and let it fill a row with the row number. But passing large arrays between processes costs a lot of memory and adds latency. So we use the buffer protocol here. Since shared array objects provide a buffer interface, we can rely on this shared memory and make it global, so that there is no need to pass Numpy arrays as arguments to each task.
```python
import numpy as np
from multiprocessing import Array, Pool, cpu_count


def assign_int(i):
    # view the shared buffer as a 2D Numpy array (no copy) and fill row i
    arr = np.frombuffer(np_x.get_obj(), dtype=np.int32).reshape(np_x_shape)
    arr[i, :] = i


def pool_initializer(X, X_shape):
    global np_x, np_x_shape
    np_x = X
    np_x_shape = X_shape


X_shape = (1000, 1000)
data = np.zeros(X_shape, dtype=np.int32)
X = Array('i', X_shape[0] * X_shape[1])
# X as a Numpy array
X_np = np.frombuffer(X.get_obj(), dtype=np.int32).reshape(X_shape)
# copy data to the shared array
np.copyto(X_np, data)

with Pool(processes=cpu_count(), initializer=pool_initializer, initargs=(X, X_shape)) as pool:
    pool.map(assign_int, range(X_shape[0]))

print(f'Numpy array X_np:\n{X_np}')
```
First, we defined the shape of the array, X_shape, and created a Numpy array of zeros with that shape. Second, we created a synchronized shared array and assigned it to X. Third, we wrapped the shared array as a Numpy array using np.frombuffer.
np.frombuffer takes a 1-dimensional buffer and interprets it as a Numpy array, so that we can manipulate it easily. We used X.get_obj() instead of X because X is a synchronized wrapper around the raw array, and calling get_obj() returns the raw buffer object.
For raw arrays (RawArray), the object should be passed directly, without calling get_obj().
Why use np.frombuffer? Since Numpy arrays take a large amount of memory, we don't want to create a copy that takes the same amount again. With np.frombuffer, which uses the buffer protocol, we get a Numpy array that is just a wrapper around the buffer memory.
In initargs, we should pass the shared array instead of the Numpy wrapper. We also pass the shape of the Numpy array, because we need to reshape the 1D shared array into a multidimensional array in the worker function assign_int().
In the worker function assign_int(), again we interpreted the buffer as a Numpy array and assigned the row values at row number i.
At last, in the parent process, we can check the multiprocessing assignment of the Numpy array.
Output
```
Numpy array X_np:
[[  0   0   0 ...   0   0   0]
 [  1   1   1 ...   1   1   1]
 [  2   2   2 ...   2   2   2]
 ...
 [997 997 997 ... 997 997 997]
 [998 998 998 ... 998 998 998]
 [999 999 999 ... 999 999 999]]
```
Even though multiprocessing looks easy and flexible to use, there are some issues one can run into if not careful. The following articles cover common pitfalls:
- Why your multiprocessing Pool is stuck
- Multiprocessing Best Practices
- Handling Hang in Python Multiprocessing
- The Parallelism Blues: when faster code is slower
- Things I Wish They Told Me About Multiprocessing in Python
- Exception Handling in Methods of the Multiprocessing Pool Class in Python
In this post, we discussed Python's multiprocessing module with the Pool class. Python has also released other modules for simple concurrent processing and better data sharing,
On top of internal libraries, there are multiple good external libraries available for parallel and concurrent processing,
For comparison among internal and external libraries,