C++下消息队列(多消费者模式)的实现
不允许用现成的消息队列比如rabbitmq等,非要造轮子!
一. 生产者-消费者模式
- 生产者:这里由于是通过txt文件来进行交互,相当于txt文件的内容就是生产者,同时还需要实时监控txt文件,将其新消息放入队列。
-
消费者:从队列中取消息,并需要告诉队列该条消息已经被消费。
- 断点续传:需要考虑到程序崩溃之后,知道从哪开始消费。
1.1 多消费者——多线程
这里如果如果是IO操作较多的话,推荐使用多线程来创建消费者。具体创建和消费的过程如下:
-
我们可以看到其实这里的多消费者其实是串联的形式来进行消费,因此如果是CPU资源低,IO操作多的话,推荐这种形式。
-
多线程之间的内存变量交互很友善,不像多进程(哎,难受啊)。
1.2 多消费者——多进程
- 考虑到这里是需要像CPU请求较多资源,甚至是需要使用GPU的资源和利用CUDA加速(深度学习模型占用资源较多),因此我这里使用的是多进程来构建消费者。具体创建和消费过程如下:
二. 过程的实现
具体代码可以参考本人的git
-
定时器:每隔一段时间扫描txt文件,并将文件中新加入的消息存放入队列中。
// timer扫描 int wait_sec = 1; // 单独启动一个线程持续扫描文件(每5秒) string path = "video.txt"; Timer timer1; timer1.start(2000, std::bind(getVideoFromTxt, path, wait_sec, &output));
-
中间件:需要存放已经消费的消息,这样知道消费的具体位置,且支持程序崩溃/断掉之后,重启后知道在哪开始消费,同样用txt进行保存到本地。
-
多消费者:使用多进程创建消费者,这里考虑到进程中间的共享内存不好交互,直接使用txt来交互数据(反正进程之间的内存交互其实也是通过一个共享文件映射来完成的)。这里直接使用调用命令行的方式来构建消费者。
void gen_multiProcess(int id, string input_txt,string output_txt) { STARTUPINFO si; si.cb = sizeof(si); PROCESS_INFORMATION pi; ZeroMemory(&si, sizeof(si)); ZeroMemory(&pi, sizeof(pi)); string cmdLine = "D:/vs_project/setTimer/x64/Debug/setTimer.exe " + input_txt + " " + output_txt; cout << cmdLine << endl; wstring str = StringToWString(cmdLine); BOOL bSuccess = CreateProcess(NULL, const_cast<LPWSTR>(str.c_str()), NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi); if (bSuccess) { //handleOfProcess[id] = pi.hProcess; cout << "Process-" << id << "completed!" << endl; } else { cout << "Error:" << id << endl; } }
三. 收获
其实手写消息队列,帮助自己更多的了解消息消费机制,以及多进程和多线程的使用,当然了也更了解了C++的标准库。所以推荐大家还是亲手动手手写一哈。哎,作为一个python调包侠突然写这种偏底层,有点难受好吧。