std::future和std::promise两者结合可以实现异步的功能场景,本文将介绍的异步收发数据模版类是在实践中结合std::future和std::promise而摸索出来的。
工作过程中,我们可能会经常遇到这样的场景,需要从线程中获取运行的结果。现在我们有两种方式可以实现这样的效果。
- 第一种方式,属于通用用法,通过使用指针在线程间共享数据。传递指针给新建的线程,主线程使用条件变量等待被唤醒;当线程设置完成数据到传递过来的指针之后,发送条件变量信号,主线程被唤醒之后,从指针中提取数据。这种方式采用条件变量、锁、指针结合才实现了异步功能,比较复杂。
- 第二种方式,采用std::future和std::promise对象,也就是本文接下来要详细说明的一种异步实现方式。
std::future是一个类模版,内部存储一个将来用于分配的值,它提供了get()成员函数来访问该值的机制。如果关联值可用之前,调用了get函数,那么get函数将阻塞直到关联值不可用。
std::promise也是一个类模版,它用来设置上面的关联值,每一个stb::promise和一个std::future对象关联,一旦stb::promise设置值之后,std::future对象的get()函数就会获取到值,然后返回。std::promise与它关联的std::future共享数据。
一、阻塞等待获取数据
1、实现线程执行函数,入参是一个std::promise指针,函数内调用std::promise指针设置值
void thread_function(std::promise<std::string>* pPromiseObj)
{
if(nullptr == pPromiseObj)
{
return;
}
pPromiseObj->set_value("this is my name.");
}
2、定义std::promise对象,从该对象获取关联的std::future对象,启动线程并且传入std::promise对象的指针,调用std::future对象的get()函数阻塞等待,如果返回,那么打印输出返回的字符串信息。
// 定义std::promise对象,从该对象获取关联的std::future
std::promise<std::string> promise_obj;
std::future<std::string> future_obj = promise_obj.get_future();
// 启动线程
std::thread thread_obj(&thread_function, &promise_obj);
// 阻塞等待
std::string str = future_obj.get();
std::cout << "std = " << str << std::endl;
// 等待线程退出
thread_obj.join();
3、运行程序,输出的信息如下所示,从这里可以看出,std::promise在线程中设置值之后,std::future对象的get()函数成功获取并返回。
二、通知线程退出
基于std::promise和std::future的机制,我们可以利用std::promise的set_value来通知运行的线程退出。具体如何做呢,我们接下来给出例子进行说明。
1、实现线程的执行函数,入参为与std::promise关联的std::future对象,执行函数内部调用std::future的wait_for循环超时等待,如果std::future的wait_for在超时时间内没有收到std::promise调用set_value发送的信号,那么继续循环等待,如果在超时时间内收到std::promise调用set_value发送的信号,那么退出循环,同时线程页退出了。
void JThreadFunction(std::future<void> FutureObj)
{
// 调用std::future的wait_for循环超时等待
while(FutureObj.wait_for(std::chrono::milliseconds(1))
== std::future_status::timeout)
{
std::cout << "do something" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
2、创建std::promise对象,从std::promise对象提取关联的future对象,启动线程,并且将上面的future对象传递给线程,主线程休眠一段时间之后,调用std::promise对象的set_value函数来发送信号,通知线程退出。
//创建promise对象
std::promise<void> exit_signal;
//提取future对象
std::future<void> future_obj = exit_signal.get_future();
//启动线程
std::thread thread_obj(JThreadFunction, std::move(future_obj));
//休眠
std::this_thread::sleep_for(std::chrono::seconds(3));
//发送信号
std::cout << "send signal" << std::endl;
exit_signal.set_value();
//等待线程退出
thread_obj.join();
std::cout << "exit function" << std::endl;
3、从输出的结果信息看,线程一直在运行,当收到std::promise对象发送信号的信号之后就退出。
三、异步收发数据
经过上面两个例子的讲解,相信大家对std::future和std::promise已经有了一个大概的了解。下面就给出异步收发数据的模版类。
1、类模版JAsyncSender实现两个函数,一个是Send用于发送数据,它可以在线程中执行,另一个是Wait等待接收数据,如果第三个参数没有输入,那么默认一直等待,否则在指定时间内,没有收到信息,那么返回失败。
#ifndef JASYNCSENDER_H
#define JASYNCSENDER_H
#include <future>
#include <chrono>
#include <thread>
#include "log/easylogging++.h"
///
/// 模版类声明
///
template <class RealT>
class JAsyncSender
{
public:
JAsyncSender();
~JAsyncSender();
// 发送数据
bool Send(const RealT &data);
// 等待接收数据,需要先运行
bool Wait(std::promise<RealT> promiseObj, RealT &data, unsigned int uiTimeMills = 0);
private:
std::promise<RealT> m_promiseObj;
};
///
/// 类模版实现
///
template <typename RealT>
JAsyncSender<RealT>::JAsyncSender()
{
}
template <typename RealT>
JAsyncSender<RealT>::~JAsyncSender()
{
}
template <typename RealT>
bool JAsyncSender<RealT>::Send(const RealT &data)
{
try
{
m_promiseObj.set_value(data);
} catch (const std::exception &e)
{
LOG(INFO) << "exception: " << e.what();
}
return true;
}
template <typename RealT>
bool JAsyncSender<RealT>::Wait(std::promise<RealT> promiseObj, RealT &data, unsigned int uiTimeMills)
{
std::future<RealT> future_obj = promiseObj.get_future();
m_promiseObj = std::move(promiseObj);
if (uiTimeMills > 0)
{
while(future_obj.wait_for(std::chrono::milliseconds(uiTimeMills))
== std::future_status::timeout)
{
return false;
}
}
data = future_obj.get();
return true;
}
#endif // JASYNCSENDER_H
2、接下来说明类模版JAsyncSender的使用方法
定义成员变量m_AsyncSendInt,它由主线程和子线程共享。JAsyncSender的type为整型,也可以定义为字符串,甚至是自定义对象,根据具体需求场景具体定义。
JAsyncSender<int> m_AsyncSendInt;
通过lambda方式创建线程,当然你也可以使用其他方式,线程内部先休眠一段时间,然后发送数据。
// 通过lambda方式创建线程
std::thread thread_obj( [&]{
LOG(INFO) << ": lambda thread executing";
std::this_thread::sleep_for(std::chrono::seconds(3));
m_AsyncSendInt.Send(20);
} ) ;
std::promise<int> promise_obj;
int i_data = -1;
// 等待线程返回数据
m_AsyncSendInt.Wait(std::move(promise_obj), i_data);
LOG(INFO) << "i_data: " << i_data;
if (thread_obj.joinable())
{
thread_obj.join();
}
从运行结果看,基于future和promise实现的异步收发数据模版类的功能是正常的。
[2019-11-17 19:29:01,539829] [auto JDebugCPPAttr::TestAsyncSender()::(anonymous class)::operator()() const:235] : lambda thread executing
[2019-11-17 19:29:04,542497] [bool JDebugCPPAttr::TestAsyncSender():244] i_data: 20
四、总结
std::promise与std::future的结合使用,可以更加容易处理异步消息事件,另外C++11标准中提供的 std::asych和std::packaged_task也是结合std::future来处理异步的事件流程。std::promise与std::future虽然功能强大,但是std::promise与std::future是一一对应的,目前没有办法处理一对多的问题,比如一个std::promise对应多个std::future。std::promise如果设置过一次,再次设置会报错,如果需要重新使用,需要再创建std::promise对象。