Многопоточность - это возможность запускать множество потоков, где каждый поток может выполнять параллельные вычисления. Запуском и работой потокомами управляет программист, он запрашивает их у операционной системы (ОС), которая решает на какое ядро процессора поставить поток, поэтому кол-во потоков зависит от кол-во ядер на процессоре. Паттерн ThreadPool - позволяет запускать столько потоков, сколько есть на процессоре. На одном ядре процессора многопоточность создает иллюзию параллельной работы. В действительности процессор переключается на разные задачи с помощью алгоритма планирования и называется это многозадачность.
Многозадачность - это свойство программы совмещать нескольких задач в рамках одного потока выполнения.
Параллелизм - свойство среды исполения, возможность за определенное время совершить несколько задач в нескольких потоках на многоядерном процессоре.
Синхронность - поток выполения может быть заблокирован, если для него требуется наступление какого-нибудь события. Например, ожидание данных по сети, чтобы потом их прочитать.
Асинхронность - свойство не отслеживать выполнение разных задач во времени. Программист не контролирует потоком выполнения, поток самостоятельно отслеживает выполнение задачи и уведомляет программиста о завершении результата. Применять асинхронность можно при однопоточном или многопоточном программировании. Из асинхронности следует многозадачность, но из параллелизма асинхронность - не следует.
ОТЛИЧИЕ многопоточности и асинхронности: многопоточность - подразумевает много потоков и ими кто-то управляет, асинхронность - не подразумевает многопоточность (может быть 1 поток) и этим потоком программист не управляет, можно только получить уведомление о завершении и результат работы.
ОТЛИЧИЕ синхронности от асинхронности: в асинхроннности поток может приостановить выполнение задачи, сохранив текущее состояние, и начать выполнять другие задачи, а затем вернуться в предыдущую задачу продолжить ее исполнение. Например, асинхронное выполнение задачи можно добавиться с помощью корутин.
std::thread - это RAII обертка вокруг операционной системы, POSIX - API Linuxa, WindowsRest - API Windows
Методы:
- detach - разрывает связь между объектом thread и потоком, который начал выполняться. detach - не блокирует основной поток, не заставляя дожидаться окончания выполнения потока.
- join - блокирует основной поток, заставляя дожидаться окончания выполнения потока.
- joinble - проверяет ассоциирован std::thread с потоком, если нет (не было detach или join) - возвращает true.
Замечание: нельзя копировать.
std::jthread - по сравнению с std::thread преимущества:
- в деструкторе вызывается join.
- безопасный и предотвращает утечку ресурсов: исключение будет перехвачено и обработано.
Исключения (std::exception) в разных потоках (std::thread) - не пересекаются. Чтобы пробросить исключение в главный поток можно использовать:
std::current_exception()
- получение текущего exception.std::exception_ptr
- обертка для исключения (std::exception), из нее ничего нельзя получить, только пробросить дальше с помощью std::rethrow_exception.std::rethrow_exception
- пробрасывает исключение в другой try/catch.- Сохранение исключений:
- Способ: stack<exception_ptr> - сохраняет указатель на исключение (exception_ptr) из другого потока.
- Способ: promise - сохраняет указатель на исключение (exception_ptr) из другого потока с помощью метода set_exception и выводит исключение с помощью std::future метода get.
std::uncaught_exceptions()
- возвращает кол-во неперехваченных исключений в текущем потоке, помогает избежать double exception в деструкторе, написав проверку if (!std::uncaught_exceptions()) throw.
1 Способ: std::stack<std::exception_ptr>
std::stack<std::exception_ptr> exception_queue;
auto Function = [&exception_queue]()
{
try
{
throw std::runtime_error("Test exception");
}
catch (...)
{
std::exception_ptr exception = std::current_exception();
exception_queue.push(exception);
}
};
std::thread thread(Function);
thread.join();
while (!exception_queue.empty())
{
try
{
auto exception = exception_queue.top();
exception_queue.pop();
std::rethrow_exception(exception);
}
catch (const std::exception& exception)
{
std::cout << "Exception: " << exception.what() << std::endl;
}
}
2 Способ: std::promise + std::future
std::promise<int> promise;
auto future = promise.get_future();
auto Function = [&]()
{
try
{
throw std::runtime_error("Test exception");
}
catch (...)
{
std::exception_ptr exception = std::current_exception();
promise.set_exception(exception);
}
};
std::thread thread(Function);
thread.join();
try
{
future.get();
}
catch (const std::exception& exception)
{
std::cout << "Exception: " << exception.what() << std::endl;
}
При захвате std::mutex при std::exception: лучше использовать RAII оберкти std::lock_guard/std::unique_lock, которые при выходе из стека в деструкторе вызывается unlock, вместо обычного std::mutex, который при возникновении исключения не вызовет unlock.
std::mutex mutex;
auto Function = [&mutex]()
{
std::unique_lock lock(mutex);
try
{
throw std::runtime_error("Test exception");
}
catch (...)
{
auto exception = std::current_exception();
}
};
std::thread thread(Function);
thread.join();
- std::call_once - вызов объекта ровно один раз, даже если он вызывается одновременно из нескольких потоков.
- std::this_thread::get_id() - id текущего потока.
- std::thread::hardware_concurrency() - кол-во возможных потоков на компьютере.
- std::this_thread::yield() - приостановливает текущий поток, отдав преимущество другим потокам.
- std::this_thread::sleep_for(sleep_duration) - блокирует выполнение текущего потока на время sleep_duration.
- std::this_thread::sleep_until(sleep_time) - блокирует выполнение текущего потока до наступления момента времени, например 11:15:00.
Проблема: Race condition/data race (состояние гонки) - обращение к общим данным в разных потоках одновременно.
Решение: использование std::mutex.
Механизм синхронизации, который предназначен для контроля доступа к общим данным для нескольких потоков. Под капотом mutex работает через системный вызов (syscall) futex (fast userspace mutex), который работает в ядре. Он усыпляет поток с помощью планировщика задач (Schedule) и добавляет поток в очередь ожидающих потоков, если мьютекс занят. Когда mutex освобождается, futex выберет один из ждущих в очереди потоков и помечает его исполнить планировщику задач (Schedule). Планировщик переключает контекст на этот поток, и он окажется захваченным mutex. Поход в ядро через syscall и переключение контекста на другой поток — достаточно долгие операции.
Методы, внутри используются синхронизации уровня операционной системы (ОС):
- lock - происходит захват mutex текущим потоком и блокирует доступ к данным другим потокам; или поток блокируется, если мьютекс уже захвачен другим потоком.
- unlock - освобождение mutex, разблокирует данные другим потокам.
Замечание: при повторном вызове lock - вылезет std::exception или приведет к состоянию бесконечного ожидания, при повторном вызове unlock - ничего не будет или вылезет std::exception. - try_lock - происходит захват mutex текущим потоком, НО НЕ блокирует доступ к данным другим потокам, а возвращает значение: true - можно захватить mutex / false - нельзя; НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_lock mutex может быть уже lock/unlock. Использовать try_lock в редких случаях, когда поток мог что-то сделать полезное, пока ожидает unlock.
Использовать при рекурсии. Правило: кол-во lock = кол-во unlock. Если использовать обычный std::mutex, то при повторном вызове lock, либо будет deadlock, либо выскачит ошибка.
По сравнению с std::mutex имеет дополнительные методы:
- try_lock_for - блокирует доступ к данным другим потокам на ОПРЕДЕЛЕННОЕ ВРЕМЯ и возвращает true - произошел захват mutex текущим потоком/false - нет; НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_lock_for std::timed_mutex может быть уже lock/unlock.
- try_lock_until - блокирует выполнение текущего потока до НАСТУПЛЕНИЕ МОМЕНТА ВРЕМЕНИ (например, 11:15:00) и возвращает true - произошел захват mutex текущим потоком/false - нет; НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_lock_until std::timed_mutex может быть уже lock/unlock.
Обладает свойствами std::recursive_mutex + std::timed_mutex.
Имеет 2 разных блокировки:
- эксклюзивная блокировка (exclusive lock) - запись данных только для одного потока одновременно. Если один поток получил эксклюзивный доступ (через lock, try_lock), то никакие другие потоки не могут получить блокировку (включая общую).
Работает по аналогии std::mutex.
Методы:- lock
- unlock
- try_lock
- общая блокировка (shared lock) - чтение данных из нескольких потоков одновременно. Если один поток получил общую блокировку (через lock_shared, try_lock_shared), ни один другой поток не может получить эксклюзивную блокировку, но может получить общую блокировку.
Методы:- shared_lock
- unlock_shared
- try_lock_shared Замечание: в пределах одного потока одновременно может быть получена только одна блокировка (общая или эксклюзивная).
Плюсы:
- можно выделить участок кода, где будет происходить чтение из нескольких потоков.
- синхронизирует данные при записи/чтении, не вызывая Race condition/data race (состояние гонки). При записи данных одним потоком, чтение/запись данных блокируется для всех потоков. При чтении данных одним потоком, чтение данных - доступно для всех потоков, запись данных блокируется для всех потоков.
- параллелизм чтения и общая блокировка для чтения, которые недоступны при обычном std::mutex.
- отсутствие Livelock (голодание потоков) - ситуация, в которой поток не может получить доступ к общим ресурсам, потому что на эти ресурсы всегда претендуют какие-то другие потоки.
Минусы:
- больше весит, чем std::mutex.
Обладает свойствами std::shared_mutex + std::timed_mutex.
Отличие от atomic:
- atomic блокирует шину адреса и тратит меньше времени, чем mutex блокирует поток с помощью планировщика задач (Scheduler) с переводом потока в заблокированное состояние через походы в ядро процессора.
- метод is_lock_free в atomic проверяет возможность применения atomic к типу: true - можно использовать atomic к данному типу / false - нет, лучше использовать mutex (например, нетривиальный тип и больше > 8 байт - размер регистра).
RAII обертки, захват mutex.lock() ресурса происходит на стеке в конструкторе и высвобождение unlock при выходе из стека в деструкторе.
Делает захват mutex.lock() ресурса происходит на стеке в конструкторе и высвобождение unlock при выходе из стека в деструкторе.
Дополнительные тэги:
- std::adopt_lock - считается lock сделан до этого.
Замечание: нельзя копировать.
Имеет возможности std::lock_guard + std::mutex. Приоритетнее использовать std::unique_lock, чем std::lock_guard.
Методы:
- owns_lock - проверка на блокировку, возвращает значение: true - mutex заблокирован / false - нет.
Дополнительные тэги:
- std::try_to_lock - происходит попытка захвата mutex текущим потоком, НО НЕ блокирует доступ к данным другим потокам.
- std::defer_lock - lock не выполнется.
- std::adopt_lock - считается lock сделан до этого.
Замечание: можно копировать.
Имеет возможности std::unique_lock, но для std::shared_timed_mutex. Для обычного std::mutex не подойдет.
Делает общую блокировку (shared lock) - чтение данных из нескольких потоков одновременно. Если один поток получил общую блокировку, ни один другой поток не может получить эксклюзивную блокировку (exclusive lock) - запись данных только для одного потока одновременно, но может получить общую блокировку.
- Livelock - голодание потоков.
- Starvation - нехватка ресурсов для потока.
- Deadlock - взаимная блокировка mutex.
Отличия:
- В Livelock потоки находятся в состоянии ожидания и выполняются одновременно. Livelock — это частный случай Starvation - процесс в потоке не прогрессирует.
- В Deadlock потоки находятся в состоянии ожидания.
Голодание потоков - ситуация, в которой поток не может получить доступ к общим ресурсам, потому что на эти ресурсы всегда претендуют какие-то другие потоки, которым отдаётся предпочтение. Потоки не блокируются — они выполняют не столь полезные действия. Решение: Расставлять приоритеты потоков и правильно использовать mutex.
Голодание - поток не может получить все ресурсы, необходимые для выполнения его работы.
Ситуация, в которой есть 2 mutex и 2 thread и 2 функции. В function1 идет mutex1.lock(), mutex2.lock(). В функции function2 идет обратная очередность mutex2.lock(), mutex1.lock(). Получается thread1 захватывает mutex1, thread2 захватывает mutex2 и возникает взаимная блокировка:
std::mutex mutex1;
std::mutex mutex2;
auto function1 = [&]()
{
std::lock_guard guard1(mutex1);
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // задержка, чтобы thread2 успел сделать lock в mutex2 в function2
std::lock_guard guard2(mutex2);
};
auto function2 = [&]()
{
std::lock_guard guard1(mutex2);
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // задержка, чтобы thread1 успел сделать lock в mutex1
std::lock_guard guard2(mutex1);
};
std::thread thread1(function1);
std::thread thread2(function2);
thread1.join();
thread2.join();
- Захватывать (lock) несколько мьютексов всегда в одинаковом порядке.
- Отпускать (unlock) захваченные (lock) mutex в порядке LIFO («последним пришёл — первым ушёл»).
- Можно использовать алгоритм предотвращения взаимоблокировок std::lock, порядок std::mutexов неважен:
3.1:
std::lock(mutex1, mutex2);
std::lock_guard lock1(mutex1, std::adopt_lock); // считается lock сделан до этого
std::lock_guard lock2(mutex2, std::adopt_lock); // считается lock сделан до этого
3.2:
std::unique_lock lock1(mutex1, std::defer_lock); // lock не выполнется
std::unique_lock lock2(mutex2, std::defer_lock); // lock не выполнется
std::lock(lock1, lock2);
- std::try_lock - пытается захватить (lock) в очередном порядке каждый std::mutex.
При успешном захвате всех переданных std::mutex возвращает -1.
При неуспешном захвате всех std::mutexoв, все std::mutexы освобождаются и возвращается индекс (0,1,2..) первого std::mutex, которого не удалось заблокировать.
Замечание: при повторном захвате все std::mutex вызовется unlock.
При исключении в std::try_lock вызывается unlock для всех std::mutex.
Порядок std::mutexов неважен:
if (std::try_lock(mutex1, mutex2) == -1)
{
std::lock_guard lock1(mutex1, std::adopt_lock); // считается lock сделан до этого
std::lock_guard lock2(mutex2, std::adopt_lock); // считается lock сделан до этого
}
std::scoped_lock - улучшенная версия std::lock_guard, конструктор которого делает захват (lock) произвольного кол-во мьютексов в очередном порядке и высвобождает (unlock) при выходе из стека в деструкторе, использование идиомы RAII.
Замечание: отсутствует конструктор копироввания.
std::scoped_lock scoped_lock(mutex1, mutex2);
Условная переменная - механизм синхронизации между потоками, который работает ТОЛЬКО в паре mutex + unique_lock. Используется для блокировки одного или нескольких потоков с помощью wait до тех пор, пока другой поток не уведомит condition_variable о разблокировке одного (notify_one) или нескольких потоков (notify_any).
Методы:
- notify_one - в wait удаляется ОДИН поток из очереди и он пробуждается. Например, 1 читатель.
- notify_all - в wait удаляются ВСЕ потоки из очереди и они пробуждаются. Например, много читателей.
- wait(mutex)/wait(mutex, predicate) - освобождает (unlock) mutex для текущего потока, чтобы другие потоки могли захватить mutex. При использовании предиката, проверяется условие в придикате:
- условие НЕвыполнено. Идет засыпание потока и помещение его в очередь ожидающих потоков с помощью планировщика (Scheduler) через поход в ядро процессора - и так все потоки-читатели поочередно помещаются в очередь ожидания.
- условие ВЫПОЛНЕНО или пришло уведомление notify_one (удаляется ОДИН поток из очереди и он пробуждается)/notify_all (удаляются ВСЕ потоки из очереди и они пробуждаются). Происходит попытка захвата mutex (lock) ОДНИМ потоком, продолжающаяся до тех пор, пока он не будет захвачен (бывает другой поток успел раньше захватить mutex и не отпустил его еще). После захвата (lock) mutexа ОДНИМ потоком, он выходит из wait, читает данные, выходит из области видимости unique_lock, освобождая mutex (unlock). При notify_all остальные пробудившиеся потоки поочередно выходят из wait, захватывая (lock) и освобождая (unlock) mutex.
- wait_for(mutex) - приостанавливает выполнение потоков на ОПРЕДЕЛЕННОЕ ВРЕМЯ.
- wait_until(mutex) - приостанавливает выполнение потоков до НАСТУПЛЕНИЕ МОМЕНТА ВРЕМЕНИ (например, 11:15:00).
Принимает любой объект (не только unique_lock, но может быть обычный mutex), который может выполнить lock/unlock. Работает медленее, чем std::condition_variable.
Ложные пробуждения - пробуждение потока без веской причины. Это происходит потому что между моментом сигнала (notify) от condition_variable и моментом запуска ожидающего потока, другой поток запустился и изменил условие, вызвав состояние гонки. Если ожидающий поток просыпается вторым, он проигрывает гонку и происходит ложное пробуждение, поэтому после wait рекомендуют всегда проверять истинность условие пробуждения в цикле или в предикате.
Принимает в качестве аргументов std::condition_variable и std::unique_lock. При завершении потока и выхода из стека, когда все деструкторы локальных (по отношению к потоку) объектов отработали, выполняется notify_all в захваченном condition_variable. Поток, вызвавший std::notify_all_at_thread_exit, будет обладать mutex до самого завершения, поэтому необходимо позаботиться о том, чтобы не произошёл deadlock где-нибудь в другом месте. std::notify_all_at_thread_exit - использовать в редких случаях, когда необходимо гарантировать к определенному моменту уничтожение локальных объектов + нельзя использовать join у потока по какой-то причине.
auto Write = [&](const std::string& iSymbols)
{
{
std::lock_guard lock(mutex);
data = iSymbols;
}
cv.notify_all(); // разрешаем одному потоку
};
auto Read = [&]()
{
std::unique_lock lock(mutex);
cv.wait(lock, [&data](){ return !data.empty();});
std::cout << data << std::endl;
};
PRODUCER
Поток-писатель после записи данных уведомляет потоков-читателей, что данные готовы для чтения:
• notify_one: только один поток уснувший в wait может пробудиться и удалиться из очереди ожидающих потоков.
• notify_all: все потоки усувшие в wait могут пробудиться и удалиться из очереди ожидающих потоков.
CONSUMER
Поток-читатель захватывает mutex (lock) в unique_lock и освобождает его (unlock) в wait, чтобы другие потоки-читатели могли захватить mutex и попасть в wait.
WAIT С ПРЕДИКАТОМ
Идет проверка условия в предикате для предотвращения ложных пробуждений:
- в случае НЕвыполнения условия (например, поток-писатель еще не выполнил условие): идет засыпание потока и помещение его в очередь ожидающих потоков с помощью планировщика (Scheduler) через поход в ядро процессора - и так все потоки-читатели поочередно помещаются в очередь ожидания.
- в случае ВЫПОЛНЕНИЯ условия очередь ожидающих потоков может быть ПУСТОЙ и НЕПУСТОЙ:
- при НЕПУСТОЙ очереди (например, потоки-читатели успели захватить mutex (lock) РАНЬШЕ, чем поток-писатель, невыполнивший еще условие), пришло уведомление:
• notify_one: удаляется ОДИН поток из очереди и он пробуждается.
• notify_all: удаляются ВСЕ потоки из очереди и они пробуждаются.
Происходит попытка захвата mutex (lock) ОДНИМ потоком, продолжающаяся до тех пор, пока он не будет захвачен (бывает другой поток успел раньше захватить mutex и не отпустил его еще). После захвата (lock) mutexа ОДНИМ потоком, он выходит из wait, читает данные, выходит из области видимости unique_lock, освобождая mutex (unlock). При notify_all остальные пробудившиеся потоки поочередно выходят из wait, захватывая (lock) и освобождая (unlock) mutex. - при ПУСТОЙ очереди (например, потоки-читатели успели захватить mutex (lock) ПОЗЖЕ, чем поток-писатель, уже выполнивший условие): происходит тоже самое, что и при НЕПУСТОЙ ОЧЕРЕДИ, но с одним ОТЛИЧИЕМ - потоки не находятся в очереди и их от туда удалять не нужно и делать уведомление notify_one/notify_all так же не нужно, они поочередно выходят из wait, захватывая (lock) и освобождая (unlock) mutex.
- при НЕПУСТОЙ очереди (например, потоки-читатели успели захватить mutex (lock) РАНЬШЕ, чем поток-писатель, невыполнивший еще условие), пришло уведомление:
WAIT БЕЗ ПРЕДИКАТА
Потоки-читатели работают также как и с wait с ПРЕДИКАТОМ, но с 2 отличиями:
- не нужно проверять постоянно предикат.
- потоки всегда будут попадать в очередь ожидающих потоков и извлекать их из очереди нужно с помощью notify_one/notify_all.
Атомарная операция. Операция называется атомарной, если операция выполнена целиком, либо не выполнена полностью, поэтому нет промежуточного состояние операции.
- обычная переменная number1 имеет три операции: read-modify-write, поэтому нет гарантий, что другое ядро процессора не выполняет другой операции над number1.
- std::atomic переменная number2 имеет одну операцию с lock сигналом на уровне процессора, которая блокирует шину адреса, которая обеспечивает обмен информацией между процессором и кэшами, и следовательно atomic эксклюзивно владеет этой переменной из кэша, т.е. если два потока выполняют операцию инкремента над переменной, то эта переменная гарантированно будет увеличена на 2, в отличии от обычной переменной, где между 2 потоками нет синхронизации и из-за этого они могут одновременно изменять эту переменную, поэтому переменная может быть увеличена как на 1, так и на 2.
int number1 = 0;
std::atomic<int> number2 = 0;
++number1;
/* Generated x86-64 assembly:
mov eax, DWORD PTR v1[rip]
add eax, 1
mov DWORD PTR v1[rip], eax
*/
++number2;
/* Generated x86-64 assembly:
mov eax, 1
lock xadd DWORD PTR _ZL2v2[rip], eax
*/
ХАРАКТЕРИСТИКИ:
- atomic может быть встроенный тип (int, double, char), либо это тривиальный класс/структура, которая дожна весить <= 4 или 8 байт из-за размера регистра в 4 байта для 32-битного процессора / 8 байт для 64-битного процессора. В процессоре есть только 2^n разрядные регистры, поэтому для 3/7 байтных структур не подойдет. Однако, это ограничение можно обойти с помощью выравнивания, изменив порядок типов и подложив (padding) неиспользуемые байты границам блоков памяти по 4/8 байт.
- с помощью метода is_lock_free - можно проверить возможность применения atomic к типу: true - можно использовать atomic к данному типу / false - нет, лучше использовать mutex (например, нетривиальный тип и больше > 8 байт - размер регистра).
- ожидание wait блокировки предполагается недолгим, при длительной блокировке невыгоден - пустая трата процессорных ресурсов.
- в atomic нет атомарного умножения: number2 *= 2; так можно, но это уже будет не атомарная операция: auto int = number * 2;
Тривиальный класс/структура (std::is_trivial) - занимает непрерывную область памяти, поэтому компилятор может самостоятельно выбирать способ упорядочивания членов. Характеристики: — конструктор: явно отсутствующий или явно задан как default. — копирующий конструктор: явно отсутствующий или явно задан как default. — перемещающий конструктор: явно отсутствующий или явно задан как default. — копирующий оператор присваивания: явно отсутствующий или явно задан как default. — перемещающий оператор присваивания: явно отсутствующий или явно задан как default. — деструктор: явно отсутствующий или явно задан как default.
- все поля и базовые классы — тоже тривиальные типы.
- все поля класса - не инициализированны.
- поля могут иметь разный модификатор доступа (public/protected/private).
- не имеет виртуальных методов (включая деструктор).
- не имеет виртуальных базовых типов (класс/структура).
Методы:
- is_lock_free - проверяет возможность применения atomic к типу и возвращает значение: true - можно использовать atomic к данному типу / false - нет, лучше использовать mutex (например, нетривиальный тип и больше > 8 байт - размер регистра).
- store - запись значения.
- load - возвращение значения.
- exchange - замена значения и возвращение старого значения.
- compare_exchange_weak(expected, desired) - сравнивает значение atomic с аргументом expected. Если значения совпадают, то desired записывается в atomic и возвращается true. Если значения НЕ совпадают, то в аргумент expected записывается значение atomic и возвращается false. Возможны ложные пробуждения (spurious wakeup).
- compare_exchange_strong(expected, desired) - в отличии от compare_exchange_weak включает обработку ложных срабатываний и реализован внутри как вызов compare_exchange_weak в цикле.
- wait - блокирует поток до тех пор, пока не будет уведомления notify_one/notify_all и не изменится значение. Например, wait(true) блокируется при значении true, разблокируется при значении false. wait(false) блокируется при значении false, разблокируется при значении true. Ложные пробуждения базовой реализация wait скрыты в стандартной библиотеки STL, поэтому std::wait не имеет ложных пробуждений. Внутри std::wait повторно выполняются действия по порядку:
- сравнивает значение со старым значение -> пункт 1.
- если они != то возврат из функции или -> пункт 3.
- блокируется до тех пор, пока он не будет разблокирован notify_one/notify_all или не будет разблокирован ложно -> пункт 1.
- notify_one - снятие блокировки, ожидание 1 потока. Например, 1 писатель или много писателей.
- notify_all - ожидание многих потоков. Например, много читаталей.
- fetch_add - сложение значения c atomic и возвращение старого значения.
- fetch_sub - вычитание значения из atomic и возвращение старого значения.
- fetch_and - побитовое И между значением и atomic и возвращение старого значения.
- fetch_or - побитовое ИЛИ между значением и atomic и возвращение старого значения.
- fetch_xor - побитовое исключающее ИЛИ между значением и atomic и возвращение старого значения.
Атомарная булева операция. Операция называется атомарной, если операция выполнена целиком, либо не выполнена полностью, поэтому нет промежуточного состояние операции.
Методы:
- clear - сбрасывает значение в false.
- test - возвращение значения.
- test_and_set - устанавливает значение true и возвращает предыдущее значение.
- wait - блокирует поток до тех пор, пока не будет уведомления notify_one/notify_all и не изменится значение. Например, wait(true) блокируется при значении true, разблокируется при значении false. wait(false) блокируется при значении false, разблокируется при значении true. Ложные пробуждения базовой реализация wait скрыты в стандартной библиотеки STL, поэтому std::wait не имеет ложных пробуждений. Внутри std::wait повторно выполняются действия по порядку:
- сравнивает значение со старым значение -> пункт 1.
- если они != то возврат из функции или -> пункт 3.
- блокируется до тех пор, пока он не будет разблокирован notify_one/notify_all или не будет разблокирован ложно -> пункт 1.
- notify_one - снятие блокировки, ожидание 1 потока. Например, 1 писатель или много писателей.
- notify_all - ожидание многих потоков. Например, много читаталей.
Сложные пробуждения - пробуждение потока без веской причины. Это происходит потому что между моментом сигнала от std::atomic и моментом запуска ожидающего потока, другой поток запустился и изменил условие, вызвав состояние гонки. Если поток просыпается вторым, он проиграет гонку и произойдет ложное пробуждение. После wait рекомендуют всегда проверять истинность условие пробуждения в цикле или в предикате.
ОТЛИЧИЕ от volatile:
- atomic более дорогостоящая операция, чем volatile, т.к. блокирует шину.
- volatile использует только одну модель памяти/барьер памяти - memory_order_seq_cst (самая дорогая операция), atomic может использовать 6 моделей памяти/барьеров памяти (строгие/нестрогие - relaxed/acquire/release).
- volatile не является атомарной операцией, поэтому плохо работает с потоками. Например, если два потока выполняют операцию инкремента над атомарной переменной, то эта переменная гарантированно будет увеличена на 2, в отличии от volatile переменной, где между 2 потоками нет синхронизации и из-за этого они могут одновременно изменять эту переменную, поэтому переменная может быть увеличена как на 1, так и на 2. Однако, volatile с типом bool можно применять безопасно в многопоточности (статья Андрея Александреску).
ОТЛИЧИЕ от mutex:
- atomic блокирует шину адреса и тратит меньше времени, чем mutex блокирует поток с помощью планировщика задач (Scheduler) с переводом потока в заблокированное состояние через походы в ядро процессора.
- метод is_lock_free в atomic проверяет возможность применения atomic к типу: true - можно использовать atomic к данному типу / false - нет, лучше использовать mutex (например, нетривиальный тип и больше > 8 байт - размер регистра).
Для оптимизации работы с памятью у каждого ядра имеется его личный кэш памяти, над ним стоит общий кэш памяти процессора, далее оперативная память. Задача синхронизации памяти между ядрами - поддержка консистентного представления данных на каждом ядре (в каждом потоке). Если применить строгую упорядоченность изменений памяти, то операции на разных ядрах уже не будут выполнятся параллельно: остальные ядра будут ожидать, когда одно ядро выполнит изменения данных. Поэтому процессоры поддерживают работу с памятью с менее строгими гарантиями консистентности памяти. Разработчику предоставляется выбор: гарантии по доступу к памяти из разных потоков требуются для достижения максимальной корректности и производительности многопоточной программы.
Модели памяти в std::atomic - это гарантии корректности доступа к памяти из разных потоков. По-умолчанию компилятор предполагает, что работа идет в одном потоке и код будет выполнен последовательно, но компилятор может переупорядочить команды программы с целью оптимизации. Поэтому в многопоточности требуется соблюдать правила упорядочивания доступа к памяти, что позволяет с синхронизировать потоки с определенной степенью синхронизации без использования дорогостоящего mutex.
По возрастанию строгости
enum memory_order
{
memory_order_relaxed, // Все можно делать: можно пускать вниз/вверх операцию LOAD/STORE.
memory_order_consume, // Упрощенный memory_order_acquire, гарантирует, что операции будут выполняться в правильном порядке только те, которые зависят от одних данных.
memory_order_acquire, // Можно пускать вниз операцию STORE, но нельзя операцию LOAD и никакую операцию вверх.
memory_order_release, // Можно пускать вверх операцию LOAD, но нельзя операцию STORE и никакую операцию вниз.
memory_order_acq_rel, // memory_order_acquire + memory_order_acq_rel. Можно пускать вниз операцию LOAD, но нельзя операцию STORE. Можно пускать вверх операцию STORE, но нельзя операцию LOAD. Дорогая операция, но дешевле mutex. Используется в x86/64.
memory_order_seq_cst // Установлено по-умолчанию в atomic, компилятор не может делать переупорядочивание (reordining) - менять порядок в коде. Самая дорогая операция, но дает 100% корректности.
};
Семафор - механизм синхронизации работы потоков, который может управлять доступом к общему ресурсу. В основе семафора лежит счётчик, над которым можно производить две атомарные операции: увеличение и уменьшение кол-во потоков на единицу, при этом операция уменьшения для нулевого значения счётчика является блокирующей. Служит для более сложных механизмов синхронизации параллельно работающих задач. В качестве шаблонного параметра указывается максимальное допустимое кол-во потоков. В конструкторе инициализируется счетчик - текущее допустимое кол-во потоков.
Вычислительный - могут принимать целочисленные неотрицательные значения и используются для работы с ресурсами, количество которых ограничено, либо участвуют в синхронизации параллельно исполняемых задач.
Методы:
- acquire - атомарно уменьшает счетчик на 1 - кол-во потоков, контролирующих доступ к ресурсу. Eсли при уменьшении счетчик равен 0, тогда он блокируется до тех пор, пока он не станет > 0.
- release - атомарно увеличивает счетчик на 1 - кол-во потоков, контролирующих доступ к ресурсу.
- try_acquire - пытается атомарно уменьшить счетчик на 1 - кол-во потоков, контролирующих доступ к ресурсу, но при 0 счетчик не блокируется. Возвращает значение: true - счетчик уменьшился / false - нет.
- try_acquire_for - пытается атомарно уменьшить счетчик на 1 - кол-во потоков, контролирующих доступ к ресурсу. Eсли при уменьшении счетчик равен 0, тогда он блокируется до тех пор, пока он не станет > 0 или он разблокируется через ОПРЕДЕЛЕННОЕ ВРЕМЯ. Возвращает значение: true - счетчик уменьшился / false - нет, НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_acquire_until семафор может быть уже acquire/release.
- try_acquire_until - пытается атомарно уменьшить счетчик на 1 - кол-во потоков, контролирующих доступ к ресурсу. Eсли при уменьшении счетчик равен 0, тогда он блокируется до тех пор, пока он не станет > 0 или он разблокируется после НАСТУПЛЕНИЕ МОМЕНТА ВРЕМЕНИ. Возвращает значение: true - счетчик уменьшился / false - нет, НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_acquire_until семафор может быть уже acquire/release.
- max - возвращает максимально возможное значение счетчика. В качестве шаблонного параметра указывается максимальное допустимое кол-во потоков.
Двоичный (std::binary_semaphore) - простая версия семафора, которая может иметь только два значения: 0 и 1. binary_semaphore - псевдоним using binary_semaphore = std::counting_semaphore<1>. Он используется как мьютекс с более легким интерфейсом.
Отличие от std::mutex: семафор не привязан к потокам выполнения, освобождение (release) и захват (acquire) семафора могут производиться в разных потоках, mutex должен освобождаться тем же потоком, который его захватил.
- В отличие от mutex, вычислительный семафор допускает более одного потока к ресурсу.
- В отличие от двоичного семафора, начальное состояние mutex не может быть захваченным.
Механизм синхронизации работы потоков, который может управлять доступом к общему ресурсу и позволяет блокировать любое количество потоков до тех пор, пока ожидаемое количество потоков не достигнет барьера.
Защелки нельзя использовать повторно, барьеры можно использовать повторно.
Защелка - механизм синхронизации работы потоков, который может управлять доступом к общему ресурсу. В основе лежит уменьшающийся счетчик, значение счетчика инициализируется при создании. Защелка блокирует текущие потоки до тех пор, пока другие потоки не уменьшат значение счётчика до нуля. Нет возможности увеличить или сбросить счетчик, что делает защелку одноразовым барьером.
Методы:
- count_down - атомарно уменьшает значение счётчика на определенное значение (по умолчанию 1) без блокировки потока. Когда счетчик достигает нуля, все потоки, ожидающие фиксации, разблокируются. Если значение счётчика становится отрицательным, то поведение неопределено.
- wait - блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика уже равно 0, то управление возвращается немедленно.
- try_wait - проверяет, достигло ли значение счетчика нуля. Возвращает значение: true - счетчик равен нулю / false - нет, НО МОЖЕТ возвращать ложное значение, потому что в момент вызова try_wait защелка может быть уже count_down.
- arrive_and_wait - атомарно уменьшает значение счётчика на определенное значение (по умолчанию 1) и блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика становится отрицательным, то поведение неопределено.
- max - возвращает максимально возможное значение счетчика. В конструкторе инициализируется счетчик - текущее допустимое кол-во потоков.
Барьер - механизм синхронизации работы потоков, который может управлять доступом к общему ресурсу. В основе лежит уменьшающийся счетчик, значение счетчика инициализируется при создании. Барьер блокирует потоки до тех пор, пока все потоки не уменьшат значение счётчика до нуля, как только ожидающие потоки разблокируются, значение счётчика устанавливается в начальное состояние и барьер может быть использован повторно. Методы:
- arrive - атомарно уменьшает значение счётчика на определенное значение (по умолчанию 1). Если значение счётчика становится отрицательным, то поведение не определено. Метод возвращает идентификатор фазы, который имеет тип std::arrival_token.
- wait - блокирует текущий поток до тех пор, пока указанная фаза std::arrival_token не завершится. Принимает идентификатор фазы в качестве аргумента.
- arrive_and_wait - атомарно уменьшает значение счётчика на 1 и блокирует текущий поток до тех пор, пока счётчик не обнулится. Внутри функции вызов wait(arrive()). Если значение счётчика становится отрицательным, то поведение неопределено.
- arrive_and_drop - атомарно уменьшает на 1 значение счётчика и счётчика для следующих фаз. Если значение счётчика становится отрицательным, то поведение неопределено.
- max - возвращает максимально возможное значение счетчика. В конструкторе инициализируется счетчик - текущее допустимое кол-во потоков.
Отличия std::latch от std::barrier:
- std::latch может быть уменьшен одним потоком более одного раза.
- std::latch - можно использовать один раз, std::barrier является многоразовым: как только ожидающие потоки разблокируются, значение счётчика устанавливается в начальное состояние и барьер может быть использован повторно.
- использовать при n > 10000 при ОЧЕНЬ ПРОСТЫХ операциях. Чем сложнее операции, тем быстрее выполняется параллельность.
- OpenMP все равно быстрее, поэтому лучше его использовать. Но TBB быстрее OpenMP.
Примеры:
- std::execution::seq (обычная сортировка)
{
std::vector<int> numbers = { 3, 2, 4, 5, 1 };
std::ranges::sort(std::execution::seq, numbers); // в xcode не работает
}
- std::execution::par (параллельная сортировка)
{
std::vector<int> numbers = { 3, 2, 4, 5, 1 };
std::ranges::sort(std::execution::par, numbers); // в xcode не работает
}
- std::execution::unseq (parallel + vectorized (SIMD))
{
// TODO
}
- std::execution::par_unseq (vectorized, C++20)
{
// TODO
}
Open Multi-Processing — это библиотека, используемая для многопоточности на уровне цикла. Использование параллельной версии STL (все алгоритмы внутри поддерживают OpenMP и будут выполняться параллельно), для этого нужно передать libstdc++ parallel в компилятор GCC. До цикла for будет определено кол-во ядер в системе. Код внутри for будет выделен в отдельную функцию, которая запустится в отдельном потоке (НЕ сам For!!! try catch бессмысленен - нельзя отловить исключение в отдельном потоке, break, continue, return - невозможны). В конце области видимости для каждого потока будет вызван join(). Например, для 10 ядерной системы будет запущено 10 потоков. При 10.000 итераций. Каждый поток обработает 1.000 / 10 = 1.000 элементов в контейнере.
Не подходит:
- для рекурсии (может кончиться стек). Есть исключение с использованием очереди задач #pragma omp task. При условии, что размер очереди < 255. Можно на определенном уровне стека запоминать состояния (значения переменных) и кидать в очередь. После окончания функции в рамках того же потока или другого продолжаем вызывать ту же функцию с такой же логикой. Таким развязываем рекурсию по потокам через очередь.
- не всегда обеспечивает хорошую производительность.
Подробнее: OpenMP
Threading Building Blocks - библиотека Intel, является высокоуровневой библиотекой, чем OpenMP. В TBB есть планировщих задач, который помогает лучше оптимизировать работу. Это достигается с помощью алгоритма work stealing, который реализует динамическую балансировку нагрузки. Есть функция разной сложности, какой-то поток очень быстро обработал свою очередь задач, то он возьмет часть свободных задач другого потока. В TBB самому создать поток нельзя, поэтому в каких потоках идет выполнение знать не нужно. Можно только создать задачу и отдать ее на исполнение планировщику.
Подробнее: TBB
Низкоуровневый интерфейс чтения результата, который вычислится в будущем. Работает по pull-модели, идет опрос, готовы ли данные. Позволяет не использовать std::mutex, std::condition_variable.
Методы:
- wait - блокирует текущий поток, пока promise не запишет значение. Если std::future не связан с std::promise, то поведение неопределено.
- get - возвращает результат вычисления из другого потока, если результат не готов, то блокирует текущий поток до появления результата, внутри вызывая wait. Если нужно получить один и тот же результат в разных потоках, то нужно использовать std::shared_future, иначе terminate. Может вывести исключение сохраненное в std::promise с помощью метода set_exception.
- valid - проверяет готов ли результат вычисления, если готов, то можно вызвать метод get(). Если std::future не связан с std::promise, то поведение неопределено.
- wait_for - блокирует текущий поток на ОПРЕДЕЛЕННОЕ ВРЕМЯ или пока promise не запишет значение. Возвращает future_status(ready - результат готов, timeout - время истекло, deferred - время не истекло).
- wait_until - блокирует текущий поток до НАСТУПЛЕНИЕ МОМЕНТА ВРЕМЕНИ (например, 11:15:00) или пока promise не запишет значение. Возвращает future_status(ready - результат готов, timeout - время истекло, deferred - время не истекло).
- share - создает shared_future, который помогает получить один и тот же результат в разных потоках. Может вывести исключение сохраненное в std::promise с помощью метода set_exception.
Низкоуровневый интерфейс для записи результата, который вычисляется в отедельном потоке. std::promise служит каналом связи между текущим и основным потоками. Каждый объект promise связан с объектом future - интерфейс чтения результата. Он более удобный, чем std::mutex + std::condition_variable.
Методы:
- set_value - сохраняет значение, которое можно запросить с помощью связанного объекта std::future.
- get_future - получение другого канал передачи результата - std::future.
- set_exception - можно сохранить указатель на исключение (exception_ptr) и вывести исключение с помощью std::future метода get.
- set_value_at_thread_exit и set_exception_at_thread_exit() - сохраняют значение или исключение после завершения потока аналогично тому, как работает std::notify_all_at_thread_exit.
Оборачивает функцию и записывает результат вычисления функции сам вместо std::promise. Работает как std::function + std::promise и более удобный, чем std::promise.
Методы:
- get_future - получение другого канал передачи результата - std::future.
- make_ready_at_thread_exit - позволяет дождаться полного завершения потока перед тем, как привести std::future в состояние готовности.
- reset - очищает результаты предыдущего запуска задачи.
Высокоуровневый интерфейс, внутри которого находится std::promise и Threadpool для создания потоков. Аналог std::packaged_task, возвращает std::future, куда передается результат вычислений.
Если результат объекта std::future работы std::async не сохранять, тогда деструктор std::future заблокируется до тех пор, пока асинхронная операция не будет завершена, т.е. std::async будет выполняться не асинхронно, а просто последовательно в однопоточном режиме. Это легко упустить из виду, когда не требуется возвращаемое значение.
При сохранении результата объекта std::future, вызов деструктора объекта std::future будет отложен, он будет разрушен при выходе из стека. Внутри деструктора std::future вызывается std::get и дожидается окончания асинхронного результата. Деструкторы объектов std::future, полученных не из std::async, не блокируют поток.
Стратегии запуска:
- std::launch::async - другой поток.
- std::launch::deferred - текущий поток.
- по умолчанию std::async выберет стратегию в зависимости от загруженности потоков, но лучше на это не полагаться.
Корутина - функция с несколькими точками входа и выхода, из нее можно выйти середине, а затем вернуться в нее и продолжить исполнение. По сути это объект, который может останавливаться и возобновляться. Является более простым аналогом future.then, где then осуществляет запуск цепочки выполнения в будущем последовательных асинхронных операций для вычисления промежуточных результатов.
Пример — программы, выполняющие много операций ввода-вывода. Пример, веб-сервер, который пока данные не будут переданы по сети или получены, он ждёт. Если реализовать веб-сервер обычным способом, то на каждого клиента будет отдельный поток. В нагруженных серверах это будет означать тысячи потоков. Эти потоки по большей части приостанавливаются и ждут, нагружая операционную систему переключением контекстов. При использовании корутины поток приостанавливает выполнение задачи, сохранив текущее состояние, и начинает выполнять другие задачи, а затем может вернуться в предыдущую задачу продолжить ее исполнение.
ХАРАКТЕРИСТИКИ:
- stackfull - держат свой стек в памяти на протяжении всего времени жизни корутины. В STL стандартах 20/23 корутины не работают.
- stackless - не сохраняет свой стек в памяти на протяжении всего времени жизни корутины, а только во время непосредственной работы. При этом стек аллоцируется в вызывающем корутину контексте.
C++20 stl: stackless, userserver (yandex): stackfull.
Методы:
- co_await — для прерывания функции и последующего продолжения.
- co_yield — для прерывания функции с одновременным возвратом результата. Это синтаксический сахар для конструкции с co_await.
- co_return — для завершения работы функции.
Лекция 5. Multithreading in C++ (потоки, блокировки, задачи, атомарные операции, очереди сообщений)
Лекция 9. OpenMP и Intel TBB
Модель памяти C++ - Андрей Янковский
C++ Russia 2018: Иван Пузыревский, Асинхронность в программировании
ШБР 2023 — Асинхронное программирование (C++)
Изучение С++ {#28}. Многопоточность. Задачи. std::async. Уроки C++.
C++11 STL: Future - Ожидание события (Многопоточность)
Multithreading
Субъективный объективизм
shared_mutex
difference between std::mutex and std::shared_mutex
Does std::shared_mutex favor writers over readers?
why std::lock() supports deallock avoidence but std::try_lock() does not?
std::atomic. Модель памяти C++ в примерах
std::conditional_variable и std::atomic_flag в С++20
std::condition_variable::notify_all() - I need an example
How std::atomic wait operation works?
Spurious wakeup with atomics and condition_variables