Linux–线程池(包含日志的解释)

线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量

线程池

线程池(Thread Pool)是一种基于池化技术设计用于管理复用线程的机制
在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。

主要优点

  • 降低资源消耗:通过复用已存在的线程,减少线程创建和销毁的开销。
  • 提高响应速度:当任务到达时,可以立即分配线程进行处理,减少了等待时间。
  • 提高线程的可管理性:线程池可以统一管理线程,包括线程的创建、调度、执行和销毁等。
  • 关键参数

  • 核心线程数:线程池维护线程的最少数量,即使这些线程处于空闲状态,线程池也不会回收它们(一般为主线程)。
  • 最大线程数:线程池中允许的最大线程数。
  • 阻塞队列:用于存放待执行的任务的队列。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。
  • 非核心线程空闲存活时间:当线程池中的线程数量超过核心线程数时,如果线程空闲时间超过这个时间,多余的线程将被终止。
  • 线程工厂:用于创建新线程的工厂。
  • 拒绝策略:当线程池和队列都满了时,对于新来的任务将采取的拒绝策略。
  • 实现原理

    线程池的实现原理通常包括以下几个关键部分:

  • 线程池管理器:负责创建并管理线程池,包括初始化线程池、创建工作线程、销毁线程池等。
  • 工作线程:线程池中的线程,负责执行具体的任务。工作线程通常会不断从任务队列中取出任务并执行,直- 到线程池被销毁或所有任务都执行完毕。
  • 任务接口:每个任务必须实现的接口,用于定义任务的执行逻辑。在Java中,这通常是通过实现Runnable或Callable接口来实现的。
  • 任务队列:用于存放待处理的任务。当线程池中的工作线程数量达到最大值时,新到达的任务会被放入任务队列中等待处理。任务队列的实现通常依赖于Java的BlockingQueue接口。
  • 实现流程

  • 当有新任务提交给线程池时,线程池会首先判断当前正在运行的线程数量是否小于核心线程数。如果是,则直接创建新的线程来执行任务;否则,将任务加入任务队列等待处理。
  • 如果任务队列已满,且当前正在运行的线程数量小于最大线程数(maximumPoolSize),则创建新的线程来处理任务;如果线程数量已经达到最大线程数,则根据配置的拒绝策略来处理新任务(如抛出异常、直接丢弃等)。
  • 当一个线程完成任务后,它会从任务队列中取下一个任务来执行;如果没有任务可供执行,并且线程池中的线程数量超过了核心线程数,且这些线程空闲时间超过了设定的存活时间,则这些线程会被销毁,直到线程池中的线程数量减少到核心线程数为止。
  • 实例

    Thread.hpp

    #ifndef __THREAD_HPP__
    #define __THREAD_HPP__
    
    #include<iostream>
    #include<string>
    #include<pthread.h>
    #include<functional>
    #include<unistd.h>
    
    using namespace std;
    
    
    
    namespace ThreadMdule
    {
        using func_t = std::function<void(string)>;
    
        class Thread
        {
        public:
            void Excute()
            {
                _func(_threadname);
            }
            Thread(func_t func, const std::string &name="none-name")
                : _func(func), _threadname(name), _stop(true)
            {}
    
            static void* threadroutine(void* args)
            {
                Thread* self=static_cast<Thread*>(args);
                self->Excute();
                return nullptr;
            }
            
    
            bool start()
            {
                int n=pthread_create(&_tid,nullptr,threadroutine,this);
                if(!n)
                {
                    _stop = false;
                    return true;
                }
                else
                {
                    return false;
                }
            }
            void Detach()
            {
                if(!_stop)
                {
                    pthread_detach(_tid);
                }
            }
            void Join()
            {
                if(!_stop)
                {
                    pthread_join(_tid,nullptr);
                }
            }
            string name()
            {
                return _threadname;
            }
            void Stop()
            {
                _stop = true;
            }
            ~Thread() {}
    
        private:
            pthread_t _tid;
            std::string _threadname;
            func_t _func;
            bool _stop;
        };
    }
    
    #endif
    

    ThreadPool.hpp

    #pragma once
    
    #include<iostream>
    #include<vector>
    #include<queue>
    #include<pthread.h>
    #include"Thread.hpp"
    #include"Log.hpp"
    #include"LockGuard.hpp"
    using namespace ThreadMdule;
    using namespace std;
    const static int gdefaultthreadnum=3;//默认线程池的线程数
    
    template <class T>
    class ThreadPool
    {
    public:
        ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false)
        {
            pthread_mutex_init(&_mutex,nullptr);
            pthread_cond_init(&_cond,nullptr);
            LOG(INFO,"ThreadPool COnstruct.");
        }
       
        //各个线程独立的任务函数
        void HandlerTask(string name)
        {
            LOG(INFO,"%s is running...",name.c_str());
            while(true)
            {
                LockQueue();//开启保护
                //等到有任务时才退出循环执行下列语句
                while(_task_queue.empty()&&_isrunning)
                {
                    _waitnum++;
                    ThreadSleep();
                    _waitnum--;
                }
                //当任务队列空并且线程池停止时线程退出
                if(_task_queue.empty()&&!_isrunning)
                {
                    UnlockQueue();
                    cout<<name<<" quit "<<endl;
                    sleep(1);
                    break;
                }
                //1.任务队列不为空&&线程池开启
                //2.任务队列不为空&&线程池关闭,直到任务队列为空
                //所以,只要有任务,就要处理任务
                T t=_task_queue.front();//取出对应任务
                _task_queue.pop();
                UnlockQueue();
                LOG(DEBUG,"%s get a task",name.c_str());
                //处理任务
                t();
                LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());
            }
        }
    
         //线程池中线程的构建
        void InitThreadPool()
        {
            for(int i=0;i<_threadnum;i++)
            {
                string name="thread-"+to_string(i+1);
                _threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
                LOG(INFO,"init thread %s done",name.c_str());
            }
            _isrunning=true;
        }
        //线程池的启动
        void Start()
        {
            for(auto& thread:_threads)
            {
                thread.start();
            }
        }
        //线程池停止
        void Stop()
        {
            LockQueue();
            _isrunning=false;
            ThreadWakeupAll();
            UnlockQueue();
        }
        void Wait()
        {
            for(auto& thread:_threads)
            {
                thread.Join();
                LOG(INFO,"%s is quit...",thread.name().c_str());
            }
        }
        //将任务入队列
        bool Enqueue(const T& t)
        {
            bool ret=false;
            LockQueue();
            if(_isrunning)
            {
                _task_queue.push(t);
                //如果有空闲的线程,那么唤醒线程让其执行任务
                if(_waitnum>0)
                {
                    ThreadWakeup();
                }
                LOG(DEBUG,"enqueue task success");
                ret=true;
            }
            UnlockQueue();
            return ret;
        }
        ~ThreadPool()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_cond);
        }
    private:
        void LockQueue()
        {
            pthread_mutex_lock(&_mutex);
        }
        void UnlockQueue()
        {
            pthread_mutex_unlock(&_mutex);
        }
        void ThreadSleep()
        {
            pthread_cond_wait(&_cond, &_mutex);
        }
        void ThreadWakeup()
        {
            pthread_cond_signal(&_cond);
        }
        void ThreadWakeupAll()
        {
            pthread_cond_broadcast(&_cond);
        }
    
    
        int _threadnum;//线程数
        vector<Thread> _threads;//存储线程的vector
        queue<T> _task_queue;//输入的任务队列
        pthread_mutex_t _mutex;//互斥锁
        pthread_cond_t _cond;//条件变量
    
        int _waitnum;//空闲的线程数
        bool _isrunning;//表示线程池是否启动
    
    };
    
    

    Task.hpp

    #include<iostream>
    #include<string>
    #include<functional>
    using namespace std;
    
    class Task
    {
    public:
        Task(){}
        Task(int a,int b): _a(a),_b(b),_result(0)
        {}
    
        void Excute()
        {
            _result=_a+_b;
        }
        string ResultToString()
        {
            return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);
        }
        string DebugToString()
        {
           return to_string(_a) + "+" + to_string(_b) + "= ?";
        }
        void operator()()
        {
            Excute();
        }
    private:
        int _a;
        int _b;
        int _result;
    };
    

    main.cc

    int main()
    {
        srand(time(nullptr)^getpid()^pthread_self());
        //EnableScreen();
        EnableFile();
        unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));
    
        tp->InitThreadPool();
        tp->Start();
    
        int tasknum=10;
        while(tasknum)
        {
            sleep(1);
            int a=rand()%10+1;
            usleep(1024);
            int b=rand()%20+1;
            Task t(a,b);
            LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());
            tp->Enqueue(t);
            tasknum--;
        }
        tp->Stop();
        tp->Wait();
    }
    

    Loh.hpp

    #pragma once
    
    #include<iostream>
    #include<fstream>
    #include<ctime>
    #include<cstdarg>
    #include<string>
    #include<sys/types.h>
    #include<unistd.h>
    #include<cstdio>
    #include"LockGuard.hpp"
    
    using namespace std;
    
    
    bool gIsSave=false;//默认输出到屏幕
    const string logname="log.txt";
    //1.日志是有等级的
    enum Level
    {
        DEBUG=0,
        INFO,
        WARNING,
        ERROR,
        FATAL 
    };
    
    
    void SaveFile(const string& filename,const string& messages)
    {
        ofstream out(filename,ios::app);
        if(!out.is_open())
        {
            return;
        }
        out<<messages;
        out.close();
    }
    //等级转化为字符串
    string LevelToString(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "Debug";
        case INFO:
            return "Info";
        case WARNING:
            return "Warning";
        case ERROR:
            return "Error";
        case FATAL:
            return "Fatal";
        default:
            return "Unkonwn";
        }
    }
    
    //获取当前时间
    string GetTimeString()
    {
        time_t curr_time=time(nullptr);//时间戳
        struct tm* format_time=localtime(&curr_time);//转化为时间结构
        if(format_time==nullptr)
            return "None";
        char time_buffer[1024];
        snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
                 format_time->tm_year + 1900,
                 format_time->tm_mon + 1,
                 format_time->tm_mday,
                 format_time->tm_hour,
                 format_time->tm_min,
                 format_time->tm_sec);
        return time_buffer;
    
    }
    
    pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
    //获取日志信息
    void LogMessage(string filename,int line,bool issave,int level,char* format,...)
    {
        string levelstr=LevelToString(level);
        string timestr=GetTimeString();
        pid_t selfid=getpid();
    
        char buffer[1024];
        va_list arg;
        va_start(arg,format);
        vsnprintf(buffer,sizeof(buffer),format,arg);
        va_end(arg);
    
        string message= "[" + timestr + "]" + "[" + levelstr + "]" +
                              "[" + std::to_string(selfid) + "]" +
                              "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
    
    
        LockGuard lockguard(&lock);
        if(!issave)
        {
            cout<<message;
        }
        else
        {
            SaveFile(logname,message);
        }
                                                            
    }
    
     #define LOG(level,format,...)                                               \
        do                                                                        \
        {                                                                          \
            LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);       \
        } while (0)
    
    
    #define EnableFile()         \
        do                       \
        {                        \
            gIsSave=true;        \  
        } while (0)
    
    #define EnableScreen()         \
        do                         \
        {                          \
            gIsSave=false;         \  
        } while (0)
        
    

    线程池解释(代码)





    日志解释(代码)

    日志是系统或程序记录所有发生事件的文件:





    作者:诡异森林。

    物联沃分享整理
    物联沃-IOTWORD物联网 » Linux–线程池(包含日志的解释)

    发表回复