作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
安德烈·斯米尔诺夫的头像

Andrei Smirnov

Andrei在微软这样的公司工作了15年以上, EMC, Motorola, and Deutsche Bank on mobile, desktop, and web using C++, C#, and JS.

Expertise

Previously At

Microsoft
Share

C++ developers 努力构建健壮的多线程Qt应用程序, 但是,在所有这些竞争条件下,多线程从来都不是一件容易的事, synchronization, and deadlocks and livelocks. 值得称赞的是,你没有放弃,发现自己在搜索StackOverflow. Nevertheless, 从一打不同的答案中选择正确且有效的解决方案是相当重要的, 特别是考虑到每种解决方案都有其自身的缺点.

多线程是一种广泛使用的编程和执行模型,它允许多个线程存在于一个进程的上下文中. 这些线程共享进程的资源,但能够独立执行. 线程编程模型为开发人员提供了并发执行的有用抽象. 还可以将多线程应用于一个进程,以便在多处理系统上实现并行执行..

Wikipedia

本文的目标是汇总有关使用Qt框架进行并发编程的基本知识, 尤其是最容易被误解的话题. 读者需要有Qt和c++的背景才能理解本书的内容.

Choosing between using QThreadPool and QThread

Qt框架为多线程提供了许多工具. 一开始,选择正确的工具可能是一个挑战, but in fact, 决策树只包含两个选项:您希望Qt为您管理线程, 或者您希望自己管理线程. 然而,还有其他重要的标准:

  1. 不需要事件循环的任务. 具体来说,是指在任务执行过程中没有使用信号/槽机制的任务.
    Use: QtConcurrent and QThreadPool + QRunnable.

  2. 使用信号/槽的任务,因此需要事件循环.
    使用:工作对象移动到+ QThread.

Qt框架的巨大灵活性允许您绕过“缺失事件循环”问题并添加一个事件循环 QRunnable:

类MyTask:公共QObject,公共QRunnable
{
    Q_OBJECT
    
public:
    void MyTask::run() {
        _loop.exec();  
    }
    
public slots:
    //你需要一个连接到这个槽的信号来退出循环;
    //否则运行循环的线程将保持阻塞状态...
    void finishTask() {
        _loop.exit();
    }
    
private:
    QEventLoop _loop;
}

尽量避免这种“变通方法”, though, 如果线程池中的一个线程(运行MyTask)由于等待信号而阻塞, 然后,它不能执行池中的其他任务.

alt text

You can also run a QThread 没有任何事件循环重写 QThread::run() 方法,只要你知道你在做什么,这是完全没问题的. 例如,不要期望方法 quit() to work in such case.

一次运行一个任务实例

假设您需要确保一次只能执行一个任务实例,并且运行同一任务的所有挂起请求都在某个队列上等待. 当任务访问排他性资源时,通常需要这样做, 例如写入同一个文件或使用TCP套接字发送数据包.

Let’s forget about computer science and producer-consumer pattern for a moment and consider something trivial; something that can be easily found in real projects.

此问题的naïve解决方案可以使用 QMutex. Inside the task function, 您可以简单地获取互斥锁,有效地序列化所有试图运行该任务的线程. 这将保证一次只有一个线程可以运行该函数. 但是,此解决方案通过引入 high contention 因为所有这些线程在继续之前都会被阻塞(在互斥对象上). 如果您有许多线程正在使用这样的任务并在其间执行一些有用的工作, 然后所有这些线程大部分时间都处于睡眠状态.

void logEvent(const QString & event) {  
    static QMutex lock;  
    QMutexLocker locker(& lock);   // high contention!  
    logStream << event;            // exclusive resource  
}  

To avoid contention, 我们需要一个队列和一个工作线程,它在自己的线程中处理队列. 这是非常经典的 producer-consumer pattern. The worker (consumer)将一个接一个地从队列中挑选请求,每个 producer 可以简单地将其请求添加到队列中吗. 一开始听起来很简单,你可能会想到使用 QQueue and QWaitCondition等等,让我们看看我们是否可以在没有这些原语的情况下实现目标:

  • We can use QThreadPool 因为它有一个待挂任务队列

Or

  • We can use default QThread::run() because it has QEventLoop

The first option is to use QThreadPool. We can create a QThreadPool instance and use QThreadPool: setMaxThreadCount (1). Then we can be using QtConcurrent::run() to schedule requests:

