C++线程池

C++线程池记录

Posted by geek_li on May 28, 2020

在开始之前简单说明下,线程、线程池
线程,属于进程的一部分,需求较低的程序都只是在单线程的执行顺序,也就是一步接着一步执行代码
当需求有一定的复杂度的时候,就需要启动多线程(12306事件在那时候闹的还挺热闹的)
例子:当你在双十一的时候,肯定是同一时间全国会有几十万的人去访问同一个窗口,抢东西……
这就涉及了一个多线程的问题,单纯的多线程是在在同时段开启多个线程跑程序
要做到满足商城同时段内成百上千的访问,还能保证系统的稳如泰山,就看多线程管理和管理策略

怎样的场合需要多线程

    目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。

    T1:线程创建时间;     T2:线程执行时间,包括线程的同步等时间;
    T3:线程销毁时间;

    那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很长的话,这笔开销将是不可忽略的。

    除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。

    因此线程池的出现正是着眼于减少线程本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。

     事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。 总之线程池通常适合下面的几个场合=======》

(1) 单位时间内处理任务频繁而且任务处理时间短;  
(2) 对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。  
  • 代码

class ThreadPool {
 
public:
    ThreadPool(size_t);                          
    template<class F, class... Args>            
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;//任务入队
    ~ThreadPool();                              
 
private:
    std::vector< std::thread > workers;           
    std::queue< std::function<void()> > tasks;      
 
    std::mutex queue_mutex;                       
    std::condition_variable condition;            
    bool stop;                                    
};
 
inline ThreadPool::ThreadPool(size_t threads) : stop(false){
 
    for(size_t i = 0; i<threads; ++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    // task是一个函数类型,从任务队列接收任务
                    std::function<void()> task;  
                    {
                        //给互斥量加锁,锁对象生命周期结束后自动解锁
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        
                        //当匿名函数返回false时才阻塞线程,阻塞时自动释放锁。
                        //当匿名函数返回true且受到通知时解阻塞,然后加锁。
                        this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });
                       
                         if(this->stop && this->tasks.empty())
                            return;
                        
                        //从任务队列取出一个任务
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }                            // unlock
                    task();                      // start Mission
                }
            }
        );
}
 
// 添加新的任务到任务队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    // 获取函数返回值类型        
    using return_type = typename std::result_of<F(Args...)>::type;
 
    // 创建一个指向任务的只能指针
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);  //加锁
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
 
        tasks.emplace([task](){ (*task)(); });          //把任务加入队列
    }                                                   //自动解锁
    condition.notify_one();                             //通知条件变量,唤醒一个线程
    return res;
}
 
// 析构函数,删除所有线程
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

  • 使用

ThreadPool pool(4);

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);

// get result from future, print 42
std::cout << result.get() << std::endl; 

  • another
void takeIt()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout<<"worker thread ID:"<<std::this_thread::get_id()<<std::endl;
}

int main()
{
    ThreadPool pool(4);
    while(true)
    {
       pool.enqueue(fun);
    }
}

说明

