任务队列和线程池队列(C++11)

C/C++代码 blackfeather


任务队列可以认为是执行同一个方法来处理数据的队列,指定回调函数。

线程池就是先开辟好多个线程,然后将要执行的方法+参数丢到线程池中,支持返回值获取。


均依赖了无锁队列库 https://github.com/cameron314/concurrentqueue

本来不想依赖三方组件的,但是测试发现这个库效率是真的高,比带锁的队列快非常多。

而且vs中的std::queue是有BUG的,pop后不释放内存,也有一定的隐患(急死强迫症)


任务队列库TaskQueue:

#pragma once

#include <functional>
#include <vector>
#include <thread>
#include <future>
#include <atomic>
#include <chrono>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

template <class T, class P = void*>
class CTaskQueue
{
private:
	bool m_bExit;
	std::atomic<uint64_t> m_nTotalCount;
	std::atomic<uint64_t> m_nProcessedCount;

	tsBlockQuque <T> m_taskQueue;
	typedef std::function<void(P &param)> functionTaskInit;
	typedef std::function<void(P &param)> functionTaskExit;
	typedef std::function<bool(T &data)> functionTaskCallback;
	typedef std::function<bool(T &data, P &param)> functionTaskExCallback;

	functionTaskCallback m_TaskCallBack;

	functionTaskInit m_TaskInitCallBack;
	functionTaskExit m_TaskExitCallBack;
	functionTaskExCallback m_TaskExCallBack;

	std::vector <std::future<int>> m_arrWorkThreads;

	typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

	void _init(int nThreadCount)
	{
		for (int i = 0; i < nThreadCount; i++)
		{
			m_arrWorkThreads.push_back(std::async(std::launch::async, [this]()->int{

				P param;
				if (m_TaskInitCallBack)
					m_TaskInitCallBack(param);

				while (true)
				{
					T data;
					if (m_taskQueue.wait_dequeue_timed(data, std::chrono::milliseconds(100)))
					{
						//找到一组数据 回调
						if (m_TaskCallBack)
						{
							if (!m_TaskCallBack(data))
							{
								//处理失败了 重新丢回队列里面去
								m_taskQueue.enqueue(data);
								continue;
							}
						}
						else if (m_TaskExCallBack)
						{
							if (!m_TaskExCallBack(data, param))
							{
								//处理失败了 重新丢回队列里面去
								m_taskQueue.enqueue(data);
								continue;
							}
						}

						m_nProcessedCount++;
						continue;
					}

					//结束标记并且队列没有数据了 返回
					if (m_bExit)
						break;
				}

				if (m_TaskExitCallBack)
					m_TaskExitCallBack(param);

				return 0;
			}));
		}
	}
public:
	CTaskQueue()
	{
		m_bExit = false;
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
	}
	~CTaskQueue()
	{
		m_bExit = true;

		for (auto& fu : m_arrWorkThreads)
		{
			try
			{
				fu.get();
			}
			catch (...){}
		}
	}

	/*
	参数:
		线程数
		回调方法
		等待间隔(ms)
	*/
	void InitTaskQueue(int nThreadCount, functionTaskCallback fnTaskCallback)
	{
		if (m_arrWorkThreads.size())
			return;

		m_TaskCallBack = fnTaskCallback;
		m_TaskExCallBack = nullptr;
		m_TaskInitCallBack = nullptr;
		m_TaskExitCallBack = nullptr;

		_init(nThreadCount);
	}
	/*
	参数:
	线程数
	数据处理回调方法
	队列线程初始化回调
	队列线程销毁回调
	等待间隔(ms)
	*/
	void InitTaskQueue(int nThreadCount, functionTaskExCallback fnTaskExCallback, functionTaskInit fnTaskInitCallback, functionTaskExit fnTaskExitCallback)
	{
		if (m_arrWorkThreads.size())
			return;

		m_TaskCallBack = nullptr;
		m_TaskExCallBack = fnTaskExCallback;
		m_TaskInitCallBack = fnTaskInitCallback;
		m_TaskExitCallBack = fnTaskExitCallback;

		_init(nThreadCount);
	}

	//添加数据
	void AddData(const T &data)
	{
		m_taskQueue.enqueue(data);
		m_nTotalCount++;
	}

	//读取当前队列中待处理的数量
	uint64_t GetQueueSize()
	{
		return m_taskQueue.size_approx();
	}

	//读取已经处理过的数量
	uint64_t GetProcessedCount()
	{
		return m_nProcessedCount;
	}

	//读取所有加入过队列的数量
	uint64_t GetTotalCount()
	{
		return m_nTotalCount;
	}

	//查询队列是否处理完毕  立即返回
	bool IsFinished()
	{
		if (GetQueueSize() == 0 && m_nTotalCount == m_nProcessedCount)
			return true;

		return false;
	}

	//等待队列结束  阻塞执行
	void WaitforFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
	{
		printf("Wait for finish...\n");

		while (!IsFinished())
		{
			if (fnWaitCallback)
				fnWaitCallback(m_nProcessedCount, m_nTotalCount);

			std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
		}

		if (fnWaitCallback)
			fnWaitCallback(m_nProcessedCount, m_nTotalCount);

		printf("All finished. \n");
	}
};