class Logger: public QObject
{
public:
    显式Logger(QObject *parent = nullptr): QObject(parent) {
        threadPool.setMaxThreadCount(1);
    }

    void logEvent(const QString &event) {
        QtConcurrent::run(&threadPool, [this, event]{
            logEventCore(event);
        });
    }

private:
    无效logEventCore(const QString) &event) {
        logStream << event;
    }

    QThreadPool threadPool;
};

这个解决方案有一个好处: QThreadPool::clear() allows you to instantly cancel 所有挂起的请求,例如当您的应用程序需要快速关闭时. 然而,也有一个明显的缺点与之相关 thread-affinity: logEventCore 函数很可能在不同的线程中执行. 我们知道Qt有一些类需要 thread-affinity: QTimer, QTcpSocket and possibly some others.

Qt规范中关于线程亲和性的说明:计时器在一个线程中启动, 无法从另一个线程停止. 只有拥有套接字实例的线程才能使用这个套接字. 这意味着您必须在启动计时器的线程中停止任何正在运行的计时器,并且必须在拥有套接字的线程中调用QTcpSocket::close(). 这两个例子通常都在析构函数中执行.

更好的解决方案依赖于使用 QEventLoop provided by QThread. 这个想法很简单:我们使用信号/槽机制来发出请求, 在线程内部运行的事件循环将充当一个队列,每次只允许执行一个插槽.

//将被移动到线程的工作线程
类LogWorker:公共的QObject
{
    Q_OBJECT

public:
    明确LogWorker(QObject *parent = nullptr);

public slots:
    //这个槽将被事件循环执行(一次一个调用)
    void logEvent(const QString &event);
};

Implementation of LogWorker constructor and logEvent 是直接的,因此这里没有提供. 现在我们需要一个服务来管理线程和worker实例:

// interface
类LogService:公共的QObject
{
    Q_OBJECT
    
public:
    明确LogService(QObject *parent = nullptr);
    ~LogService();

signals:
    //要使用该服务,只需调用这个信号发送一个请求:
    // logService->logEvent("event");
    void logEvent(const QString &event);

private:
    QThread *thread;
    LogWorker *worker;
};

// implementation
LogService::LogService(QObject *parent): QObject(parent) {
    thread = new QThread(this);
    worker = new LogWorker;
    worker->moveToThread(thread);
    connect(this, &LogService::logEvent, worker, &LogWorker::logEvent);
    connect(thread, &QThread::finished, worker, &QObject::deleteLater);
    thread->start();
}

LogService::~LogService() {
    thread->quit();
    thread->wait();
}

alt text

让我们来讨论一下这段代码是如何工作的:

  • 在构造函数中,我们创建了一个线程和工作实例. 注意,工作线程没有接收父线程,因为它将被移动到新线程中. Because of this, Qt将不能自动释放工作线程的内存, and therefore, 我们需要通过连接来做到这一点 QThread::finished signal to deleteLater slot. 我们还连接了代理方法 LogService::logEvent() to LogWorker::logEvent() which will be using Qt::QueuedConnection 模式,因为线程不同.
  • 在析构函数中,我们将 quit 事件进入事件循环的队列. This event will be handled after all other events are handled. 例如,如果我们做了几百个 logEvent() 在析构函数调用之前调用, 记录器将在获取退出事件之前处理所有这些事件. 当然,这需要时间,所以我们必须 wait() until the event loop exits. 值得一提的是,所有未来的日志请求都会被发布 after 退出事件永远不会被处理.
  • The logging itself (LogWorker::logEvent)总是在同一个线程中完成, 因此,这种方法很适合需要的类 thread-affinity. At the same time, LogWorker 构造函数和析构函数在主线程(特别是线程)中执行 LogService 运行),因此,您需要非常小心您在那里运行的代码. Specifically, 不要停止计时器或在工作者的析构函数中使用套接字,除非你可以在同一个线程中运行析构函数!

在同一线程中执行worker的析构函数

如果您的工作程序正在处理定时器或套接字, 您需要确保析构函数在同一个线程中执行(为worker创建的线程和将worker移动到的线程)。. 支持这一点的明显方法是创建子类 QThread and delete worker inside QThread::run() method. 考虑以下模板:

template 
class Thread : QThread
{
public:
    显式线程(TWorker *worker, QObject *parent = nullptr)
        : QThread(parent), _worker(worker) {
        _worker->moveToThread(this);
        start();
    }

    ~Thread() {
        quit();
        wait();
    }

