threading
и concurrent.futures
.
Важно помнить о Global Interpreter Lock (GIL), который ограничивает параллельное исполнение CPU-bound задач.
queue.Queue
) и выполняют их.threading.Lock
, threading.RLock
), семафоры (threading.Semaphore
), условия (threading.Condition
) для предотвращения гонок данных и обеспечения потокобезопасности.queue.Queue
уже обеспечивает безопасную работу с данными между потоками.multiprocessing
).Работа с многопоточностью в Python требует понимания нескольких ключевых концепций и инструментов. Главная сложность - Global Interpreter Lock (GIL), который позволяет только одному потоку выполнять байт-код Python в один момент времени. Это сильно ограничивает использование потоков для увеличения производительности задач, интенсивно использующих CPU (например, математические расчеты).
Использование `threading` модуля:
`threading` модуль предоставляет инструменты для создания и управления потоками. Основные шаги:
import threading
def worker(num):
"""Функция, которую будет выполнять поток"""
print(f"Поток {num}: Начало работы")
# ... полезная работа ...
print(f"Поток {num}: Завершение работы")
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # Ожидание завершения всех потоков
Распределение работы между потоками:
Корректное распределение работы критически важно для эффективности многопоточности. Вот несколько стратегий:
import threading
data = list(range(1000))
num_threads = 4
chunk_size = len(data) // num_threads
def process_chunk(chunk):
# ... обработка данных в chunk ...
for item in chunk:
# Какая-то операция над item
item * 2 # Пример
print("Chunk обработан")
threads = []
for i in range(num_threads):
start = i * chunk_size
end = (i + 1) * chunk_size if i < num_threads - 1 else len(data) # Последний чанк может быть больше
chunk = data[start:end]
t = threading.Thread(target=process_chunk, args=(chunk,))
threads.append(t)
t.start()
for t in threads:
t.join()
import threading
import queue
task_queue = queue.Queue()
def worker(q):
while True:
task = q.get()
if task is None:
break # Сигнал завершения потока
# ... обработка task ...
print(f"Обработка задачи: {task}")
q.task_done() # Сигнализируем очереди о завершении задачи
num_threads = 3
threads = []
for i in range(num_threads):
t = threading.Thread(target=worker, args=(task_queue,))
threads.append(t)
t.daemon = True # Поток-демон, завершится автоматически
t.start()
for i in range(10):
task_queue.put(i) # Добавляем задачи в очередь
# Сигнализируем потокам о завершении работы
for i in range(num_threads):
task_queue.put(None)
task_queue.join() # Ожидаем завершения всех задач в очереди
for t in threads:
t.join()
import concurrent.futures
def worker(num):
"""Функция, которую будет выполнять поток"""
print(f"Поток {num}: Начало работы")
# ... полезная работа ...
print(f"Поток {num}: Завершение работы")
return f"Результат потока {num}" # Пример возвращаемого значения
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in concurrent.futures.as_completed(futures):
print(future.result()) # Получаем результаты по мере завершения
Проблемы и синхронизация:
Многопоточность вносит свои сложности. Необходимо учитывать:
Для решения этих проблем используются механизмы синхронизации:
Когда использовать многопоточность:
Многопоточность подходит для задач, связанных с I/O (например, ожидание данных из сети или с диска). Для задач, интенсивно использующих CPU, лучше использовать многопроцессорность (модуль `multiprocessing`), чтобы обойти ограничение GIL.
Пример использования блокировки:
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
with lock: # Получаем блокировку
counter += 1 # Критическая секция
# Блокировка автоматически освобождается при выходе из блока with
threads = []
for _ in range(1000):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}") # Гарантированно 1000