Как можно запустить пул процессов с определённым количеством процессов для обработки задач?

Для запуска пула процессов в Python можно использовать модуль multiprocessing. Вот пример:

import multiprocessing

def task(item):
  # Обработка задачи item
  return item * 2

if __name__ == '__main__':
  pool_size = multiprocessing.cpu_count() # Рекомендуется использовать количество ядер CPU
  items = range(10)

  with multiprocessing.Pool(processes=pool_size) as pool:
    results = pool.map(task, items) # Выполняет task для каждого элемента в items

  print(results) # Вывод результатов обработки
  
В этом примере multiprocessing.Pool создает пул из pool_size процессов. Метод pool.map применяет функцию task к каждому элементу в списке items, распределяя задачи между процессами в пуле. Важно оборачивать код создания пула в if __name__ == '__main__': из-за особенностей работы multiprocessing в Windows и macOS.

Для запуска пула процессов с определенным количеством процессов в Python можно использовать модуль multiprocessing, в частности класс multiprocessing.Pool.

Вот основные шаги и пример кода:

  1. Импортируйте модуль multiprocessing:
  2. import multiprocessing
  3. Определите функцию, которая будет выполнять задачу:
  4. def process_task(data):
      # Здесь код обработки данных
      result = data * 2  # Пример обработки: умножение на 2
      return result
  5. Создайте объект Pool, указав количество процессов:
  6. num_processes = 4  # Например, 4 процесса
    pool = multiprocessing.Pool(processes=num_processes)
  7. Передайте задачи в пул процессов:
  8. Существует несколько способов передачи задач:

    • pool.map(function, iterable): Применяет функцию к каждому элементу итерируемого объекта. Результаты возвращаются в виде списка, сохраняя порядок ввода.
    • pool.apply(function, args=(), kwds={}): Применяет функцию с заданными аргументами к одному процессу. Чаще используется для задач, где не важен порядок и нужно дождаться завершения одной задачи, прежде чем перейти к следующей. Блокирует основной поток, пока задача не будет выполнена.
    • pool.apply_async(function, args=(), kwds={}, callback=None): Аналогичен pool.apply, но выполняется асинхронно. Позволяет продолжить выполнение основного потока, а результат будет доступен позже. Можно указать функцию обратного вызова (callback), которая будет выполнена после завершения задачи.
    • pool.imap(function, iterable, chunksize=1): Ленивая версия pool.map. Возвращает итератор, который выдает результаты по мере их готовности. chunksize определяет, сколько задач за раз передается одному процессу.
    • pool.imap_unordered(function, iterable, chunksize=1): Похож на pool.imap, но результаты возвращаются в произвольном порядке, по мере их готовности. Это может быть более эффективным, если порядок не важен.

    Пример использования pool.map:

    data = [1, 2, 3, 4, 5]
    results = pool.map(process_task, data)
    print(results)  # Выведет: [2, 4, 6, 8, 10]

    Пример использования pool.apply_async:

    def callback(result):
      print(f"Результат: {result}")
    
    results = []
    for i in range(5):
      result = pool.apply_async(process_task, args=(i,), callback=callback)
      results.append(result)
    
    
    # Необходимо дождаться завершения всех задач.
    pool.close()
    pool.join()
    
    # results теперь содержит AsyncResult объекты, а callback функция уже вывела результаты.
    
  9. Закройте пул процессов и дождитесь завершения всех задач:
  10. pool.close()
    pool.join()

    pool.close() сообщает пулу, что больше не будет новых задач.

    pool.join() ждет завершения всех задач в пуле. Важно вызвать pool.close() перед pool.join(), иначе программа может зависнуть.

Полный пример кода:

import multiprocessing

def process_task(data):
  """Функция, обрабатывающая данные."""
  result = data * 2
  print(f"Обработка {data} в процессе с ID {multiprocessing.current_process().pid}")
  return result

if __name__ == '__main__':
  num_processes = 4
  data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

  with multiprocessing.Pool(processes=num_processes) as pool:
    results = pool.map(process_task, data)

  print(f"Результаты: {results}")

Важные моменты:

  • Защита от __main__: При использовании multiprocessing в Windows необходимо поместить код, использующий пул процессов, в блок if __name__ == '__main__':. Это связано с тем, как Windows запускает процессы. В Unix-подобных системах это не обязательно, но рекомендуется для переносимости.
  • Глобальные переменные: Будьте осторожны с использованием глобальных переменных в функциях, выполняемых в пуле процессов. Каждый процесс имеет свою собственную копию памяти, поэтому изменения глобальных переменных в одном процессе не будут видны в других.
  • Исключения: Если в процессе обработки задачи возникает исключение, оно будет перехвачено и возвращено в основной поток. Обязательно обрабатывайте исключения правильно.
  • Pickling: Функция и аргументы, передаваемые в пул процессов, должны быть "picklable" (сериализуемы). Это означает, что их можно преобразовать в поток байтов для передачи между процессами.
0