    TWorker worker() const {
        return _worker;
    }

protected:
    void run() override {
        QThread::run();
        delete _worker;
    }

private:
    TWorker *_worker;
};

使用这个模板,我们重新定义 LogService from the previous example:

// interface
class LogService : public Thread
{
    Q_OBJECT

public:
    明确LogService(QObject *parent = nullptr);

signals:
    void **logEvent**(const QString &event);
};

// implementation
LogService:: * * LogService * * (QObject *父母)
    : Thread(new LogWorker, parent) {
    connect(this, &LogService: logEvent、工人(), &LogWorker::logEvent);
}

让我们讨论一下这应该是如何工作的:

  • We made LogService to be the QThread 对象,因为我们需要实现自定义 run() function. 我们使用私有子类来防止访问 QThread因为我们想在内部控制线程的生命周期.
  • In Thread::run() 函数,通过调用默认的方法来运行事件循环 QThread::run() 实现,并在事件循环退出后立即销毁工作实例. 注意,worker的析构函数是在同一个线程中执行的.
  • LogService::logEvent() 代理函数(信号)将发布日志事件到线程的事件队列.

暂停和恢复线程

另一个有趣的机会是能够挂起和恢复我们的自定义线程. 假设您的应用程序正在执行一些需要在应用程序最小化时暂停的处理, locked, 或者只是失去了网络连接. 这可以通过构建一个自定义异步队列来实现,该队列将保存所有挂起的请求,直到工作线程恢复. However, 因为我们在寻找最简单的解决方案, 出于同样的目的,我们将(再次)使用事件循环的队列.

要挂起一个线程,我们显然需要它在一定的等待条件下等待. 如果线程以这种方式被阻塞, 它的事件循环不处理任何事件,Qt必须把keep放在队列中. 一旦恢复,事件循环将处理所有累积的请求. 对于等待条件,我们简单地使用 QWaitCondition object that also requires a QMutex. 设计一个可以被任何工作者重用的通用解决方案, 我们需要把所有的挂起/恢复逻辑放到一个可重用的基类中. Let’s call it SuspendableWorker. 这样的类应该支持两个方法:

  • suspend() 阻塞调用会使线程处于等待状态吗. 这可以通过将挂起请求提交到队列中并等待它被处理来实现. Pretty much similar to QThread::quit() + wait().
  • resume() 是否发出等待条件的信号以唤醒休眠线程以继续执行.

让我们回顾一下接口和实现:

// interface
类SuspendableWorker:公共QObject
{
    Q_OBJECT

public:
    明确SuspendableWorker(QObject *parent = nullptr);
    ~SuspendableWorker();

    // resume()必须从外部线程调用.
    void resume();

    //必须从外部线程调用suspend().
    //该函数将阻塞调用者的线程,直到
    //工作线程挂起.
    void suspend();

private slots:
    void suspendImpl();

private:
    QMutex _waitMutex;
    QWaitCondition _waitCondition;
};
// implementation
SuspendableWorker::SuspendableWorker(QObject *parent): QObject(parent) {
    _waitMutex.lock();
}

SuspendableWorker:: ~ SuspendableWorker () {
    _waitCondition.wakeAll();
    _waitMutex.unlock();
}

void SuspendableWorker::resume() {
    _waitCondition.wakeAll();
}

void SuspendableWorker::suspend() {
    QMetaObject: invokeMethod(这一点, &SuspendableWorker: suspendImpl);
    //获取互斥锁来阻塞调用线程
    _waitMutex.lock();
    _waitMutex.unlock();
}

void SuspendableWorker::suspendImpl() {
    _waitCondition.wait(&_waitMutex);
}

请记住,挂起的线程永远不会接收 quit event. 出于这个原因,我们不能安全地将其与香草一起使用 QThread 除非我们在发帖退出前恢复线程. 让我们把它集成到我们的自定义中 Thread 模板,使其防弹.

alt text

template 
class Thread : QThread
{
public:
    显式线程(TWorker *worker, QObject *parent = nullptr)
        : QThread(parent), _worker(worker) {
        _worker->moveToThread(this);
        start();
    }

    ~Thread() {
        resume();
        quit();
        wait();
    }

    void suspend() {
        auto worker = qobject_cast(_worker);
        if (worker != nullptr) {
            worker->suspend();
        }
    }

    void resume() {
        auto worker = qobject_cast(_worker);
        if (worker != nullptr) {
            worker->resume();
        }
    }

