2017/7/31 10:47:42
Qt Concurrent Framework
Introduction
The QtConcurrent
namespace provides high-level APIs that make it possible to write multi-threaded programs without using low-level threading primitives such as mutexes, read-write locks, wait conditions, or semaphores. Programs written with QtConcurrent
automatically adjust the number of threads used according to the number of processor cores available. This means that applications written today will continue to scale when deployed on multi-core systems in the future. QtConcurrent
functions such as Map-Reduce and Filter-Reduce are deployed on non-distributed systems.
Concurrent Map and Map-Reduce
The QtConcurrent::map()
, QtConcurrent::mapped()
and QtConcurrent::mappedReduced()
functions run computations in parallel on the items in a sequence such as a QList
or a QVector
.
Concurrent Map
QtConcurrent::mapped()
:
Concurrent map takes an input sequence and a map function. This map functions is then called for each item in the sequence, and a new sequence containing the return values from the map function is returned. The map functions must be the form:
U function( const T &t );
U
is the return value and T
should be consistent with the type stored in the sequence.
QImage scaled(const QImage &image)
{
return image.scaled(100, 100);
}
QList<QImage> images = ...;
QFuture<QImage> thumbnails = QtConcurrent::mapped(images, scaled);
QtConcurrent::map()
modify a sequence in-place. The map function is:
U function( T &t );
The return value and return type are not used.
void scale(QImage &image)
{
image = image.scaled(100, 100);
}
QList<QImage> images = ...;
QFuture<void> future = QtConcurrent::map(images, scale);
Since the sequence is modified in place, QtConcurrent::map()
does not return any results via QFuture
. However, you can still use QFuture
and QFutureWatcher
to monitor the status of the map.
Concurrent Map-Reduce
QtConcurrent::mappedReduced()
is similar to QtConcurrent::mapped()
, but instead of returning a sequence with the new results, the results are combined into a single value using a reduce function. The reduce function must be of the form:
V function(T &result, const U &intermediate)
T
is the type of the final result, U
is the return type of the map function. The return value and return type of the reduce function are not used. Examples:
#include <QCoreApplication>
#include <QtConcurrent/QtConcurrentMap>
#include <QDebug>
#include <QDir>
#include <QFile>
#include <QMap>
#include <QStringList>
#include <QTextStream>
#include <QThread>
#include <QTime>
typedef QMap<QString, int> WordCounter;
QStringList findFiles( const QString &startDir, const QStringList &filters ) {
QStringList names;
QDir dir( startDir );
foreach ( const QString &file, dir.entryList( filters, QDir::Files ) ) {
names += startDir + '/' + file;
}
foreach ( const QString &subdir, dir.entryList( QDir::AllDirs | QDir::NoDotAndDotDot ) ) {
names += findFiles( startDir + '/' + subdir, filters );
}
return names;
}
WordCounter singleThreadFind( const QStringList &files ) {
WordCounter wordCounter;
foreach ( const QString &file, files ) {
QFile f( file );
f.open(QIODevice::ReadOnly);
QTextStream textStream( &f );
while ( !textStream.atEnd() ) {
foreach ( const QString &word , textStream.readLine().split(' ') ) {
wordCounter[word] += 1;
}
}
}
return wordCounter;
}
WordCounter countWords( const QString &file ) {
WordCounter wordCounter;
QFile f( file );
f.open( QIODevice::ReadOnly );
QTextStream textstream( &f );
while ( !textstream.atEnd() ) {
foreach ( const QString &word, textstream.readLine().split( ' ' ) ) {
wordCounter[word] += 1;
}
}
return wordCounter;
}
void reduce( WordCounter &result, const WordCounter &w ) {
// Jave-style iterator
QMapIterator<QString, int> iter( w );
while ( iter.hasNext() ) {
iter.next();
result[iter.key()] = iter.value();
}
}
int main( int argc, char **argv ) {
QCoreApplication app( argc, argv );
QTime time;
// find files
time.start();
const QStringList &files = findFiles( "../../../", QStringList() << "*.cpp" << "*.h" );
qDebug() << "Find " << files.size() << "files, using " << time.elapsed() / 1000.0 << "seconds";
// single thread test, 320s+
{
qDebug() << "Single thread";
time.start();
WordCounter words = singleThreadFind( files );
qDebug() << "Find " << words.size() << "Using " << time.elapsed() / 1000.0 << "seconds";
}
// concurrent map reduced, 80s
{
qDebug() << "Multi-thread";
time.start();
WordCounter words = QtConcurrent::mappedReduced( files, countWords, reduce );
qDebug() << "Find " << words.size() << "Using " << time.elapsed() / 1000.0 << "seconds";
}
return 0;
}
Additional APIs
- Using Iterators instead of Sequence
- Blocking Variants
- Using Member Functions
- Using Function Objects
- Wrapping Functions that Take Multiple Arguments
Concurrent Run
The QtConcurrent::run()
function runs a function in a separate thread. The return value of the function is made available through the QFuture
API.
Basic Usage
To run a function in another thread, use QtConcurrent::run():
extern void aFunction();
QFuture<void> future = QtConcurrent::run(aFunction);
This will run aFunction
in a separate thread obtained from the default QThreadPool
. You can use the QFuture
and QFutureWatcher
classes to monitor the status of the function.
Run in thread pool
To use a dedicated thread pool, you can pass the QThreadPool
as the first argument:
extern void aFunction();
QThreadPool pool;
QFuture<void> future = QtConcurrent::run(&pool, aFunction);
Pass arguments
Passing arguments to the function is done by adding them to the QtConcurrent::run() call immediately after the function name. For example:
extern void aFunctionWithArguments(int arg1, double arg2, const QString &string);
int integer = ...;
double floatingPoint = ...;
QString string = ...;
QFuture<void> future = QtConcurrent::run(aFunctionWithArguments, integer, floatingPoint, string);
A copy of each argument is made at the point where QtConcurrent::run() is called, and these values are passed to the thread when it begins executing the function. Changes made to the arguments after calling QtConcurrent::run() are not visible to the thread.
Return values
Any return value from the function is available via QFuture
:
extern QString functionReturningAString();
QFuture<QString> future = QtConcurrent::run(functionReturningAString);
...
QString result = future.result();
Run example
#include <QCoreApplication>
#include <QtConcurrent/QtConcurrentRun>
#include <QDebug>
#include <QThread>
#include <QMutex>
#include <QMutexLocker>
#include <iostream>
#include <functional>
int counter = 0;
QMutex mutex;
void foo_1() {
// qDebug() is not thread-safe
// https://stackoverflow.com/questions/22527253/is-qdebug-thread-safe
std::cout << QThread::currentThreadId() << "\n";
QMutexLocker locker( &mutex );
for ( int i = 0; i < 1000000; ++i ) {
counter ++;
}
}
void foo_2() {
std::cout << QThread::currentThreadId() << "\n";
QMutexLocker locker( &mutex );
for ( int i = 0; i < 2000000; ++i ) {
counter ++;
}
}
int main( int argc, char **argv ) {
QCoreApplication app( argc, argv );
QFuture<void> result_1 = QtConcurrent::run( foo_1 );
QFuture<void> result_2 = QtConcurrent::run( foo_2 );
QString str = "Hello world";
QFuture<QStringList> result_3 =
QtConcurrent::run( [=]() {
return str.split(" ");
} );
result_1.waitForFinished();
result_2.waitForFinished();
QStringList value = result_3.result();
std::cout << counter << "\n";
foreach ( const QString &s, value ) {
std::cout << s.toStdString() << "\n";
}
return 0;
}
Helper Class
QFuture
The QFuture
class represents the result of an asynchronous computation. QFuture
allows threads to be synchronized against one or more results which will be ready at a later point in time. If the result is not immediately available, this function will block and wait for the result to become available. Key functions:
-
T QFuture::result() const
: Returns the first result in the future. -
T QFuture::resultAt(int index) const
: Returns the result at index in the future.
QFuture
also offers ways to interact with a running computation. Key functions:
-
void QFuture::resume()
: Resumes the asynchronous computation represented by this future. void QFuture::setPaused(bool paused)
void QFuture::togglePaused()
void QFuture::cancel()
QFuture<void>
is specialized to not contain any of the result fetching functions. Any QFuture<T>
can be assigned or copied into a QFuture<void>
as well. This is useful if only status or progress information is needed - not the actual result data.
QFutureWatcher
The QFutureWatcher
class allows monitoring a QFuture using signals and slots. QFutureWatcher
provides information and notifications about a QFuture
. Use the setFuture()
function to start watching a particular QFuture
.
QFutureWatcher
provides lots of signals such as:
void QFutureWatcher::canceled()
void QFutureWatcher::finished()
void QFutureWatcher::paused()
void QFutureWatcher::progressValueChanged(int progressValue)
void QFutureWatcher::resultReadyAt(int index)
void QFutureWatcher::resultsReadyAt(int beginIndex, int endIndex)
Also, QFutureWatcher
provides some convenient slots:
void cancel()
void pause()
void resume()
void setPaused(bool paused)
void togglePaused()
QFutureWatcher
starts watching the given future by void QFutureWatcher::setFuture(const QFuture<T> &future)
. To avoid a race condition, it is important to call this function after doing the connections. Also, some functions are provided to report status.
Examples
Here will list a thumbnails generation example using QConcurrent
framework.
#ifndef IMAGEVIEWER_H
#define IMAGEVIEWER_H
#include <QWidget>
#include <QtConcurrent>
#include <QLabel>
#include <QList>
#include <QImage>
namespace Ui {
class ImageViewer;
}
class ImageViewer : public QWidget
{
Q_OBJECT
public:
explicit ImageViewer(QWidget *parent = 0);
signals:
private slots:
void open();
void finished();
void showImage( int i );
private:
QFutureWatcher<QImage> *m_watcher;
QList<QLabel *> m_labels;
Ui::ImageViewer *m_ui;
};
#endif // IMAGEVIEWER_H
#include "ImageViewer.h"
#include <QFileDialog>
#include "ui_ImageViewer.h"
static const int IMG_SIZE = 50;
QImage scale( const QString &path ) {
QImage image( path );
return image.scaled( QSize( IMG_SIZE, IMG_SIZE ), Qt::IgnoreAspectRatio, Qt::SmoothTransformation );
}
ImageViewer::ImageViewer(QWidget *parent) :
QWidget(parent), m_ui( new Ui::ImageViewer ) {
m_ui->setupUi( this );
m_watcher = new QFutureWatcher<QImage>();
connect( m_ui->m_openBtn, SIGNAL(clicked()), SLOT(open()) );
connect( m_ui->m_pauseBtn, SIGNAL(clicked()), m_watcher, SLOT(togglePaused()) );
connect( m_ui->m_cancelBtn, SIGNAL(clicked()), m_watcher, SLOT(cancel()) );
// do connection before setFuture for future watcher
connect( m_watcher, SIGNAL(resultReadyAt(int)), this, SLOT(showImage(int)) );
connect( m_watcher, SIGNAL(finished()), this, SLOT(finished()) );
}
void ImageViewer::open() {
if ( m_watcher->isRunning() ) {
m_watcher->cancel();
m_watcher->waitForFinished();
}
const QStringList &files = QFileDialog::getOpenFileNames( this, "Select Images",
QStandardPaths::writableLocation( QStandardPaths::PicturesLocation ),
"*.jpg *.png" );
if ( files.isEmpty() ) {
return;
}
qDeleteAll( m_labels );
m_labels.clear();
int dim = qSqrt( files.size() ) + 1;
for ( int i = 0; i < dim; ++i ) {
for ( int j = 0; j < dim; ++j ) {
QLabel *label = new QLabel;
label->setFixedWidth( IMG_SIZE );
m_labels.append( label );
m_ui->m_gridLayout->addWidget( label, i, j );
}
}
m_watcher->setFuture( QtConcurrent::mapped( files, scale ) );
m_ui->m_openBtn->setEnabled( false );
m_ui->m_pauseBtn->setEnabled( true );
m_ui->m_cancelBtn->setEnabled( true );
}
void ImageViewer::finished() {
m_ui->m_openBtn->setEnabled( true );
m_ui->m_pauseBtn->setEnabled( false );
m_ui->m_cancelBtn->setEnabled( false );
}
void ImageViewer::showImage( int i ) {
QLabel *image = m_labels[i];
if ( !image ) {
return;
}
image->setPixmap( QPixmap::fromImage( m_watcher->resultAt(i) ) );
}