inline ThreadPool::ThreadPool(size_t threads) : stop(false){},这个函数向线程队列worker插入threads个线程,每个线程对象用一个匿名函数lambda来初始化,每个匿名函数会一直循环着从任务队列中取出任务来执行。首先对互斥量加锁,获得互斥锁后,调用条件变量的wait()函数等待条件发生,传入wait()函数的第二个参数为一个匿名函数lambda,当函数返回值为false时,wait会使得该线程阻塞(任务队列为空时会阻塞),并且对互斥量进行解锁。当函数返回值为true(任务队列不为空)并且收到条件变量的通知后,wait函数使线程解阻塞,并且对互斥量加锁。接着从任务队列里面弹出一个任务,对互斥量自动解锁,并执行这个任务。

    接着我们看添加任务的函数:auto ThreadPool::enqueue(F&& f, Args&&… args)-> std::future<typename std::result_of<F(Args…)>::type>{},这个函数向任务队列中加入一个任务。它首先创建一个名为task的智能指针,接着对互斥量加锁,然后把该任务加入到任务队列中,对互斥量自动解锁。调用条件变量的notify_one()函数,使得阻塞到这个条件变量的线程解阻塞。

    总结:初始化时,线程中的每个函数都会阻塞到条件变量那里,当任务队列中新加入一个任务时,通知阻塞到条件变量的某一个线程,接着这个线程执行:互斥量加锁——>任务队列出队——>互斥量解锁——>执行任务。当线程执行完任务之后,如果任务队列不为空,则继续从任务队列那里取出任务执行,如果任务队列为空则阻塞到条件变量那里。

  (1) std::result_of<func()>::type:获取func()函数返回值的类型;

  (2) typename:告诉编译器,typename后面的字符串为一个类型名称,而不是成员函数或者成员变量;

  (3) 匿名函数lambda: 允许incline函数的定义被当做一个参数,用法为:->type{},中括号内指定了传递方式(值传递或引用传递),小括号内指明函数的形参,->type指定了返回值类型,花括号内指定了函数体。

  (4) std::funtion<void()> :声明一个函数类型,接收任意原型是void()的函数、或函数对象、或是匿名函数。void() 意思是不带参数,没有返回值。

  (5) std::unique_lock<> : C++多线程编程中通常会对共享的数据进行写保护,以防止多线程在对共享数据成员进行读写时造成资源争抢导致程序出现未定义的行为。通常的做法是在修改共享数据成员的时候进行加锁–mutex。在使用锁的时候通常是在对共享数据进行修改之前进行lock操作,在写完之后再进行unlock操作,进场会出现由于疏忽导致由于lock之后在离开共享成员操作区域时忘记unlock,导致死锁。针对以上的问题,C++11中引入了std::unique_lock与std::lock_guard两种数据结构。通过对lock和unlock进行一次薄的封装,实现对象生命周期结束后自动解锁的功能,参考。用法如下=====》

    1)新建一个unique_lock对象 :std::mutex mymutex; 

    2)给对象传入一个std::mutex 对象作为参数:unique_lock lock(mymutex);

  (6) std::condition_variable:当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。

    std::condition_variable::wait() :std::condition_variable提供了两种 wait() 函数=========》

    1) 第一种情况:void wait (unique_lock& lock);当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lock),直到另外某个线程调用 notify_* 唤醒了当前线程。在线程被阻塞时,该函数会自动调用 lock.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lock.lock(),使得 lock 的状态和 wait 函数被调用时相同。

   2) 第二种情况:void wait (unique_lock& lock, Predicate pred)(即设置了 Predicate),只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。

   std::condition_variable::notify_one() :唤醒某个等待(wait)线程。如果当前没有等待线程,则该函数什么也不做,如果同时存在多个等待线程,则唤醒某个线程是不确定的;

  (7) using return_type = typename xxx :指定别名,和typedef的作用类似;

  (8) std::make_shared : 创建智能指针,需要传入类型;

  (9) std::future<> : 可以用来获取异步任务的结果,因此可以把它当成一种简单的线程间同步的手段;

  (10) std::packaged_task<> : std::packaged_task 包装一个可调用的对象,并且允许异步获取该可调用对象产生的结果,从包装可调用对象意义上来讲,std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果);

  (11) std::bind() : bind的思想实际上是一种延迟计算的思想,将可调用对象保存起来,然后在需要的时候再调用。而且这种绑定是非常灵活的,不论是普通函数、函数对象、还是成员函数都可以绑定,而且其参数可以支持占位符,比如你可以这样绑定一个二元函数auto f = bind(&func, _1, _2);,调用的时候通过f(1,2)实现调用;

  (12) std::forward : 左值与右值的概念其实在C++0x中就有了。概括的讲,凡是能够取地址的可以称之为左值,反之称之为右值,C++中并没有对左值和右值给出明确的定义,从其解决手段来看类似上面的定义,当然我们还可以定义为:有名字的对象为左值,没有名字的对象为右值。std :: forward有一个用例:将模板函数参数(函数内部)转换为调用者用来传递它的值类别(lvalue或rvalue)。这允许右值参数作为右值传递,左值作为左值传递,一种称为“完全转发”的方案;

  (13) std::packged_task::get_future: 获取与共享状态相关联的 std::future 对象。在调用该函数之后,两个对象共享相同的共享状态。

