/ CPLUSPLUS, MSGQUEUE

C++下消息队列(多消费者模式)的实现

不允许用现成的消息队列比如rabbitmq等,非要造轮子!

一. 生产者-消费者模式

  • 生产者:这里由于是通过txt文件来进行交互,相当于txt文件的内容就是生产者,同时还需要实时监控txt文件,将其新消息放入队列。
  • 消费者:从队列中取消息,并需要告诉队列该条消息已经被消费。

  • 断点续传:需要考虑到程序崩溃之后,知道从哪开始消费。

1.1 多消费者——多线程

这里如果如果是IO操作较多的话,推荐使用多线程来创建消费者。具体创建和消费的过程如下:

  • 我们可以看到其实这里的多消费者其实是串联的形式来进行消费,因此如果是CPU资源低,IO操作多的话,推荐这种形式。

  • 多线程之间的内存变量交互很友善,不像多进程(哎,难受啊)。

1.2 多消费者——多进程

  • 考虑到这里是需要像CPU请求较多资源,甚至是需要使用GPU的资源和利用CUDA加速(深度学习模型占用资源较多),因此我这里使用的是多进程来构建消费者。具体创建和消费过程如下:

二. 过程的实现

具体代码可以参考本人的git

  • 定时器:每隔一段时间扫描txt文件,并将文件中新加入的消息存放入队列中。

    // timer扫描
    int wait_sec = 1;
    // 单独启动一个线程持续扫描文件(每5秒)
    string path = "video.txt";
    Timer timer1;
    timer1.start(2000, std::bind(getVideoFromTxt, path, wait_sec, &output));
    
  • 中间件:需要存放已经消费的消息,这样知道消费的具体位置,且支持程序崩溃/断掉之后,重启后知道在哪开始消费,同样用txt进行保存到本地。

  • 多消费者:使用多进程创建消费者,这里考虑到进程中间的共享内存不好交互,直接使用txt来交互数据(反正进程之间的内存交互其实也是通过一个共享文件映射来完成的)。这里直接使用调用命令行的方式来构建消费者。

    void gen_multiProcess(int id, string input_txt,string output_txt) {
      
    	STARTUPINFO si;
    	si.cb = sizeof(si);
    	PROCESS_INFORMATION pi;
    	ZeroMemory(&si, sizeof(si));
    	ZeroMemory(&pi, sizeof(pi));
    	string cmdLine = "D:/vs_project/setTimer/x64/Debug/setTimer.exe " + input_txt + " " + output_txt;
    	cout << cmdLine << endl;
    	wstring str = StringToWString(cmdLine);
      
    	BOOL bSuccess = CreateProcess(NULL,
    		const_cast<LPWSTR>(str.c_str()),
    		NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi);
      
    	if (bSuccess) {
    		//handleOfProcess[id] = pi.hProcess;
    		cout << "Process-" << id << "completed!" << endl;
    	}
    	else {
    		cout << "Error:" << id << endl;
    	}
    }
    

三. 收获

其实手写消息队列,帮助自己更多的了解消息消费机制,以及多进程和多线程的使用,当然了也更了解了C++的标准库。所以推荐大家还是亲手动手手写一哈。哎,作为一个python调包侠突然写这种偏底层,有点难受好吧。