# C++11 线程
# 函数接口
# 线程 id 的获取
std:: thread:: id get_id ( ) const noexcept ; std:: thread:: id get_id ( ) noexcept ;
void foo ( ) { this_thread:: sleep_for ( chrono:: seconds ( 1 ) ) ; } int main ( ) { thread t1 ( foo) ; thread:: id t1_id = t1. get_id ( ) ; thread t2 ( foo) ; thread:: id t2_id = t2. get_id ( ) ; cout << "t1 的 id: " << t1_id << '\n' ; cout << "t2 的 id: " << t2_id << '\n' ; t1. join ( ) ; t2. join ( ) ; cout << "合并后 t1 的 id: " << t1. get_id ( ) << '\n' ; cout << "合并后 t2 的 id: " << t2. get_id ( ) << '\n' ; }
# 线程的等待
void foo ( ) { this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } void bar ( ) { this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } int main ( ) { cout << "启动第一助手...\n" ; thread helper1 ( foo) ; cout << "启动第二助手...\n" ; thread helper2 ( bar) ; cout << "等待助手结束..." << endl; helper1. join ( ) ; helper2. join ( ) ; std:: cout << "完成!\n" ; }
# 线程入口函数的传参
# 普通函数
void threadFunc ( int x) { cout << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; thread th ( threadFunc, 1 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# 函数指针
void threadFunc ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; typedef void ( * pFunc) ( int ) ; pFunc f = & threadFunc; thread th ( f, 2 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# 函数引用
void threadFunc ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; typedef void ( & pFunc) ( int ) ; pFunc f = threadFunc; thread th ( f, 3 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# 函数对象
class Example { public : void operator ( ) ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } } ; int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; Example ex; thread th ( ex, 4 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# lambda 表达式
int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; thread th ( [ ] ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } , 5 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# function 对象
int main ( ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; function< void ( int ) > f = [ ] ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } ; thread th ( f, 6 ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# bind
的返回结果
void threadFunc ( int x) { cout << endl << "void threadFunc(int)" << endl; cout << "child thread id = " << std:: this_thread:: get_id ( ) << endl; cout << "x = " << x << endl; } int main ( int argc, char * argv[ ] ) { cout << "main thread id = " << std:: this_thread:: get_id ( ) << endl; thread th ( bind ( threadFunc, 7 ) ) ; cout << "child1 thread id = " << th. get_id ( ) << endl; th. join ( ) ; return 0 ; }
# C++11 锁
# std::mutex
锁
int gCnt = 0 ; mutex _mtx; void threadFunc ( ) { for ( size_t idx = 0 ; idx != 10000000 ; ++ idx) { _mtx. lock ( ) ; ++ gCnt; _mtx. unlock ( ) ; } } int main ( ) { thread tha ( threadFunc) ; thread thb ( threadFunc) ; tha. join ( ) ; thb. join ( ) ; cout << "gCnt = " << gCnt << endl; return 0 ; }
使用 std::mutex
对共享资源进行加锁,但是 std::mutex
使用上具备隐患,上锁与解锁一定要成对出现,不然就卡死了。
# 封装 MutexLock
int gCnt = 0 ; mutex mtx; class MutexLock { public : explicit MutexLock ( mutex & mutx) : _mutx ( mutx) { _mutx. lock ( ) ; } ~ MutexLock ( ) { _mutx. unlock ( ) ; } private : mutex & _mutx; } ; void threadFunc ( ) { for ( size_t idx = 0 ; idx != 10000000 ; ++ idx) { MutexLock autoLock ( mtx) ; ++ gCnt; } }
自己封装 MutexLock
类,在该类的构造函数中进行上锁,在析构函数中进行解锁,利用了栈对象的生命周期管理资源,这样我们在使用的时候,直管上锁就不管解锁,降低了卡死风险,更加安全。
# lock_guard
锁
int gCnt = 0 ; mutex mtx; void threadFunc ( ) { for ( size_t idx = 0 ; idx != 10000000 ; ++ idx) { { lock_guard< mutex> lg ( mtx) ; ++ gCnt; } } }
# unique_lock
锁
int gCnt = 0 ; mutex mtx; void threadFunc ( ) { for ( size_t idx = 0 ; idx != 10000000 ; ++ idx) { unique_lock< mutex> ul ( mtx) ; ++ gCnt; ul. unlock ( ) ; ul. lock ( ) ; ul. unlock ( ) ; } }
lock_guard
和 unique_lock
都是 C++ 标准库中的线程同步工具,用于管理互斥锁(mutex)的所有权,它们都遵循 RAII(Resource Acquisition Is Initialization)原则,即资源的获取即是初始化。这意味着在对象的构造函数中获取锁,在对象的析构函数中释放锁,确保即使发生异常也能正确释放锁,防止死锁。
lock_guard
的使用非常简单,一旦构造了一个 lock_guard
对象,它就会立即锁定传给它的互斥锁,并且在 lock_guard
对象的生命周期结束时自动释放这个锁。由于 lock_guard
不支持提前释放锁,因此它的使用场景比较有限,仅适用于需要在整个作用域内持有锁的情况。
与 lock_guard
相比, unique_lock
提供了更多的灵活性。它不仅能够在构造函数中锁定互斥锁,还能在不需要持有锁时显式地解锁,或者在已经锁定的情况下延迟锁定。这使得 unique_lock
可以在更复杂的场景中使用,例如在某个操作完成后提前释放锁,或者在异常发生时保持锁的状态不变。
由于 unique_lock
提供了更多的功能,它的实现相对复杂一些,因此在某些情况下可能比 lock_guard
有更低的效率。然而,这种效率差异通常很小,而且 unique_lock
的灵活性使其成为许多复杂同步问题的首选解决方案。
# atomic
原子数据类型
atomic< int > gCnt ( 0 ) ; void threadFunc ( ) { for ( size_t idx = 0 ; idx != 10000000 ; ++ idx) { ++ gCnt; } } int main ( ) { thread tha ( threadFunc) ; thread thb ( threadFunc) ; tha. join ( ) ; thb. join ( ) ; cout << "gCnt = " << gCnt << endl; return 0 ; }
std::atomic
是 C++11 引入的原子操作库,它提供了一种操作方式,可以保证在多线程环境下,对共享数据的修改是安全的,不会因为多个线程同时访问而产生数据竞争(race condition)或一致性问题。
std::atomic
类模板及其相关操作的背后原理之一是 CAS(Compare-and-Swap)机制。CAS 是一种常用的原子操作,用于实现无锁的线程安全编程。其基本操作流程如下:
比较 (Compare):
线程会读取一个变量的当前值(记为 V),并将其与一个预期值(记为 A)进行比较。
交换 (Swap):
如果当前值 V 与预期值 A 相等,说明在读取值和比较期间,没有其他线程修改该变量,那么线程就可以通过原子操作将变量的值设置为新值 B。
失败处理 :
如果当前值 V 与预期值 A 不相等,说明在读取值和比较期间,有其他线程已经修改了该变量,因此当前线程不能简单地将变量值设置为 B。在这种情况下,线程需要根据具体的场景采取适当的失败处理策略,比如重试操作、自旋等待或者放弃操作。
CAS 操作通常由底层硬件支持,以确保在多线程环境中,即使多个线程同时尝试修改同一个变量,也能保证操作的原子性。这意味着在任何时刻,只有一个线程能够成功地修改变量的值。
# 生产者与消费者
# 概述
生产者与消费者问题,是一个经典的常规问题,其实也是线程问题。可以把生产者看成是一类线程,消费者看成是另一类线程,也就是 C++11 线程库中的 std::thread
。因为生产者与消费者需要从共享的仓库 中存数据或者取数据,涉及到对仓库的互斥访问,所以需要加锁,也就是 std::mutex
。这里我们把仓库 用一个任务队列 TaskQueue
进行封装,提供互斥锁、条件变量的基本数据成员,当然任务队列里面也就 是基本操作,队列是不是满的,是不是空的,存数据与取数据等基本操作。
# 原理图
# 类图
classDiagram direction LR class Producer { + produce(taskQue: TaskQueue &) void } class Consumer { + comsume(taskQue: TaskQueue &) void } class TaskQueue { - _capacity: size_t - _que: queue~int~ - _mutex: mutex - _notFull: condition_variable - _notEmpty: condition_variable + TaskQueue(capa: size_t) + ~TaskQueue( ) + push(value: const int &) void + pop( ) int + full( ) bool + empty( ) bool } Producer ..|> TaskQueue Consumer ..|> TaskQueue queue --* TaskQueue mutex --* TaskQueue condition_variable --* TaskQueue
# 代码
Consumer.h # ifndef CONSUMER_H_ # define CONSUMER_H_ class TaskQueue ; class Consumer { public : Consumer ( ) = default ; ~ Consumer ( ) = default ; void Consume ( TaskQueue& taskQue) ; } ; # endif
Consumer.cpp # include "Consumer.h" # include "TaskQueue.h" # include <thread> # include <iostream> using std:: cout; using std:: endl; * @brief 消费者消费任务 * @param taskQue 任务队列 * @return void */ void Consumer :: Consume ( TaskQueue & taskQue) { int cnt = 20 ; while ( cnt-- ) { int num = taskQue. pop ( ) ; cout << ">>Consumer consume = " << num << endl; std:: this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } }
Producer.h # ifndef PRODUCER_H_ # define PRODUCER_H_ class TaskQueue ; class Producer { public : Producer ( ) = default ; ~ Producer ( ) = default ; void produce ( TaskQueue & taskQue) ; } ; # endif
Producer.cpp # include "Producer.h" # include "TaskQueue.h" # include <cstdlib> # include <ctime> # include <thread> # include <iostream> using std:: cout; using std:: endl; * @brief 生产者生产数据 * @param taskQue 任务队列 * @return void */ void Producer :: produce ( TaskQueue & taskQue) { :: srand ( :: clock ( ) ) ; int cnt = 20 ; while ( cnt-- ) { int num = :: rand ( ) % 100 ; taskQue. push ( num) ; cout << ">>Producer produce num = " << num << endl; std:: this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } }
TaskQueue.h # ifndef TASKQUEUE_H_ # define TASKQUEUE_H_ # include <queue> # include <mutex> # include <condition_variable> using std:: queue; using std:: mutex; using std:: condition_variable; class TaskQueue { public : TaskQueue ( size_t capa) ; ~ TaskQueue ( ) = default ; void push ( const int & value) ; int pop ( ) ; bool empty ( ) ; bool full ( ) ; private : size_t _capacity; queue< int > _que; mutex _mutex; condition_variable _notEmpty; condition_variable _notFull; } ; # endif
TaskQueue.cpp # include "TaskQueue.h" using std:: unique_lock; * @brief 构造函数 * @param capa 队列容量 * @return void */ TaskQueue :: TaskQueue ( size_t capa) : _capacity ( capa) , _mutex ( ) , _notEmpty ( ) , _notFull ( ) { } * @brief 入队 * @param value */ void TaskQueue :: push ( const int & value) { unique_lock< mutex> lock ( _mutex) ; while ( full ( ) ) { _notFull. wait ( lock) ; } _que. push ( value) ; _notEmpty. notify_one ( ) ; } * @brief 出队 * @return tmp 队头元素 */ int TaskQueue :: pop ( ) { unique_lock< mutex> lock ( _mutex) ; while ( empty ( ) ) { _notEmpty. wait ( lock) ; } int tmp = _que. front ( ) ; _que. pop ( ) ; _notFull. notify_one ( ) ; return tmp; } * @brief 判空 * @return bool 队列是否为空 */ bool TaskQueue :: empty ( ) { return _que. empty ( ) ; } * @brief 判满 * @return bool 队列是否为满 */ bool TaskQueue :: full ( ) { return _capacity == _que. size ( ) ; }
TestPC.cpp # include "Producer.h" # include "Consumer.h" # include "TaskQueue.h" # include <iostream> # include <thread> using std:: cout; using std:: endl; using std:: thread; int main ( ) { Producer pr; Consumer co; TaskQueue taskQue ( 10 ) ; thread pro ( & Producer:: produce, & pr, std:: ref ( taskQue) ) ; thread con ( & Consumer:: Consume, & co, std:: ref ( taskQue) ) ; pro. join ( ) ; con. join ( ) ; return 0 ; }
# 线程池
# 线程池基础概述
为什么要有线程池?假设没有使用线程池时,一个请求用一个子线程来处理。每来一个请求,都得创建子线程,子线程执行请求,关闭子线程。当请求量(并发)比较大的时候,频繁地创建和关闭子线程, 也是有开销的。因此提出线程池,提前开辟好 N 个子线程,当有任务过来的时候,先放到任务队列中, 之后 N 个子线程从任务队列中获取任务,并执行,这样能大大提高程序的执行效率。其实当任务数大于线程池中子线程的数目的时候,就需要将任务放到缓冲区(队列)里面,所以本质上还是一个生产者消费者模型。
# 线程池的原理图
# 面向对象线程池
# 类图
classDiagram class ThreadPool { - _threadNum : size_t - _threads : vector ~thread~ - _queSize : size_t - _taskQue : TaskQueue - _isExit : bool + ThreadPool(threadNum : size_t, queSize : size_t) + ~ThreadPool( ) + start( ) void + stop( ) void + addTask(ptask: Task *) void - getTask( ) Task - doTask( ) void } note for ThreadPool "线程池类的作用就是在任务到来之后动多个子线程,等待任务的 到来,然后只要任务队列中有任务·那么线程池中的子线程就可以 一直执行任务。那么就需要线程类设置开启子线程数目的数据 成员 int_threadNum,以及存放子线程的容器 vector<thread> _thrads; 其次还需要有一个存储任务的数据结构,然后子线程 (也就是工作线程)从其中取任务,因为线程池中的线程是并发 的去取任务,所以需要对该数据结构进行上锁,同时工作线程如果 发现任务队列为空,还需要处于睡眠,这不正好就是生产者消者 模型中的 TaskQueue 类,所以就添加任务队列大小的数据成员 size_t _queSize以及 TaskQueue _taskQue 对象,除此之外 还可以添加线程池有没有退出来的标记位,这样就可以轻松通过 该标志位判断线程池是否退出了" note for ThreadPool "线程池涉及到开始与结束,也就是开启线程以及将线程进行回收 所以设置了 start 函数与 stop 函数,然后就是对于任务的处理 添加任务与获取任务,对于任务,因为不知道到底执行哪类操作 为了让代码具备通用性,我们设置了抽象类 Task 以及需要执行 任务的纯虚函数 process,然后具体的任务继承 Task 即可。 所以添加任务是 Task * 而不是 Task 对象,最后就是线程池 交给工作线程执行的任务 doTask 函数 " class Task { + process( ) void } class MyTask1 { + process( ) void } class MyTask2 { + process( ) void } Task ..> ThreadPool MyTask1 --|> Task MyTask2 --|> Task class ThreadThread --* ThreadPool class TaskQueue { - _Capacity : size_t - _que : queue~ElemType~ - _notFull : condition_variable - _notEmpty : condition_variable - _flag : bool + TaskQueue(capa : size_t) + ~TaskQueue( ) + push(ptask: ElemType) void + pop( ) ElemType + empty( ) bool + full( ) bool + wakeup( ) void } class queueclass mutexclass condition_variablequeue --* TaskQueue mutex --* TaskQueue condition_variable --* TaskQueue note for TaskQueue "using ElemType = Task *"
# 核心代码
class ThreadPool { public : ThreadPool ( size_t threadNum, size_t queSize) ; ~ ThreadPool ( ) ; void start ( ) ; void stop ( ) ; void addTask ( Task * ptask) ; private : Task * getTask ( ) ; void doTask ( ) ; private : size_t _threadNum; vector< thread> _threads; size_t _queSize; TaskQueue _taskQue; bool _isExit; } ;
void ThreadPool :: start ( ) { for ( size_t idx = 0 ; idx != _threadNum; ++ idx) { _threads. push_back ( thread ( & ThreadPool:: doTask, this ) ) ; } }
void ThreadPool :: doTask ( ) { while ( ! _isExit) { Task * ptask = getTask ( ) ; if ( ptask) { ptask-> process ( ) ; } else { cout << "nullptr == ptask" << endl; } } }
# 问题一:任务执行不完,程序就退出?
线程池启动之后,会创建工作线程,并且启动工作线程,也就是说工作线程会执行线程入口函数 doTask
,如果主线程不添加任务,那么工作线程会处于睡眠状态,但是如果主线程执行了 addTask
,添加了任务,那么子线程(工作线程)会被唤醒,那么主线程与子线程都在运行。但是如果主线程添加任务的速度比较快,那么很快就会执行到 stop
函数,那么如果子线程的任务还没有执行完,主线程就已经进入了 stop
函数,那么会将标志位 _isExit
设置为 true
,那么子线程可能就拿不到剩余的任务,就会被退出,所以就看到了任务没有执行完,程序就退出来了。
解决方案:在 stop
函数中,也就是即使主线程执行的要快,那么添加一个判断,只要任务执行不完,主线程就不继续向下执行,也就是添加 _taskQue.empty()
操作,可以保证任务执行完毕。
void ThreadPool :: stop ( ) { while ( ! _taskQue. empty ( ) ) { std:: this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } _isExit = true ; for ( auto & th : _threads) { th. join ( ) ; } }
# 问题二:任务执行完毕,程序无法退出?
原因:工作线程在执行 doTask
的时候,将最后一个任务通过 getTask
取出来之后,这个时候子线程会继续执行任务,也就是执行 process
函数,但是主线程也会继续从 stop
继续向下执行,也就是 stop
中的 while
循环不满足条件,那么如果主线程执行的要快一些,可以很快的将标志位 _isExit
设置为 true
,也是就子线程执行 process
比较慢,那么程序就可以退出来;但是如果子线程执行 process
的速率比较快,主线程还没有来得及将标志位 _isExit
设置为 true
,那么子线程会继续进入到 doTask
的循环中,继续拿任务但是任务队列已经为空了,那么子线程就会再次处于睡眠状态
解决方案:如果子线程执行的比较慢,程序不会有问题;但是如果子线程跑的快,最终的结果是子线程会在 _notEmpty
条件变量上睡眠,可以让主线程在回收之前将子线程全部唤醒后再回收
void ThreadPool :: stop ( ) { while ( ! _taskQue. empty ( ) ) { std:: this_thread:: sleep_for ( std:: chrono:: seconds ( 1 ) ) ; } _isExit = true ; _taskQue. wakeup ( ) ; for ( auto & th : _threads) { th. join ( ) ; } }
然后就需要在 TaskQueue
中添加函数 wakeup
。但是发现唤醒之后,非空条件上的线程依旧睡眠。所以需要添加标志位。
void TaskQueue :: wakeup ( ) { _flag = false ; _notEmpty. notify_all ( ) ; } ElemType TaskQueue :: pop ( ) { unique_lock< mutex> ul ( _mutex) ; while ( empty ( ) && _flag) { _notEmpty. wait ( ul) ; } if ( _flag) { ElemType tmp = _que. front ( ) ; _que. pop ( ) ; _notFull. notify_one ( ) ; return tmp; } else { return nullptr ; } }
# 基于对象的线程池
# 类图
classDiagram class ThreadPool { - _threadNum : size_t - _threads : vector ~thread~ - _queSize : size_t - _taskQue : TaskQueue - _isExit : bool + ThreadPool(threadNum : size_t, queSize : size_t) + ~ThreadPool( ) + start( ) void + stop( ) void + addTask(taskcb: Task &&) void - getTask( ) Task - doTask( ) void } note for ThreadPool "线程池类的作用就是在任务到来之后动多个子线程,等待任务的 到来,然后只要任务队列中有任务·那么线程池中的子线程就可以 一直执行任务。那么就需要线程类设置开启子线程数目的数据 成员 int_threadNum,以及存放子线程的容器 vector<thread> _thrads; 其次还需要有一个存储任务的数据结构,然后子线程 (也就是工作线程)从其中取任务,因为线程池中的线程是并发 的去取任务,所以需要对该数据结构进行上锁,同时工作线程如果 发现任务队列为空,还需要处于睡眠,这不正好就是生产者消者 模型中的 TaskQueue 类,所以就添加任务队列大小的数据成员 size_t _queSize以及 TaskQueue _taskQue 对象,除此之外 还可以添加线程池有没有退出来的标记位,这样就可以轻松通过 该标志位判断线程池是否退出了" note for ThreadPool "线程池涉及到开始与结束,也就是开启线程以及将线程进行回收 所以设置了 start 函数与 stop 函数,然后就是对于任务的处理 添加任务与获取任务,对于任务,因为不知道到底执行哪类操作 为了让代码具备通用性,我们设置了抽象类 Task 以及需要执行 任务的纯虚函数 process,然后具体的任务继承 Task 即可。 所以添加任务是 Task * 而不是 Task 对象,最后就是线程池 交给工作线程执行的任务 doTask 函数 " note for Task "using Task = function ~void()~" class MyTask { + process(x: int) void } MyTask --|> Task class ThreadThread --* ThreadPool class TaskQueue { - _Capacity : size_t - _que : queue~ElemType~ - _notFull : condition_variable - _notEmpty : condition_variable - _flag : bool + TaskQueue(capa : size_t) + ~TaskQueue( ) + push(task: ElemType &&) void + pop( ) ElemType + empty( ) bool + full( ) bool + wakeup( ) void } class queueclass mutexclass condition_variablequeue --* TaskQueue mutex --* TaskQueue condition_variable --* TaskQueue note for TaskQueue "using ElemType = function ~void()~" TaskQueue --* ThreadPool
# 核心代码
# 添加任务
unique_ptr< MyTask> ptask ( new MyTask ( ) ) ; ThreadPool pool ( 4 , 10 ) ; pool. start ( ) ; int cnt = 20 ; while ( cnt-- ) { pool. addTask ( bind ( & MyTask:: process, ptask. get ( ) ) ) ; cout << "cnt = " << cnt << endl; } pool. stop ( ) ;
void ThreadPool :: doTask ( ) { while ( ! _isExit) { Task taskcb = getTask ( ) ; if ( taskcb) { taskcb ( ) ; } else { cout << "nullptr == ptask" << endl; } } }