c++不像Java有现成的且好用的,boost库不够轻量级。就是在这样的条件下才能享受到自主化的快乐,不是吗

测试

有一个 主线程 ,即运行程序时创建的线程可以从 “用户” 中获取任务,还有一个 管理线程 ,用于进行线程池中线程的调度,还有初始化线程池时创建的若干空闲线程,用于执行任务

  • 具体的分类:

Task:任务类,内有任务的优先级,和一个纯虚Run方法,我们需要派生Task,将要完成的任务写到Run方法中

MyThread:线程类,封装了C++11的thread,每一个线程可以关联一个Task对象,执行其Run方法

BusyThreadContainer:工作容器类,采用std::list<MyThread*>实现,储存工作状态的线程

IdleThreadContainer:空闲容器类,采用std::vector<MyThread*>实现,储存处于空闲状态的线程

TaskContainer:任务容器类,采用priority_queue<Task*>实现,储存所有用户添加未执行的任务

MyThreadPool:线程池类,用于从用户获取任务,管理任务,实现对线程池中线程的调度

不多废话,上码

  • Task类

namespace
{
    enum  PRIORITY
    {

        MIN = 1, NORMAL = 25, MAX = 50
    };
}

class Task
{
    
public:
    Task()
    {

    }
    void SetPriority(int priority)
    {
        if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MAX);
        }
        else if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MIN);
        }
    }    
    virtual void Run() = 0;
protected:
    int priority_;
};

void SetPriority(int priority) :设置线程的优先级,数值在1-50之间,值越大,优先级越高

virtual void run() = 0:线程执行的方法,用户需要重写为自己的方法

  • MyThread类

class MyThread
{
    friend bool operator==(MyThread my1, MyThread my2);
    friend bool operator!=(MyThread my1, MyThread my2);
public:
    MyThread(MyThreadPool *pool);
    void Assign(Task *Task);
    void Run();
    void StartThread();
    int getthreadid();
    void setisdetach(bool isdetach);    
private:
    MyThreadPool *mythreadpool_;
    static int  s_threadnumber;
    bool isdetach_;
    Task *task_;
    int threadid_;
    std::thread thread_;
};

MyThread(MyThreadPool *pool) // 造一个MyThread对象,将自己与指定的线程池相关联起来

void Assign(Task *Task) // 将一个任务与该线程相关联起来

void Run() // 调用了Task的Run方法,同时在Task的Run方法结束后将自己从工作容器移回空闲容器

void StartThread() // 执行线程的Run方法,即执行了Task的Run方法

int getthreadid() // 获取线程的id号

void setisdetach(bool isdetach) // 设置线程在运行的时候是join还是detach的

  • BusyThreadContainer类

class BusyThreadContainer
{
    
public:
    BusyThreadContainer();
    ~BusyThreadContainer();
    void push(MyThread *m);
    std::list<MyThread*>::size_type size();
    void erase(MyThread *m);

private:
    std::list<MyThread*> busy_thread_container_;
    typedef std::list<MyThread*> Container;
    typedef Container::iterator Iterator;
};

void push(MyThread *m) // 将一个线程放入工作容器中

void erase(MyThread *m) // 删除一个指定的线程

std::list<MyThread*>::size_type size() // 返回工作容器的大小

  • IdleThreadContainer类

class IdleThreadContainer
{
    
public:
    IdleThreadContainer();
    ~IdleThreadContainer();
    std::vector<MyThread*>::size_type size();
    void push(MyThread *m);
    void assign(int n,MyThreadPool* m);    
    MyThread* top();
    void pop();
    void erase(MyThread *m);
private:
    std::vector<MyThread*> idle_thread_container_;
    typedef std::vector<MyThread*> Container;
    typedef Container::iterator Iterator;
};

