4.1 线程的简单介绍与同步方法

线程的简单介绍与线程同步方法

线程介绍

这里的线程主要为 POSIX 线程,即 Pthreads,由 Native POSIX Threads Library(NPTL)实现。 线程是允许应用程序并发执行多个任务的一种机制,一个进程可以包含多个线程,线程可以共享同一份全局内存区域。 使用多进程实现并行有以下不足:

  • 进程间的信息难以共享,必须采用进程间通信(IPC)方式

  • 调用 fork 创建进程的代价比较高

线程完美的解决了上面两个问题,线程还可共享:

  • 进程 ID,父进程 ID

  • 进程组 ID 与会话 ID

  • 打开的文件描述符

  • 信号处置

各个线程的独有属性有:

  • 线程 ID

  • 信号掩码

  • 线程特有数据

  • erron 变量,是一个宏

后续 Pthreads 系列函数均以 0 表示成功,返回一个正值表示失败,这个正值与传统 Unix 的 errno 的值含义一样。在编译调用了 Pthreads 函数的程序需要添加 -lpthread以链接此库。

创建线程

程序刚启动时,只包含一条线程,称为主线程。

#include <pthread.h>
/*
@brief	创建线程
@param	thread 	线程 ID,通过该标识可以引用该进程
@param	attr	指定了新线程的各种属性,NULL 则使用默认属性
@param	start	新线程将执行 start 函数
@param	arg	为传递给 start 的参数
*/
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                   void *(*start)(void *), void *arg);

创建新线程后,系统会优先调度哪一个是不确定的,如果对执行顺序有明确要求可采取一些同步技术(见后续的线程同步小节)。

终止线程

可采用以下方式终止线程:

  • 线程 start 函数执行到 return 语句

  • 线程调用 pthread_exit

  • 调用 pthread_cancel 取消线程

  • 任意线程调用了 exit,或者主线程(main)执行了 return

#include <pthread.h>
// 终止线程,retval 指定了线程的返回值,其指向的内容不应该分配在线程栈中
// 因为线程终止后,线程栈内的内容不一定有效
void pthread_exit(void *retval);

线程 ID

线程 ID 是线程的唯一标识,pthread_create 可返回线程 ID,线程也可获取自己的线程 ID:

#include <pthread.h>
pthread_t pthread_self(void);

不能直接使用 == 运算符判断两个线程 ID 是否相等,因为其底层实现可能是无符号长整型、指针、结构:

#include <pthread.h>
// 判断线程 ID 是否相同
int pthread_equal(pthread_t t1, pthread_t t2);

但 NPTL 中,它实际上是一个经强制转化为无符号长整形的指针。 在 Linux 中线程 ID 在所有进程中都是唯一的,不过其它 Unix 实现则不一定。线程 ID 可复用,当某个线程终止后,其 ID 可被后续新线程复用。

连接已终止的线程

类似进程中的 wait 系列函数:

#include <pthread.h>
// 等待 thread 线程终止,并将线程返回值拷贝到 retval 处,若其为非空指针
int pthread_join(pthread_t thread, void **retval);

与进程类似,若线程结束后没有连接,则会产生僵尸线程,除非手动将其分离(后续介绍)。如向 pthread_join 传入一个已经连接过的线程将会导致无法预知的行为,因为线程 ID 可能会被复用。 线程的连接与进程中的 wait 有以下不同:

  • 线程之间的关系是对等的,不像进程存在父子关系。进程中的任意线程均可以调用 pthread_join 与其他线程连接起来。

  • 不能以非阻塞方式连接,也不能连接任意线程(如别的进程中的线程)。

线程分离

线程默认清空下是可连接的,但也可指定其为分离状态。分离状态的线程在终止后,系统会自动清理并移除。当不关心线程的返回状态时,这十分有用。

#include <pthread.h>
// 调用 pthread_detach(pthread_self)); 可自行分离
int pthread_detach(pthread_t thread);

一旦线程处理分离状态,则不可被连接,也不能重返可连接状态。在其他线程调用了 exit 或者主线程 return 后,分离的线程仍然会被终止。

线程与进程

线程有以下优点:

  • 线程间数据共享十分简单

  • 创建线程很快

相比与进程有以下缺点:

  • 多线程编程时,需要确保调用线程安全函数,或以线程安全的方式调用函数(后续介绍)

  • 因为共享共一个全局空间,某个线程的 bug 可能会影响到全局

  • 线程共用宿主进程的虚拟地址空间,分配大量线程时,则会出现问题

  • 线程中处理信号十分麻烦,在多线程程序中应避免使用信号

线程同步

线程同步旨在同步对共享资源的使用,可通过互斥量和条件变量实现。

互斥量

线程可以直接访问全局变量,但必须确保多个线程不会同时修改同一变量。这部分全局变量称之为临界区(共享资源),操作临界区的代码必须是原子的,某线程访问临界区时,不应该被其他线程中断。 互斥量有两种状态:已锁定和未锁定。任何时候只有一个线程可以锁定该互斥量,试图对一个已经上锁的互斥量加锁可能会阻塞进程或者返回失败。 一旦线程锁定某一互斥量,便只有该线程可以解锁。一般情况下,对不同共享资源会使用不同的互斥量,每个线程在访问同一资源时,一般遵循:

  1. 针对该共享资源锁定互斥量

  2. 访问共享资源

  3. 解锁

互斥量类型为 pthread_mutex_t,在使用之前必须对其初始化:

#include <pthread.h>
// 静态初始化
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
/*
@brief	动态初始化,attr 为该互斥量的各种属性值,NULL 则使用默认属性
@param	attr 包括了互斥量的类型:
        - PTHREAD_MUTEX_NORMAL		此类型互斥量不具有死锁检测功能
        - PTHREAD_MUTEX_ERRORCHECK	这类互斥量的所有操作均会进行错误检查
        				不过效率会差一些,可用作 Debug
        - PTHREAD_MUTEX_RECURSIVE	递归互斥量维护一个锁计数器,每次锁定都会将计数器值加 1
                                        解锁操作递减计数器,当计数器为 0 时才会释放
        
*/
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
// 动态初始化的互斥量需要手动销毁
int pthread_mutex_destory(pthread_mutex_t *mutex);

加锁和解锁互斥量:

#include <pthread.h>
// 尝试给互斥量加锁,若互斥量已被锁定,则会一直阻塞,直到该互斥量被解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
// 对为锁定的互斥量解锁,或解锁其他线程锁定互斥量均会返回错误
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 不会阻塞线程,信号量已经被锁定会失败并返回 EBUSY 错误
int pthread_mutex_trylock(pthread_mutex_t *mutex);

当线程需要同时访问多个不同资源时,这些资源由互斥量管理,则可能会发生死锁。例如:

  • 线程 A 锁定了 mutex1

  • 线程 B 锁定了 mutex2

  • 线程 A 尝试锁定 mutex2, 线程 B 尝试锁定 mutex1

  • 死锁发生

要避免此类死锁问题,最简单的办法就是定义互斥量的层级关系。当多个线程对一组互斥量操作时,总是以相同的顺序对互斥量进行锁定。 或者先使用 pthread_mutex_lock 锁定第一个互斥量,后续互斥量使用 pthread_mutex_trylock 尝试锁定,如果失败,则解锁之前锁定的互斥量。这种方法效率会低一些,但比较灵活。

条件变量

互斥量防止多个线程同时访问同一共享资源,条件变量允许线程阻塞,直到某个线程就某个共享资源的状态变化通知其他线程。 采用条件变量可以允许一个线程休眠,直至收到另一线程的通知。条件变量一般结合互斥量使用,就互斥量状态改变发出通知。 条件变量也有静态分配和动态分配两种方式:

