C++ 中的并发:互斥锁和锁 — 第 1 部分

作者:微信公众号:【架构师老卢】
4-29 8:30
27

概述:互斥实体到目前为止,我们用于在线程之间传递数据的方法都是短期的,涉及将参数(promise)从父线程传递到工作线程,然后在结果可用时(通过 future)将结果传递回父线程。promise-future 结构是一次性使用的非永久性通信通道。我们已经看到,为了避免数据争用,我们需要放弃访问共享数据,或者在不改变数据的情况下将其用于只读访问。在本章中,我们想研究一种建立稳定的长期沟通渠道的方法,该渠道既允许共享,也允许突变。理想情况下,我们希望有一个与无线电信道上的语音通信相对应的通信协议,其中发射器使用表达式“结束”来指示向接收器传输的结束。通过使用这种协议,发送方和接收方可以轮流传输他们的数据

互斥实体

到目前为止,我们用于在线程之间传递数据的方法都是短期的,涉及将参数(promise)从父线程传递到工作线程,然后在结果可用时(通过 future)将结果传递回父线程。promise-future 结构是一次性使用的非永久性通信通道。

我们已经看到,为了避免数据争用,我们需要放弃访问共享数据,或者在不改变数据的情况下将其用于只读访问。在本章中,我们想研究一种建立稳定的长期沟通渠道的方法,该渠道既允许共享,也允许突变。理想情况下,我们希望有一个与无线电信道上的语音通信相对应的通信协议,其中发射器使用表达式“结束”来指示向接收器传输的结束。通过使用这种协议,发送方和接收方可以轮流传输他们的数据。在 C++ 中,这种轮流的概念可以由称为“互斥”的实体构建——它代表相互排除。

回想一下,数据争用需要从两个线程同时访问。如果我们可以保证一次只有一个线程可以访问特定的内存位置,那么就不会发生数据争用。为了做到这一点,我们需要建立一个通信协议。需要注意的是,互斥锁本身并不是数据争用问题的解决方案,而只是程序员必须实现和遵守的线程安全通信协议的使能器。

让我们看一下这个协议是如何工作的:假设我们有一个内存(例如共享变量),我们想要保护它免受同时访问,我们可以分配一个互斥锁作为这个特定内存的守护者。重要的是要了解互斥锁绑定到它所保护的内存。想要访问受保护内存的线程 1 必须首先“锁定”互斥锁。线程 1 “在锁下”后,线程 2 被阻止访问共享变量,它无法获取互斥锁上的锁,并被系统暂时挂起。

线程 1 的读取或写入操作完成后,它必须“解锁”互斥锁,以便线程 2 可以访问内存位置。通常,“在锁下”执行的代码被称为“关键部分”。需要注意的是,对共享内存的只读访问也必须锁定互斥锁以防止数据争用 - 当另一个线程(当时可能处于锁定状态)要修改数据时,就会发生这种情况。

当多个线程尝试获取并锁定互斥锁时,只有一个线程会成功。所有其他线程都会自动暂停——就像汽车在十字路口等待绿灯一样。一旦成功获取锁的线程完成其工作并解锁互斥锁,等待访问的排队线程将被唤醒并允许锁定互斥锁以继续其读/写操作。如果所有线程都遵循此协议,则可以有效地避免数据争用。在我们仔细研究这样的协议之前,让我们接下来分析一个代码示例。

#include <iostream>
#include <thread>
#include <vector>
#include <future>
#include<algorithm>

class Vehicle
{
public:
    Vehicle(int id) : _id(id) {}

private:
    int _id;
};

class WaitingVehicles
{
public:
    WaitingVehicles() : _tmpVehicles(0) {}

    // getters / setters
    void printSize()
    {
        std::cout << "#vehicles = " << _tmpVehicles << std::endl;
    }

    // typical behaviour methods
    void pushBack(Vehicle &&v)
    {
        //_vehicles.push_back(std::move(v)); // data race would cause an exception
        int oldNum = _tmpVehicles;
        std::this_thread::sleep_for(std::chrono::microseconds(1)); // wait deliberately to expose the data race
        _tmpVehicles = oldNum + 1;
    }

private:
    std::vector<Vehicle> _vehicles; // list of all vehicles waiting to enter this intersection
    int _tmpVehicles; 
};

int main()
{
    std::shared_ptr<WaitingVehicles> queue(new WaitingVehicles); 
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 1000; ++i)
    {
        Vehicle v(i);
        futures.emplace_back(std::async(std::launch::async, &WaitingVehicles::pushBack, queue, std::move(v)));
    }

    std::for_each(futures.begin(), futures.end(), [](std::future<void> &ftr) {
        ftr.wait();
    });

    queue->printSize();

    return 0;
}

此代码基于我们在前面的并发文章中看到的一些类构建。有一个具有单个数据成员 () 的类 Vehicle。此外,还有一个类 WaitingVehicles,它应该在内部向量中存储许多车辆。请注意,与课程项目相反,使用右值参考将车辆移动到向量中。另请注意,此处注释掉了 push_back 函数。这样做的原因是,我们试图引发数据竞赛 - 让push_back处于活动状态会导致程序崩溃(我们将在稍后对其进行评论)。这也是为什么有一个辅助成员的原因,它将用于计算通过调用添加到的车辆数量。这个临时变量将帮助我们在不使程序崩溃的情况下暴露数据竞争。int _id_tmpVehiclespushBack()

在 中,for 循环用于启动大量任务,这些任务都尝试将新创建的车辆添加到队列中。使用 launch 选项同步运行程序会在控制台上生成以下输出:main()std::launch::deferred

#vehicles = 1000

正如人们所期望的那样,每个任务都会在队列中插入一个元素,车辆总数达到 1000 辆。

现在,让我们强制执行并发行为,并将启动选项更改为 。这将生成 813 的输出。std::launch::async

似乎并非所有车辆都可以添加到队列中。但这是为什么呢?请注意,在线程函数“pushBack”中,有一个对 的调用,它会在短时间内暂停线程执行。这是发生数据争用的位置:首先,当前值存储在一个临时变量中。当线程暂停时,可能会(并且将)由其他线程执行更改。当执行恢复时,将写回以前的值,从而使同时具有写入访问权限的所有线程的贡献无效。有趣的是,当注释掉时,程序的输出与 with 相同 - 至少在我们运行程序时的大部分时间里都是如此。但偶尔,可能会有一个调度星座导致错误暴露自己。除了了解数据竞赛之外,您还应该建议在开发的测试/调试阶段引入故意的时间延迟,以帮助暴露许多并发错误。sleep_for_tmpVehiclesoldNum_tmpVehicles_tmpVehiclessleep_forstd::launch::deferred

使用互斥锁保护数据

在最简单的形式中,使用互斥锁包括四个直接的步骤:

  1. 包括标题<mutex>
  2. 创建一个std::mutex
  3. 在调用读/写之前使用锁定互斥锁lock()
  4. 读/写操作完成后,使用unlock()

为了防止访问同时被多个线程操作,在下面的代码中,已将互斥锁作为私有数据成员添加到类中。在函数中,互斥锁在将新元素添加到向量之前被锁定,并在操作完成后解锁。_vehiclespushBack

请注意,在打印矢量大小之前,互斥锁也被锁定在函数 printSize 中。这种锁定的原因有两个:首先,我们希望防止在发生对向量的读取访问和同时写访问(即使在锁定下)时发生的数据争用。其次,我们希望将标准输出专门保留给控制台,用于打印矢量大小,而无需其他线程同时打印到控制台。

执行此代码时,向量中将有 1000 个元素。通过对共享资源使用互斥锁,可以有效地避免数据争用。

#include <iostream>
#include <thread>
#include <vector>
#include <future>
#include <mutex>
#include<algorithm>

class Vehicle
{
public:
    Vehicle(int id) : _id(id) {}

private:
    int _id;
};

class WaitingVehicles
{
public:
    WaitingVehicles() {}

    // getters / setters
    void printSize()
    {
        _mutex.lock();
        std::cout << "#vehicles = " << _vehicles.size() << std::endl;
        _mutex.unlock();
    }

    // typical behaviour methods
    void pushBack(Vehicle &&v)
    {
        _mutex.lock();
        _vehicles.emplace_back(std::move(v)); // data race would cause an exception
        _mutex.unlock();
    }

private:
    std::vector<Vehicle> _vehicles; // list of all vehicles waiting to enter this intersection
    std::mutex _mutex;
};

int main()
{
    std::shared_ptr<WaitingVehicles> queue(new WaitingVehicles); 
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 1000; ++i)
    {
        Vehicle v(i);
        futures.emplace_back(std::async(std::launch::async, &WaitingVehicles::pushBack, queue, std::move(v)));
    }

    std::for_each(futures.begin(), futures.end(), [](std::future<void> &ftr) {
        ftr.wait();
    });

    queue->printSize();

    return 0;
}
阅读排行