    TWorker worker() const {
        return _worker;
    }

protected:
    void run() override {
        QThread::*run*();
        delete _worker;
    }

private:
    TWorker *_worker;
};

通过这些更改,我们将在发布退出事件之前恢复线程. Also, Thread 仍然允许传入任何类型的worker,不管它是否是 SuspendableWorker or not.

用法如下:

LogService logService;
logService.logEvent("processed event");
logService.suspend();
logService.logEvent("queued event");
logService.resume();
//“队列事件”现在被处理.

volatile vs atomic

这是一个经常被误解的话题. Most people believe that volatile 变量可以用于提供由多个线程访问的某些标志,这可以避免数据竞争条件. That is false, and QAtomic* classes (or std::atomic)必须用于此目的.

让我们考虑一个现实的例子 TcpConnection 在专用线程中工作的连接类, 我们希望这个类导出一个线程安全的方法: bool isConnected(). 在内部,类将监听套接字事件: connected and disconnected 维护一个内部布尔标志:

//伪代码,不会编译
类TcpConnection: QObject
{
    Q_OBJECT 

public:
    // this is not thread-safe!
    bool isConnected() const {
        return _connected;
    }
    
private slots:
    无效handleSocketConnected() {
        _connected = true;
    }
    
    无效handleSocketDisconnected() {
        _connected = false;
    }
    
private:
    bool _connected;
}

Making _connected member volatile 不会解决问题,也不会制造 isConnected() thread-safe. 这个方法在99%的情况下是有效的,但是剩下的1%会让你的生活变成噩梦. 为了解决这个问题,我们需要保护多个线程对变量的访问. Let’s use QReadWriteLocker for this purpose:

//伪代码,不会编译
类TcpConnection: QObject
{
    Q_OBJECT 

public:
    bool isConnected() const {
        QReadLocker locker(&_lock);
        return _connected;
    }
    
private slots:
    无效handleSocketConnected() {
        QWriteLocker locker(&_lock);
        _connected = true;
    }
    
    无效handleSocketDisconnected() {
        QWriteLocker locker(&_lock);
        _connected = false;
    }
    
private:
    QReadWriteLocker _lock;
    bool _connected;
}

这可以可靠地工作,但不如使用“无锁”原子操作快. 第三种解决方案既快速又线程安全(本例使用的是 std::atomic instead of QAtomicInt,但它们在语义上是相同的):

//伪代码,不会编译
类TcpConnection: QObject
{
    Q_OBJECT 

public:
    bool isConnected() const {
        return _connected;
    }
    
private slots:
    无效handleSocketConnected() {
        _connected = true;
    }
    
    无效handleSocketDisconnected() {
        _connected = false;
    }
    
private:
    std::atomic _connected;
}

Conclusion

In this article, 我们讨论了使用Qt框架并发编程的几个重要问题,并设计了解决方案来处理特定的用例. 我们还没有考虑到许多简单的主题,比如原子原语的使用, read-write locks, and many others, 但如果你对这些感兴趣, 在下面留下你的评论,并要求这样的教程.

如果你有兴趣探索Qmake,我最近也发表了 The Vital Guide to Qmake. It’s a great read!

Understanding the basics

  • How is multithreading useful?

    这个多线程模型为开发人员提供了一个有用的并发执行抽象. However, 当应用于单个进程时,它真正发挥了作用:支持在多处理器系统上并行执行.

  • What is multithreading?

    多线程是一种编程和执行模型,它允许多个线程存在于单个进程的上下文中. 这些线程共享进程的资源,但能够独立执行.

  • What is a Qt application?

    一个用Qt框架构建的应用程序. Qt应用程序通常用c++构建,因为框架本身就是用c++构建的. 然而,还有其他语言绑定,如Python-Qt.

  • Qt支持哪些并发模型?

    通过利用QThreadPool和QRunnable实现基于任务的并发性, 使用QThread类进行线程编程.

  • Qt开发人员可以使用哪些同步原语?

    最常用的是QMutex、QSemaphore和QReadWriteLock. QAtomic*类还提供了无锁原子操作.

就这一主题咨询作者或专家.
Schedule a call
安德烈·斯米尔诺夫的头像
Andrei Smirnov

Located in Ankara, Turkey

Member since December 11, 2014

About the author

Andrei在微软这样的公司工作了15年以上, EMC, Motorola, and Deutsche Bank on mobile, desktop, and web using C++, C#, and JS.

Toptal作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

Expertise

Previously At

Microsoft

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

Toptal Developers

Join the Toptal® community.