示例代码:

    
    //
    CTaskQueue<int, int> taskTester;  //第二个int可忽略
	
	//单一回调
	taskTester.InitTaskQueue(8, [](int data)->bool{
		printf("task:%d\n", data);
		return true; //返回false会重新插回队列中
	}
	
	//多个回调的初始化方法
	taskTester.InitTaskQueue(8, [](int data, int &param)->bool{
		printf("task:%d, param:%d\n", data, param);
		return true;
	},
	[](int &param){
		param = GetCurrentThreadId();
		printf("task work thread init:%d \n", param);
	},
	[](int &param){
		printf("task work thread exit:%d \n", param);
	});

	for (int i = 0; i < 10000; i++)
	{
		taskTester.AddData(i);
	}

	taskTester.WaitforFinish(100, [](uint64_t processed, uint64_t total){
		uint64_t ret = (processed * 100) / total;
		printf("processed:%I64d, total:%I64d, %d%% \n", processed, total, (int)ret);
	});




线程池是用之前的帖子中的线程池修改的,增加了几个方法,修正了vs下的BUG 。。。

#pragma once

#include <vector>
#include <memory>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>
#include <atomic>
#include <chrono>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

class ThreadPool 
{
private:
	// need to keep track of threads so we can join them
	std::vector<std::future<int>> m_arrWorkThreads;
	// the task queue
	tsBlockQuque < std::function<void()> > m_TaskQueue;

	std::atomic<uint64_t> m_nTotalCount;
	std::atomic<uint64_t> m_nProcessedCount;

	typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

	bool m_bExit;

public:

	// the constructor just launches some amount of workers
	ThreadPool(size_t threads)
	{
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
		m_bExit = false;

		InitThreadPool(threads);
	}

	ThreadPool()
	{
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
		m_bExit = false;
	}

	// the destructor joins all threads
	~ThreadPool()
	{
		m_bExit = true;

		for (auto& fu : m_arrWorkThreads)
		{
			try
			{
				fu.get();
			}
			catch (...){}
		}
	}

	void InitThreadPool(size_t threads)
	{
		assert(m_arrWorkThreads.size() == 0);

		for (size_t i = 0; i < threads; i++)
		{
			m_arrWorkThreads.push_back(std::async(std::launch::async, [this]()->int{

				printf("thread pool thread start...\n");

				while (true)
				{
					std::function<void()> task;
					if (m_TaskQueue.wait_dequeue_timed(task, std::chrono::milliseconds(100)))
					{
						task();
						m_nProcessedCount++;
						continue;
					}

					//结束标记并且队列没有数据了 返回
					if (m_bExit)
						break;
				}

				printf("thread pool thread exit...\n");
				return 0;
			}));
		}
	}

	// add new work item to the pool back
	template<class F, class... Args>
	auto AddTask(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
	{
		assert(m_arrWorkThreads.size());

		using return_type = typename std::result_of<F(Args...)>::type;

		auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

		std::future<return_type> res = task->get_future();

		m_TaskQueue.enqueue([task](){ (*task)(); });
		m_nTotalCount++;

		return res;
	}

	//获取总共插入过的任务数
	uint64_t GetTotalCount()
	{
		return m_nTotalCount;
	}

	//获取已经处理完毕的任务数
	uint64_t GetProcessedCount()
	{
		return m_nProcessedCount;
	}

	//读取当前队列中待处理的数量
	uint64_t GetQueueSize()
	{
		return m_TaskQueue.size_approx();
	}

	//判断是否已经执行完毕已经插入的任务
	bool IsFinished()
	{
		if (m_TaskQueue.size_approx() == 0 && m_nProcessedCount == m_nTotalCount)
			return true;

		return false;
	}


	//阻塞等待队列执行完毕
	void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
	{
		printf("Wait for finish...\n");

		while (!IsFinished())
		{
			if (fnWaitCallback)
				fnWaitCallback(m_nProcessedCount, m_nTotalCount);

			std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
		}

		if (fnWaitCallback)
			fnWaitCallback(m_nProcessedCount, m_nTotalCount);

		printf("All finished. \n");
	}
};



最后说说vs神奇的BUG。。。

#include <iostream>
#include <string>
#include <vector>
#include <thread>


class MyClass
{
private:
	std::vector<std::thread> arrthreads;
	bool bexit;
public:
	MyClass()
	{
		bexit = false;
	};
	~MyClass()
	{
		bexit = true;
		for (auto &x : arrthreads)
		{
			x.join();  //vs编译的这里会卡死,gcc编译的正常执行
		}
	};

	void init()
	{
		for (int i = 0; i < 4; i++)
		{
			arrthreads.emplace_back([this](){

				printf("trhead start...\n");

				while (true)
				{
					if (bexit)
						break;

				}

				printf("trhead exit...\n");
			});
		}
	};
};


MyClass testthread;

int main()
{
    printf("main2\n");
    
    testthread.init();
    
    return 0;
}

MyClass析构函数中让工作线程退出,毫无问题,但是vs编译的,join会卡死阻塞或者崩溃跑飞,gcc编译的无问题。

各位大师有何看法。。。

评论列表:

发表评论: