C++ 中的并发性:消息队列--第1部分

作者:微信公众号:【架构师老卢】
4-29 8:7
21

概述:监视器对象模式在前面的并发部分中,我们了解到数据保护是并发编程中的关键元素。在研究了几种实现此目的的方法之后,我们现在希望在这些概念的基础上设计一种在线程之间进行受控和细粒度数据交换的方法(消息队列)。实现这种构造的一个重要步骤是实现监视对象,这是一种同步并发方法执行的设计模式,以确保一次只在对象中运行一个方法。它还允许对象的方法协同调度其执行序列。此模式解决的问题基于以下观察结果:许多应用程序包含其方法由多个客户端线程同时调用的对象。这些方法通常会修改其对象的状态,例如,通过向内部向量添加数据。为了使此类并发程序正确执行,必须非常小心地同步和调度对对象的访问。监视对象的思想是同步对对象方法的

监视器对象模式

在前面的并发部分中,我们了解到数据保护是并发编程中的关键元素。在研究了几种实现此目的的方法之后,我们现在希望在这些概念的基础上设计一种在线程之间进行受控和细粒度数据交换的方法(消息队列)。实现这种构造的一个重要步骤是实现监视对象,这是一种同步并发方法执行的设计模式,以确保一次只在对象中运行一个方法。它还允许对象的方法协同调度其执行序列。此模式解决的问题基于以下观察结果:许多应用程序包含其方法由多个客户端线程同时调用的对象。这些方法通常会修改其对象的状态,例如,通过向内部向量添加数据。为了使此类并发程序正确执行,必须非常小心地同步和调度对对象的访问。监视对象的思想是同步对对象方法的访问,以便任何时候只能执行一个方法。

在上一节关于互斥锁和锁的章节中,我们看了一个代码示例,它非常接近监视器对象的功能:类。WaitingVehicles

让我们修改并部分重新实现这个类,我们希望将其用作并发线程可以存储数据的共享位置,在我们的例子中是类的实例。由于我们将对所有线程使用相同的对象,因此我们必须通过引用将其传递给它们 - 并且由于所有线程将同时写入此对象(这是一个突变操作),因此我们将将其作为共享指针传递。请记住,将有许多线程尝试同时将数据传递到对象,因此存在数据争用的危险。VehicleWaitingVehiclesWaitingVehicles

在我们看一下 的实现之前,让我们先看一下 main 函数(见下面的代码),所有线程都是在其中生成的。我们需要一个向量来存储期货,因为没有要从线程返回的数据。此外,我们需要在末尾调用期货,这样程序就不会在线程执行完成之前过早退出。WaitingVehicleswait()main()

我们将不再使用,而是使用构建未来的位置,而不是将它们移动到向量中。在 for 循环中构造一个新对象后,我们通过向它传递对函数的引用、指向对象的共享指针和新创建的载体来启动一个新任务。请注意,后者是使用移动语义传递的。push_backemplace_backVehiclepushBackWaitingVehicles

#include <iostream>
#include <thread>
#include <vector>
#include <future>
#include <mutex>

class Vehicle
{
public:
    Vehicle(int id) : _id(id) {}
    int getID() { return _id; }

private:
    int _id;
};

class WaitingVehicles
{
public:
    WaitingVehicles() {}

    void printIDs()
    {
        std::lock_guard<std::mutex> myLock(_mutex); // lock is released when myLock goes out of scope
        for(auto &v : _vehicles)
            std::cout << "   Vehicle #" << v.getID() << " is now waiting in the queue" << std::endl;
        
    }

    void pushBack(Vehicle &&v)
    {
        // perform vector modification under the lock
        std::lock_guard<std::mutex> uLock(_mutex);
        std::cout << "   Vehicle #" << v.getID() << " will be added to the queue" << std::endl; 
        _vehicles.emplace_back(std::move(v));

        // simulate some work
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

private:
    std::vector<Vehicle> _vehicles; // list of all vehicles waiting to enter this intersection
    std::mutex _mutex;
};

int main()
{
    // create monitor object as a shared pointer to enable access by multiple threads
    std::shared_ptr<WaitingVehicles> queue(new WaitingVehicles);

    std::cout << "Spawning threads..." << std::endl;
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 10; ++i)
    {
        // create a new Vehicle instance and move it into the queue
        Vehicle v(i);
        futures.emplace_back(std::async(std::launch::async, &WaitingVehicles::pushBack, queue, std::move(v)));
    }

    std::for_each(futures.begin(), futures.end(), [](std::future<void> &ftr) {
        ftr.wait();
    });

    std::cout << "Collecting results..." << std::endl;
    queue->printIDs();

    return 0;
}

仔细看看 waitVehicle 对象的实现

我们需要启用 waitVehicle 对象来同时处理来自多个线程的写入请求。每次从线程传入请求时,对象都需要将新数据添加到其内部存储中。我们的存储容器将是一个.由于我们需要保护向量以后不被同时访问,我们还需要将互斥锁集成到类中。正如我们已经知道的,互斥锁具有锁定和解锁的方法。为了避免数据争用,每次线程想要访问向量时都需要锁定互斥锁,并且在写入操作完成后需要解锁互斥锁。为了避免由于缺少解锁操作而导致程序冻结,我们将使用锁保护对象,一旦锁定对象超出范围,它就会自动解锁。std::vector

在我们修改后的函数中,我们将首先创建一个锁保护对象,并向其传递互斥成员变量。现在,我们可以自由地将 Vehicle 对象移动到我们的向量中,而不会出现数据争用的危险。在函数的末尾,有一个对 的调用,它模拟了一些工作,帮助我们更好地暴露潜在的并发问题。对于传递到队列中的每个新 Vehicle 对象,我们将看到控制台的输出。pushBackstd::sleep_for

类中的另一个函数是 ,它遍历向量的所有元素,并将它们各自的 ID 打印到控制台。和 之间的一个主要区别是,后者函数通过遍历向量来访问所有对象,而只访问单个对象 - 这是向量的最新添加。WaitingVehicleprintIDs()pushBack()printIDs()VehiclepushBackVehicle

虽然我们构建的监视器对象的功能比许多其他允许将数据传递到线程的方法有所改进,但它有一个明显的缺点:主线程必须等到所有工作线程都完成其作业,然后才能批量访问添加的数据。然而,一个真正具有交互性的系统必须在事件到达时做出反应——它不应该等到所有线程都完成其工作,而是在新数据到达后立即采取行动

创建无限轮询循环

当线程使用该方法以增量方式向监视器添加数据时,主线程在最后使用该方法一次显示所有结果。我们的目标是以一种方式更改代码,以便在每次有新数据可用时通知主线程。但是,主线程如何知道新数据是否可用呢?解决方案是编写一种新方法,定期检查新数据的到来。pushBackprintSize

在下面列出的代码中,添加了一个新方法,同时删除了该方法。如果向量中的数据可用,则此方法返回 true,否则返回 false。一旦线程通过向量中发现新数据,它就可以调用该方法从监视对象中检索数据。请注意,它不是复制数据,而是从向量移动到方法。dataIsAvailable()printIDs()maindataIsAvailable()popBack()main

#include <iostream>
#include <thread>
#include <vector>
#include <future>
#include <mutex>

class Vehicle
{
public:
    Vehicle(int id) : _id(id) {}
    int getID() { return _id; }

private:
    int _id;
};

class WaitingVehicles
{
public:
    WaitingVehicles() {}

    bool dataIsAvailable()
    {
        std::lock_guard<std::mutex> myLock(_mutex);
        return !_vehicles.empty();
    }

    Vehicle popBack()
    {
        // perform vector modification under the lock
        std::lock_guard<std::mutex> uLock(_mutex);

        // remove last vector element from queue
        Vehicle v = std::move(_vehicles.back());
        _vehicles.pop_back();

        return v; // will not be copied due to return value optimization (RVO) in C++
    }

    void pushBack(Vehicle &&v)
    {
        // simulate some work
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // perform vector modification under the lock
        std::lock_guard<std::mutex> uLock(_mutex);

        // add vector to queue
        std::cout << "   Vehicle #" << v.getID() << " will be added to the queue" << std::endl;
        _vehicles.emplace_back(std::move(v));
    }

private:
    std::vector<Vehicle> _vehicles; // list of all vehicles waiting to enter this intersection
    std::mutex _mutex;
};

int main()
{
    // create monitor object as a shared pointer to enable access by multiple threads
    std::shared_ptr<WaitingVehicles> queue(new WaitingVehicles);

    std::cout << "Spawning threads..." << std::endl;
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 10; ++i)
    {
        // create a new Vehicle instance and move it into the queue
        Vehicle v(i);
        futures.emplace_back(std::async(std::launch::async, &WaitingVehicles::pushBack, queue, std::move(v)));
    }

    std::cout << "Collecting results..." << std::endl;
    while (true)
    {
        if (queue->dataIsAvailable())
        {
            Vehicle v = queue->popBack();
            std::cout << "   Vehicle #" << v.getID() << " has been removed from the queue" << std::endl;
        }
    }

    std::for_each(futures.begin(), futures.end(), [](std::future<void> &ftr) {
        ftr.wait();
    });

    std::cout << "Finished processing queue" << std::endl;

    return 0;
}

在线程中,我们将使用无限的 while 循环来频繁轮询监视器对象并检查新数据是否可用。与之前相反,我们现在将在 worker 完成之前执行读取操作 - 因此我们必须在 末尾调用期货之前集成我们的循环。一旦有新对象可用,我们希望在循环中打印它。mainwait()main()Vehicle

从输出中可以很容易地看出,在监视对象中添加和删除现在是交错的。当重复执行时,车辆的顺序很可能在两次执行之间有所不同。

练习:编写车辆计数器

请注意,上面示例中的程序不会终止 - 即使没有将新车辆添加到队列中,无限 while 循环也不会退出。

此问题的一种可能解决方案是将 vehicle 计数器集成到类中,每次添加 Vehicle 对象时都会递增该计数器,并在删除该对象时递减该计数器。一旦计数器达到零,while-loop 就可以终止。请继续实现此功能 - 但请记住保护计数器,因为它也将同时被多个线程访问。此外,在生成线程和收集结果之间引入一个小延迟也是一个好主意。否则,队列将默认为空,程序将提前终止。

阅读排行