#include <pthread.h>
// 静态分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 动态分配,attr 为该条件变量的一些属性,NULL 使用默认属性
int pthred_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
// 动态分配的条件变量需要手动销毁
int pthread_cond_destory(pthread_cond_t *cond);

条件变量的主要操作是发送信号(通知其他线程,此信号并不是系统的那个信号)和等待。等待操作是指收到一个通知前一直处于阻塞状态。

#include <pthread.h>

// signal 保证唤醒至少一条遭到阻塞的线程
int pthread_cond_signal(pthread_cond_t *cond);
// broadcast 唤醒所有遭阻塞的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 阻塞此线程直到收到条件变量 cond 的通知
int pthread_cond_wait(pthread_cond_t *cond, pthread_t *mutex);
// 使用参数 abstime 设置等待上限,而不会一直阻塞
// 到时且无状态变化通知则返回 ETIMEOUT 错误
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_t *mutex,
                          const struct timespec *abstime);

条件变量并不保存状态信息,发送信号通知其他线程时,若无线程在等待该条件变量,这个信号则会被忽略。

pthread_cond_wait

pthread_cond_wait 会阻塞线程直到收到条件变量,其内部会执行如下步骤:

  1. 解锁互斥量 mutex,此互斥量与控制目标资源访问的互斥量是同一个

  2. 阻塞调用线程,直到另一线程就此条件变量发出信号

  3. 重新锁定 mutex

pthread_mutex_lock(&mtx);
while(/* 检查共享资源的状态是否是想要的 */)
    pthread_cond_wait(&cond, &mtx);
pthread_mutex_unlock(&mtx);

其总与访问共享资源的同一个互斥量绑定,共享资源的互斥量与条件变量一般有以下形式:

  1. 线程锁定互斥量以访问共享资源

  2. 检查共享资源状态

  3. 如果共享资源并没有处于预期状态,则应该解锁互斥量(以便其他线程可以访问),并阻塞

  4. 线程因为条件变量被唤醒后,应立即加锁,因为一般线程会马上访问共享变量

函数 pthread_cond_wait 会自动执行 3 4 步的解锁和加锁,且解锁与陷入阻塞为一个原子操作,其他线程不会在此线程陷入阻塞前获取到此互斥量。 上面的代码中并没有用 if 语句判断 pthread_cond_wait 控制的条件的状态,而是采用 while 不断检查资源状态,因为从 pthread_cond_wait 中返回时不能对判断条件的状态做任何假设,因为可能会存在虚假唤醒的情况。

例子:生产者-消费者

/*
@Brief 生产者消费者问题
*/
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int produced_num = 0;

// 生产者线程
void* producer(void* arg) {
    int idx = (int)arg;
    while(1) {
        pthread_mutex_lock(&mtx);
        ++produced_num;
        printf("thread:%d produced, num: %d\n", idx, produced_num);
        pthread_mutex_unlock(&mtx);
        sleep(1);
        // 发送信号表示已经生产
        pthread_cond_signal(&cond);
    }
    return NULL;
}

int main(int argc, char** argv) {
    int thread_num = 3; // 线程数量
    // 创建线程
    for(int i = 0;i < thread_num;++i) {
        pthread_t t_id;
        int s = pthread_create(&t_id, NULL, producer, (void*)i);
        pthread_detach(t_id);
    }
    // 主线程为消费者
    while(1) {
        pthread_mutex_lock(&mtx);
        // 为 0 则等待生产者生产
        while(produced_num == 0) {
            int s = pthread_cond_wait(&cond, &mtx);
            if(s != 0) {
                exit(EXIT_FAILURE);
            }
        }
        while(produced_num > 0) {
            --produced_num;
            printf("consume, now num:%d\n", produced_num);
        }
        pthread_mutex_unlock(&mtx);
    }
    return 0;
}

最后更新于