multiprocessing.Pool
для параллельной обработки задач можно следующим образом:
Pool(processes=N)
, где N - количество процессов.pool.map(func, iterable)
, pool.apply(func, args)
или pool.apply_async(func, args)
. map
возвращает результаты в том же порядке, что и входные данные. apply
блокирует до завершения, apply_async
- нет.pool.close()
и дождаться завершения всех процессов с помощью pool.join()
.
import multiprocessing
def worker(x):
return x * x
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(worker, [1, 2, 3, 4, 5])
print(results) # Output: [1, 4, 9, 16, 25]
Важно: убедитесь, что код, выполняемый в процессах, сериализуем.
Реализация параллельной обработки задач с помощью `multiprocessing.Pool()` включает в себя несколько ключевых шагов:
import multiprocessing
import time # для примера
def worker_function(item):
# Здесь код обработки одного элемента 'item'
print(f"Обрабатываю элемент: {item} в процессе {multiprocessing.current_process().name}")
time.sleep(1) # Имитация сложной работы
return item * 2 # Пример возвращаемого значения
if __name__ == '__main__': # Обязательно для Windows
pool_size = multiprocessing.cpu_count() # Используем количество ядер процессора
pool = multiprocessing.Pool(processes=pool_size)
items = [1, 2, 3, 4, 5]
results = pool.map(worker_function, items)
print(f"Результаты: {results}") # Результаты: [2, 4, 6, 8, 10]
async_results = []
for item in items:
result = pool.apply_async(worker_function, (item,)) # Обратите внимание на запятую для args как tuple
async_results.append(result)
# ... делаем что-то еще ...
results = [res.get() for res in async_results] # Получаем результаты (блокируется, пока все не будут готовы)
print(f"Результаты (асинхронно): {results}") # Результаты (асинхронно): [2, 4, 6, 8, 10]
for result in pool.imap(worker_function, items, chunksize=1):
print(f"Результат (imap): {result}") # Вывод по мере готовности результатов
pool.close()
pool.join()
print("Все задачи завершены.")
Пример полного кода:
import multiprocessing
import time
def worker_function(item):
print(f"Обрабатываю элемент: {item} в процессе {multiprocessing.current_process().name}")
time.sleep(1)
return item * 2
if __name__ == '__main__':
pool_size = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=pool_size)
items = [1, 2, 3, 4, 5]
# Используем pool.map()
results = pool.map(worker_function, items)
print(f"Результаты (map): {results}")
# Закрываем пул и ждем завершения
pool.close()
pool.join()
print("Все задачи завершены.")
Важные замечания: