Как синхронизировать работу потоков с использованием `threading.Condition()`?

Используем threading.Condition() для реализации условной переменной. Потоки используют acquire() для блокировки и release() для освобождения. wait() блокирует поток и ждет уведомления, notify() уведомляет один ждущий поток, а notify_all() - все ждущие потоки. Важно вызывать wait(), notify() и notify_all() только после захвата блокировки с помощью acquire().

Синхронизация потоков с использованием `threading.Condition()` в Python

`threading.Condition()` используется для организации сложных взаимодействий между потоками, когда одного лок-а не хватает. Он позволяет потокам засыпать (ждать) до тех пор, пока не будет выполнено определенное условие. Другие потоки, в свою очередь, могут сигнализировать об изменении этого условия, пробуждая ждущие потоки.

Основная идея заключается в следующем: поток, которому нужно дождаться определенного условия, вызывает метод `wait()` объекта `Condition()`. Этот метод атомарно освобождает связанный с `Condition()` лок-а и засыпает, ожидая сигнала. Другой поток, который изменяет состояние, влияющее на условие, вызывает метод `notify()` или `notify_all()` объекта `Condition()`. `notify()` пробуждает один ждущий поток (если есть), а `notify_all()` пробуждает все ждущие потоки.

Ключевые методы `threading.Condition()`:

  • `acquire()`: Получает связанный лок-а. Обычно вызывается перед взаимодействием с общим ресурсом или проверкой условия. Вызывается автоматически при создании объекта `Condition()` если не указан explicit lock.
  • `release()`: Освобождает связанный лок-а. Вызывается после взаимодействия с общим ресурсом. Вызывается автоматически при завершении блока `with` (context manager).
  • `wait()`: Освобождает лок-а и засыпает, пока не будет получен сигнал (через `notify()` или `notify_all()`). Перед вызовом `wait()` текущий поток должен владеть лок-ом. Когда поток просыпается, он снова получает лок-а. Может принимать необязательный параметр `timeout` (в секундах), по истечении которого поток будет разбужен в любом случае. Если timeout достигнут, `wait()` возвращает `True` (в Python 3.9 и новее), в противном случае - `False`.
  • `notify(n=1)`: Пробуждает до `n` ждущих потоков (по умолчанию один). Вызывающий поток должен владеть лок-ом. Выбор, какой поток будет разбужен, не определен.
  • `notify_all()` (или `notify_all()`): Пробуждает все потоки, ждущие на этом условии. Вызывающий поток должен владеть лок-ом.
  • `__init__(lock=None)`: Конструктор. Принимает в качестве аргумента необязательный объект Lock или RLock. Если lock не передан, создается новый RLock.

Пример:


import threading
import time

class ProducerConsumer:
    def __init__(self):
        self.condition = threading.Condition()
        self.buffer = []
        self.buffer_size = 5

    def produce(self):
        while True:
            with self.condition:
                while len(self.buffer) == self.buffer_size:
                    print("Производитель: Буфер полон. Ожидаю...")
                    self.condition.wait()  # Освобождаем лок-а и ждем

                item = time.time()  # Просто timestamp для примера
                self.buffer.append(item)
                print(f"Производитель: Произвел {item}")
                self.condition.notify_all()  # Уведомляем потребителей
            time.sleep(1)

    def consume(self):
        while True:
            with self.condition:
                while len(self.buffer) == 0:
                    print("Потребитель: Буфер пуст. Ожидаю...")
                    self.condition.wait()  # Освобождаем лок-а и ждем

                item = self.buffer.pop(0)
                print(f"Потребитель: Потребил {item}")
                self.condition.notify_all()  # Уведомляем производителей
            time.sleep(2)


if __name__ == "__main__":
    pc = ProducerConsumer()
    producer_thread = threading.Thread(target=pc.produce)
    consumer_thread = threading.Thread(target=pc.consume)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

  

Объяснение примера:

  1. Создается объект `threading.Condition()`.
  2. Производитель и потребитель используют этот объект для синхронизации доступа к общему буферу (`self.buffer`).
  3. Оба потока используют `with self.condition:` для получения и автоматического освобождения лок-а.
  4. Производитель ждет, если буфер полон (`len(self.buffer) == self.buffer_size`).
  5. Потребитель ждет, если буфер пуст (`len(self.buffer) == 0`).
  6. Когда производитель добавляет элемент в буфер, он вызывает `self.condition.notify_all()`, чтобы разбудить потребителя.
  7. Когда потребитель забирает элемент из буфера, он вызывает `self.condition.notify_all()`, чтобы разбудить производителя.

Важные моменты:

  • Всегда проверяйте условие внутри цикла `while` после пробуждения потока. Это необходимо, потому что поток мог быть разбужен по какой-то другой причине (например, другим потоком) или ложно разбужен (spurious wakeup).
  • Используйте `notify_all()` вместо `notify()`, если не уверены, какой поток должен быть разбужен. `notify()` может привести к "зависанию", если будет разбужен не тот поток. В большинстве сценариев `notify_all()` безопаснее и проще в использовании.
  • `Condition` использует Lock или RLock. RLock (Reentrant Lock) позволяет одному и тому же потоку получать лок-а многократно.
  • Метод `wait()` вызывается только когда поток владеет Lock-ом. Он атомарно освобождает Lock и переводит поток в состояние ожидания.

`threading.Condition()` является мощным инструментом для синхронизации потоков в сложных сценариях, где простого лок-а недостаточно. Он позволяет потокам эффективно ждать определенных условий, не занимая процессорное время.

0