~IdleThreadContainer(); // 负责析构空闲容器中的线程

void push(MyThread *m):// 将一个线程放回空闲容器中

void assign(int n,MyThreadPool* m) // 创建n个线程与线程池m相关联的线程放入空闲容器中

MyThread* top() // 返回位于空闲容器顶端的线程

void pop() // 弹出空闲容器顶端的线程

void erase(MyThread *m) // 删除一个指定的线程

  • TaskContainer类

class TaskContainer
{
public:
    TaskContainer();
    ~TaskContainer();
    void push(Task *);
    Task* top();
    void pop();
    std::priority_queue<Task*>::size_type size();
private:
    std::priority_queue<Task*> task_container_;
};

void push(Task *) // 将一个任务放入任务容器中

Task* top() // 返回任务容器顶端的任务

void pop() // 将任务容器顶端的线程弹出

std::priority_queue<Task*>::size_type size() // 返回任务容器的大小

  • MyThreadPool类

class MyThreadPool
{
public:
    
    MyThreadPool(){}
    MyThreadPool(int number);
    ~MyThreadPool();
    void AddTask(Task *Task,int priority);
    void AddIdleThread(int n);
    void RemoveThreadFromBusy(MyThread *myThread);
    void Start();
    void EndMyThreadPool();private:
    BusyThreadContainer busy_thread_container_;
    IdleThreadContainer idle_thread_container_;
    bool issurvive_;
    TaskContainer task_container_;
    std::thread thread_this_;
    std::mutex busy_mutex_;
    std::mutex idle_mutex_;
    std::mutex task_mutex_;
    int number_of_thread_;
};

MyThreadPool(int number) // 构造MyThreadPool,创建包含number个线程的空闲容器

void AddTask(Task *Task,int priority) // 添加一个优先级为priority的任务到任务容器中

void AddIdleThread(int n) // 在创建n个空闲线程到空闲容器中

void RemoveThreadFromBusy(MyThread *myThread) // 将一个线程从工作容器中删除,并移回空闲容器中

void Start() // 判断是否有空闲线程,如有将任务从从任务容器中提出,放入空闲容器中,等待执行

void EndMyThreadPool() // 结束线程池的运行

  • 派生自Task的MyTask类

class MyTask :public Task
{
    friend bool operator<(MyTask  &lv,MyTask &rv)
    {
        return lv.priority_ < rv.priority_;
    }
public:
    MyTask();
    ~MyTask();
    virtual void Run();
    void setdata(int d);
private:
    int data_;
};


