

多线程并发执行时,由于处理器调度的不确定性,各线程指令会以任何可能的方式交叠执行。此时,当多个线程并发访问共享变量或资源时,由于缺乏操作的原子性、顺序性和一致性等问题,就可能产生预料之外的后果 (在使用 gcc 编译下列代码时需加入 -pthread 来动态链接 pthread 库)。


当两个线程一起打印字符时 (在多核处理器上可以观察到 ~200% 的 CPU 利用率),无法预测下一个观察到的输出是 A 还是 B

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_A(void *arg) {
  while(1) {
  return NULL;

void *thread_B(void *arg) {
  while(1) {
  return NULL;

int main(int argc, char *argv[]) {
  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, thread_A, NULL);
  pthread_create(&worker2, NULL, thread_B, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);
  return 0;


当两个线程一起调用 pay(100) 时,会对共享变量 balance 产生预料之外的影响 (代码的交替执行)。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

unsigned long balance = 100;

void *pay(void *arg) {
  int amount = *((int*) arg);
  if (balance >= amount) {
    usleep(1);  // some unexpected delays
    balance -= amount;
  return NULL;

int main(void) {
  pthread_t card, phone;
  int num = 100;
  pthread_create(&card, NULL, buy_meals, (void*)&num);
  pthread_create(&phone, NULL, buy_meals, (void*)&num);

  pthread_join(card, NULL);
  pthread_join(phone, NULL);

  printf("Balance: %lu\n", balance);
  return 0;

当两个线程一起对共享变量 count 进行 count++ 操作时,最终输出的 count 不会如预期一样等于 2000000 (代码对应多条指令的交替执行)。 此时,即使把 count++ 替换为一条指令 asm volatile("incq %0" : "+m" (count)),也无法得到正确的预期结果 (在多核下多个线程的指令根本就是并行执行的)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000
int count = 0;

void* worker_func(void *arg) {
  for (int i = 0; i < NUM; i++) { 
  return NULL;

int main(void) {
  pthread_t worker1, worker2;
  void *worker1_status;
  void *worker2_status;

  pthread_create(&worker1, NULL, worker_func, NULL);
  pthread_create(&worker2, NULL, worker_func, NULL);

  pthread_join(worker1, &worker1_status);
  pthread_join(worker2, &worker2_status);

  printf("Count: %d\n", count);
  return 0;


使用不同的编译优化选项会对代码行为产生影响。例如,使用 -O1 编译上述 count++ 代码会观察到 1000000;而使用 -O2 编译上述代码则会观察到 2000000 (此时程序行为一定是正确的吗?)。

此外,对于下述代码 (尝试实现一种 T1 等待 T2 的行为),在不进行编译优化和使用 -O2 进行优化时,程序也会表现出不同的行为。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int flag = 0;
int x = 0;

void *T1(void* arg) {
  while (flag == 0)
  printf("%d\n", x);
  return NULL;

void *T2(void* arg) {
  x = 10;
  flag = 1;
  return NULL;

int main() {
  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, T1, NULL);
  pthread_create(&worker2, NULL, T2, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);
  return 0;


在不满足顺序一致性 (Sequential Consistency) 的内存模型下,对变量的 load 和 store 也可能会按非预期的执行顺序被观察到。 更多的细节和解释可参考 Hardware Memory ModelsHardware Memory Reordering

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

volatile int x = 0, y = 0;
volatile int r1, r2;
sem_t s1, s2, e; 

void* T1(void *arg) {
  while(1) {
    while (rand() % 8 != 0) ;
    x = 1;   // store(x) 
    r1 = y;  // load(y)

void* T2(void *arg) {
  while(1) {
    while (rand() % 8 != 0) ;
    y = 1;   // store(y)
    r2 = x;  // load(x)

int main(void) {
  sem_init(&s1, 0, 0); 
  sem_init(&s2, 0, 0); 
  sem_init(&e, 0, 0); 

  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, T1, NULL);
  pthread_create(&worker2, NULL, T2, NULL);

  int iteration = 0;
  while (1) {
    x = y = 0;
    // signal the threads to start
    // wait for them to finish 

    printf("%d (%d, %d)\n", iteration, r1, r2);
    if (r1 == 0 && r2 == 0) {
  return 0;

互斥 (Mutual Exclusion) 和锁 (Lock)


#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000

int count = 0;
pthread_mutex_t mutex;  // a lock is a variable

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
  return NULL;

int main(void) {
  pthread_t worker1, worker2;
  pthread_mutex_init(&mutex, NULL);  // initialisation

  pthread_create(&worker1, NULL, func, NULL);
  pthread_create(&worker2, NULL, func, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);

  printf("Count: %d\n", count);
  pthread_mutex_destroy(&mutex);    // destroy
  return 0;

自旋锁 (Spin Lock) 的实现

Peterson 算法给出了通过纯软件方式实现自旋锁 lock()unlock() 的一种方案,但为了让该算法在现代多核处理器下能正常工作,需要使用硬件内存屏障 (x86 中的 mfence) 来避免对 load 和 store 操作的重排序。

// Memory barrier to prevent the reordering of instructions beyond this barrier
#define BARRIER __sync_synchronize()

volatile int flags[2] = {0, 0};
volatile int turn = 0;

void lock(int self) {
  flags[self] = 1;                     BARRIER;  
  turn = 1 - self;                     BARRIER;
  while (1) {
    if (flags[1 - self] != 1 ) break;  BARRIER;  
    if (turn != 1 - self) break;       BARRIER;

void unlock(int self) {
  flags[self] = 0;                     BARRIER;

而实现自旋锁 lock()unlock() 更为合适的方式是借助程序设计语言 (包括编译器) 和计算机硬件的帮助 (提供的原子指令)。

1) 利用 xchgl 指令

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  // exchange the value at *ptr with new
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)  // result will store the output (the original value at *ptr)
    : "1"(new)                  // the value to be swapped
    : "memory"                  // compiler barrier
  return result;

void lock() {
  while (xchg(&flag, 1) == 1)

void unlock() {
  flag = 0;

2) 利用 cmpxchgl 指令

volatile int flag = 0;

int cmpxchg(int volatile *ptr, int expected, int new) {
  // if flag == expected, the value of new is stored in flag
  // the current value of flag is loaded into expected
  asm volatile ("lock cmpxchgl %2, %1"
    : "+a" (expected) // value for comparison
    : "m" (*ptr),     // memory location
      "r" (new)       // value to be written if flag == expected
    : "memory"        // compiler barrier
  return expected;

void lock() {
  while (cmpxchg(&flag, 0, 1) == 1)

void unlock() {
  flag = 0;

以上述方式实现的自旋锁虽然能实现互斥,但其不能保证公平性,并且在性能上可能会造成巨大开销:除了获得锁的线程能向前推进外,其它所有线程都将把 CPU 时间用来进行忙等待,在多核系统下会产生一个线程工作、其余线程围观的情况 (尤其在临界区执行时间较长的情况下)。更糟糕的是,如果当前持有锁的线程被切换出去,则会造成完全的 CPU 资源浪费。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

volatile long NUM = 1000000;
volatile long count = 0;

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)
    : "1"(new)
    : "memory"
  return result;

void lock() {
  while (xchg(&flag, 1) == 1)

void unlock() {
  flag = 0;

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
    for (int _ = 0; _ < 1000; _++)
      asm volatile("nop");
  return NULL;

int main(int argc, char *argv[]) {
  assert(argc == 2);
  int nthread = atoi(argv[1]);

  pthread_t thread[64];
  assert(NUM % nthread == 0);
  NUM /= nthread;
  printf("Create %d threads, each add %ld times\n", nthread, NUM);

  for (int i = 0; i < nthread; i++)
    pthread_create(&thread[i], NULL, func, NULL);

  for (int i = 0; i < nthread; i++)
    pthread_join(thread[i], NULL);
  printf("Final count: %ld\n", count);
  return 0;

为了降低在用户程序中使用自旋锁的开销,可以借助操作系统的帮助,以此来在无法立即获得锁时将线程阻塞 (Sleep),并在锁释放时唤醒一个正在等待锁的线程 (Wakeup)。 在此基础上,现代操作系统中还进一步采用两阶段锁的方式 (Two-Phase Locking) 来将自旋锁和互斥锁的优势结合起来,在请求锁时先自旋一段时间,从而在对锁争用不激烈的情况下以低成本获得锁 (Fast Path),而在对锁有争用的情况下阻塞当前线程 (Slow Path)。pthread 库的 mutex lock 就是上述方式的一种实现。

同步 (Synchronization) 和条件变量 (Condition Variables)

除了互斥外,在并发程序中我们往往还需要控制多个线程的执行序列,以使其满足某种相对关系。生产者-消费者问题 (Producer-Consumer Problem) 就是一个非常有代表性的同步问题 (这里我们把该问题简化为左右括号的打印),而我们可以使用条件变量的 wait()signal() 操作来实现多个生产者/消费者线程间的同步。注意下述代码使用了两个条件变量 emptyfill (亦可使用一个条件变量并使用 broadcast() 进行唤醒),并使用 while 语句来判断是否需要阻塞线程。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int MAX, count = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t fill = PTHREAD_COND_INITIALIZER;

void* Producer(void *arg) {
  while (1) {
    while (count == MAX) {
      pthread_cond_wait(&empty, &mutex);

void* Consumer(void *arg) {
  while (1) {
    while (count == 0) {
      pthread_cond_wait(&fill, &mutex);

int main(int argc, char *argv[]) {
  if (argc != 3) {
    fprintf(stderr, "Usage: %s [MAX of buffer] [Number of producer-consmer pairs]\n", argv[0]);
    return -1;
  MAX = atoi(argv[1]);
  int num = atoi(argv[2]);

  pthread_t threads[64];
  for (int i = 0; i < num; i++) {
    pthread_create(&threads[i * 2], NULL, Producer, NULL);
    pthread_create(&threads[i * 2 + 1], NULL, Consumer, NULL);

  for (int i = 0; i < num * 2; i++)
    pthread_join(threads[i], NULL);

  return 0;

信号量 (Semaphores)

在使用条件变量解决同步问题时,需要手动维护系统状态,并使用锁进行保护。信号量通过在内部维护一个 “整数” 状态来决定阻塞和唤醒的行为,其非常适合于解决计数类型的同步问题 (例如,生产者-消费者问题)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

int MAX = 0;
sem_t buffer, product; 

void* Producer(void *arg) {
  while (1) {
    sem_wait(&buffer);   // P(buffer)
    sem_post(&product);  // V(product)

void* Consumer(void *arg) {
  while (1) {
    sem_wait(&product);  // P(product)
    sem_post(&buffer);   // V(buffer)

int main(int argc, char *argv[]) {
  if (argc != 3) {
    fprintf(stderr, "Usage: %s [MAX of buffer] [Number of producer-consmer pairs]\n", argv[0]);
    return -1;
  MAX = atoi(argv[1]);
  int num = atoi(argv[2]);

  sem_init(&buffer, 0, MAX); 
  sem_init(&product, 0, 0); 

  pthread_t threads[64];
  for (int i = 0; i < num; i++) {
    pthread_create(&threads[i * 2], NULL, Producer, NULL);
    pthread_create(&threads[i * 2 + 1], NULL, Consumer, NULL);

  for (int i = 0; i < num * 2; i++)
    pthread_join(threads[i], NULL);

  return 0;

死锁 (Dealock)

死锁是一种典型的并发程序 Bug,其代表了一组线程中的每一个都在等待另一个线程采取行动 (例如,发送消息或释放锁) 而无法继续推进执行的状态。哲学家就餐问题 (Dining Philosophers Problem) 中如果每个哲学家都执行了 P(left) 而阻塞在各自的 P(right) 时就产生了死锁。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

#define N 5
sem_t forks[N];

void* Philosopher(void *arg) {
  int id = *((int*)arg);
  int left = id ;
  int right = (id + 1) % N;

  while (1) {
    printf("+ fork %d by T%d\n", left, id);
    printf("+ fork %d by T%d\n", right, id);
    // Eat
    printf("- fork %d by T%d\n", left, id);
    printf("- fork %d by T%d\n", right, id);

int main() {
  for (int i = 0 ; i < N ; i ++)
    sem_init(&forks[i], 0, 1);

  pthread_t threads[N];
  int thread_ids[5] = {0, 1, 2, 3, 4}; 
  for (int i = 0 ; i < N; i++)
    pthread_create(&threads[i], NULL, Philosopher, &thread_ids[i]);
  for (int i = 0; i < N; i++)
    pthread_join(threads[i], NULL);
  return 0;