MyTask::MyTask()
{
}
MyTask::~MyTask()
{
}
void MyTask::setdata(int d)
{
    data_ = d;
}
void MyTask::Run()
{
    std::cout << "Hello I am "<<data_ << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

friend bool operator<(MyTask &lv,MyTask &rv) // 用于确定任务在任务容器中的位置

Run // 自定义的Run方法

void setdata(int d) // 设置数据

分析

  • MyThread::Run()
    void MyThread::Run()
    {
      cout <<"Thread:"<< threadid_ << " run ";
      task_->Run();
      mythreadpool_->RemoveThreadFromBusy(this);
    }
    

调用了Task的Run方法,同时在Task的Run方法结束后,通知线程池将自己从工作容器中移回空闲容器

  • MyThread::StartThread()
    void MyThread::StartThread()
    {
      thread_ = thread(&MyThread::Run, this);
      if (isdetach_ == true)
          thread_.detach();
      else
          thread_.join();
    }
    

将MyThread的Run方法与thread_相绑定,this表示类的Run方法的第一个隐含的参数

然后根据isdetach的值,判断是否detach() or join()

  • RemoveThreadFromBusy
    void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
    {
    
      busy_mutex_.lock();
      cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl;
      busy_thread_container_.erase(myThread);
      busy_mutex_.unlock();
    
      idle_mutex_.lock();
      idle_thread_container_.push(myThread);
      idle_mutex_.unlock();
    }
    

将一个线程从任务容器中移除,并将其放回空闲容器中,

使用busy_mutex_和idle_mutex_进行加锁和解锁,确保数据的一致性

  • MyThreadPool

MyThreadPool::MyThreadPool(int number)
{
    issurvive_ = true;
    number_of_thread_ = number;
    idle_thread_container_.assign(number, this);
    thread_this_ =thread(&MyThreadPool::Start, this);
    thread_this_.detach();
}

  • Start
void MyThreadPool::Start()
{
    
    while (true)
    {
        if (issurvive_==false)
        {
            busy_mutex_.lock();
            if (busy_thread_container_.size()!=0)
            {
                busy_mutex_.unlock();
                continue;
            }
            busy_mutex_.unlock();
            break;
        }
        idle_mutex_.lock();
        if (idle_thread_container_.size() == 0)
        {
            idle_mutex_.unlock();
            continue;
        }
        idle_mutex_.unlock();
        task_mutex_.lock();
        if (task_container_.size() == 0)
        {
            task_mutex_.unlock();
            continue;
        }
        Task *b = task_container_.top();;
        task_container_.pop();
        task_mutex_.unlock();
        
        idle_mutex_.lock();
        MyThread *mythread = idle_thread_container_.top();;
        idle_thread_container_.pop();
        mythread->Assign(b);
        idle_mutex_.unlock();

        busy_mutex_.lock();
        busy_thread_container_.push(mythread);
        busy_mutex_.unlock();
        mythread->StartThread();
    }
}

管理线程对应的Start方法,内有一个死循环,不停的判断任务容器中是否有任务,和是否有空闲线程来执行任务,若有,则将任务从

任务容器中提出,从空闲线程中提取出一个空闲线程与其绑定,执行该任务,同时将该线程从空闲容器移动到工作容器中。

当线程池想要结束运行时,即survive为false时,首先要判断工作容器是否为空,若不为空,则代表还有任务正在被线程执行,线程池不能结束运行

否则可以结束线程池的运行,跳出死循环

  • 主函数

MyThreadPool mythreadPool(10);

MyTask j[50];
for (int i = 0; i < 50;i++)
{
    j[i].setdata(i);
}
for (int i = 0; i < 50; i++)
{
    mythreadPool.AddTask(&j[i],i);
}
int i;
//按100添加一个任务
//按-1结束线程池
while (true)
{
    cin >> i;    
    if (i == 100)
    {
        MyTask j;
        j.setdata(i);
        mythreadPool.AddTask(&j, i);
    }
    if (i == -1)
    {        
        mythreadPool.EndMyThreadPool();
        break;
    }        
}

  • 结果 ```` Thread:1 remove from busylist
    Thread:4 run Hello I am 3
    Thread:1 run Thread : 7 run Hello I am 1Hello I am 2

Thread:5 remove from busylist
Thread:6 remove from busylist
Thread:3 remove from busylist
Thread:2 remove from busylist
Thread:10 remove from busylist
Thread:8 remove from busylist
Thread:9 remove from busylist
Thread:4 remove from busylist
Thread:7 remove from busylist
Thread:1 remove from busylist
100
Thread:1 run Hello I am 100
Thread:1 remove from busylist
100
Thread:1 run Hello I am 100
Thread:1 remove from busylist
````

后记?

线程池并不是万能的,线程池减少了创建与销毁线程本身对任务照成的影响,但如果任务本身的运行时间很长,那么这些开销相当于任务本身执行开销而言是可以忽略的。那么我们也可以

选择“即时创建,即时销毁”的策略(嗯嗯)

线程池通常适合下面的几个场合:

(1) 单位时间内处理的任务数较多,且每个任务的执行时间较短

(2) 对实时性要求较高的任务,如果接受到任务后在创建线程,再执行任务,可能满足不了实时要求,因此必须采用线程池进行预创建。

关于多线程的实验
to be continued