Concurrency

多线程编程

多线程并发执行时,由于处理器调度的不确定性,各线程指令会以任何可能的方式交叠执行。此时,当多个线程并发访问共享变量或资源时,由于缺乏操作的原子性、顺序性和一致性等问题,就可能产生预料之外的后果 (在使用 gcc 编译下列代码时需加入 -pthread 来动态链接 pthread 库)。

执行顺序不确定

当两个线程一起打印字符时 (在多核处理器上可以观察到 ~200% 的 CPU 利用率),无法预测下一个观察到的输出是 A 还是 B

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_A(void *arg) {
  while(1) {
    printf("A\n");
  }
  return NULL;
}

void *thread_B(void *arg) {
  while(1) {
    printf("B\n");
  }
  return NULL;
}

int main(int argc, char *argv[]) {
  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, thread_A, NULL);
  pthread_create(&worker2, NULL, thread_B, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);
  return 0;
}

无法保证原子性

当两个线程一起调用 pay(100) 时,会对共享变量 balance 产生预料之外的影响 (代码的交替执行)。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

unsigned long balance = 100;

void *pay(void *arg) {
  int amount = *((int*) arg);
  if (balance >= amount) {
    usleep(1);  // some unexpected delays
    balance -= amount;
  }
  return NULL;
}

int main(void) {
  pthread_t card, phone;
  int num = 100;
  pthread_create(&card, NULL, buy_meals, (void*)&num);
  pthread_create(&phone, NULL, buy_meals, (void*)&num);

  pthread_join(card, NULL);
  pthread_join(phone, NULL);

  printf("Balance: %lu\n", balance);
  return 0;
}

当两个线程一起对共享变量 count 进行 count++ 操作时,最终输出的 count 不会如预期一样等于 2000000 (代码对应多条指令的交替执行)。 此时,即使把 count++ 替换为一条指令 asm volatile("incq %0" : "+m" (count)),也无法得到正确的预期结果 (在多核下多个线程的指令根本就是并行执行的)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000
int count = 0;

void* worker_func(void *arg) {
  for (int i = 0; i < NUM; i++) { 
    count++;
  }  
  return NULL;
}

int main(void) {
  pthread_t worker1, worker2;
  void *worker1_status;
  void *worker2_status;

  pthread_create(&worker1, NULL, worker_func, NULL);
  pthread_create(&worker2, NULL, worker_func, NULL);

  pthread_join(worker1, &worker1_status);
  pthread_join(worker2, &worker2_status);

  printf("Count: %d\n", count);
  return 0;
}

无法保证顺序性

使用不同的编译优化选项会对代码行为产生影响。例如,使用 -O1 编译上述 count++ 代码会观察到 1000000;而使用 -O2 编译上述代码则会观察到 2000000 (此时程序行为一定是正确的吗?)。

此外,对于下述代码 (尝试实现一种 T1 等待 T2 的行为),在不进行编译优化和使用 -O2 进行优化时,程序也会表现出不同的行为。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int flag = 0;
int x = 0;

void *T1(void* arg) {
  while (flag == 0)
    ;
  printf("%d\n", x);
  return NULL;
}

void *T2(void* arg) {
  x = 10;
  flag = 1;
  return NULL;
}

int main() {
  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, T1, NULL);
  pthread_create(&worker2, NULL, T2, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);
  return 0;
}

无法保证一致性

在不满足顺序一致性 (Sequential Consistency) 的内存模型下,对变量的 load 和 store 也可能会按非预期的执行顺序被观察到。 更多的细节和解释可参考 Hardware Memory ModelsHardware Memory Reordering

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

volatile int x = 0, y = 0;
volatile int r1, r2;
sem_t s1, s2, e; 

void* T1(void *arg) {
  while(1) {
    sem_wait(&s1);
    while (rand() % 8 != 0) ;
    x = 1;   // store(x) 
    r1 = y;  // load(y)
    sem_post(&e);
  }
}

void* T2(void *arg) {
  while(1) {
    sem_wait(&s2);
    while (rand() % 8 != 0) ;
    y = 1;   // store(y)
    r2 = x;  // load(x)
    sem_post(&e);
  }  
}

int main(void) {
  srand(time(NULL));
  sem_init(&s1, 0, 0); 
  sem_init(&s2, 0, 0); 
  sem_init(&e, 0, 0); 

  pthread_t worker1, worker2;
  pthread_create(&worker1, NULL, T1, NULL);
  pthread_create(&worker2, NULL, T2, NULL);

  int iteration = 0;
  while (1) {
    x = y = 0;
    // signal the threads to start
    sem_post(&s1);
    sem_post(&s2); 
    // wait for them to finish 
    sem_wait(&e);
    sem_wait(&e);

    printf("%d (%d, %d)\n", iteration, r1, r2);
    fflush(stdout);
    if (r1 == 0 && r2 == 0) {
      exit(0);
    }
    iteration++;
  }
  return 0;
}

互斥 (Mutual Exclusion) 和锁 (Lock)

临界区和锁的引入为程序员正确实现并发程序的互斥提供了基本能力。但是,锁的使用会带来性能开销,因此在实践中应以合适的粒度和位置来使用锁。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000

int count = 0;
pthread_mutex_t mutex;  // a lock is a variable

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
    pthread_mutex_lock(&mutex);
    count++;
    pthread_mutex_unlock(&mutex);
  }
  return NULL;
}

int main(void) {
  pthread_t worker1, worker2;
  pthread_mutex_init(&mutex, NULL);  // initialisation

  pthread_create(&worker1, NULL, func, NULL);
  pthread_create(&worker2, NULL, func, NULL);

  pthread_join(worker1, NULL);
  pthread_join(worker2, NULL);

  printf("Count: %d\n", count);
  pthread_mutex_destroy(&mutex);    // destroy
  return 0;
}

自旋锁 (Spin Lock) 的实现

Peterson 算法给出了通过纯软件方式实现自旋锁 lock()unlock() 的一种方案,但为了让该算法在现代多核处理器下能正常工作,需要使用硬件内存屏障 (x86 中的 mfence) 来避免对 load 和 store 操作的重排序。

// Memory barrier to prevent the reordering of instructions beyond this barrier
#define BARRIER __sync_synchronize()

volatile int flags[2] = {0, 0};
volatile int turn = 0;

void lock(int self) {
  flags[self] = 1;                     BARRIER;  
  turn = 1 - self;                     BARRIER;
  while (1) {
    if (flags[1 - self] != 1 ) break;  BARRIER;  
    if (turn != 1 - self) break;       BARRIER;
  }
}

void unlock(int self) {
  flags[self] = 0;                     BARRIER;
}

而实现自旋锁 lock()unlock() 更为合适的方式是借助程序设计语言 (包括编译器) 和计算机硬件的帮助 (提供的原子指令)。

1) 利用 xchgl 指令

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  // exchange the value at *ptr with new
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)  // result will store the output (the original value at *ptr)
    : "1"(new)                  // the value to be swapped
    : "memory"                  // compiler barrier
  );
  return result;
}

void lock() {
  while (xchg(&flag, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

2) 利用 cmpxchgl 指令

volatile int flag = 0;

int cmpxchg(int volatile *ptr, int expected, int new) {
  // if flag == expected, the value of new is stored in flag
  // the current value of flag is loaded into expected
  asm volatile ("lock cmpxchgl %2, %1"
    : "+a" (expected) // value for comparison
    : "m" (*ptr),     // memory location
      "r" (new)       // value to be written if flag == expected
    : "memory"        // compiler barrier
  );
  return expected;
}

void lock() {
  while (cmpxchg(&flag, 0, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

以上述方式实现的自旋锁虽然能实现互斥,但其不能保证公平性,并且在性能上可能会造成巨大开销:除了获得锁的线程能向前推进外,其它所有线程都将把 CPU 时间用来进行忙等待,在多核系统下会产生一个线程工作、其余线程围观的情况 (尤其在临界区执行时间较长的情况下)。更糟糕的是,如果当前持有锁的线程被切换出去,则会造成完全的 CPU 资源浪费。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

volatile long NUM = 1000000;
volatile long count = 0;

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)
    : "1"(new)
    : "memory"
  );
  return result;
}

void lock() {
  while (xchg(&flag, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
    lock();
    for (int _ = 0; _ < 1000; _++)
      asm volatile("nop");
    count++;
    unlock();
  }
  return NULL;
}

int main(int argc, char *argv[]) {
  assert(argc == 2);
  int nthread = atoi(argv[1]);

  pthread_t thread[64];
  assert(NUM % nthread == 0);
  NUM /= nthread;
  printf("Create %d threads, each add %ld times\n", nthread, NUM);

  for (int i = 0; i < nthread; i++)
    pthread_create(&thread[i], NULL, func, NULL);

  for (int i = 0; i < nthread; i++)
    pthread_join(thread[i], NULL);
  
  printf("Final count: %ld\n", count);
  return 0;
}

为了降低在用户程序中使用自旋锁的开销,可以借助操作系统的帮助,以此来在无法立即获得锁时将线程阻塞 (Sleep),并在锁释放时唤醒一个正在等待锁的线程 (Wakeup)。 在此基础上,现代操作系统中还进一步采用两阶段锁的方式 (Two-Phase Locking) 来将自旋锁和互斥锁的优势结合起来,在请求锁时先自旋一段时间,从而在对锁争用不激烈的情况下以低成本获得锁 (Fast Path),而在对锁有争用的情况下阻塞当前线程 (Slow Path)。pthread 库的 mutex lock 就是上述方式的一种实现。

同步 (Synchronization) 和条件变量 (Condition Variables)

除了互斥外,在并发程序中我们往往还需要控制多个线程的执行序列,以使其满足某种相对关系。生产者-消费者问题 (Producer-Consumer Problem) 就是一个非常有代表性的同步问题 (这里我们把该问题简化为左右括号的打印),而我们可以使用条件变量的 wait()signal() 操作来实现多个生产者/消费者线程间的同步。注意下述代码使用了两个条件变量 emptyfill (亦可使用一个条件变量并使用 broadcast() 进行唤醒),并使用 while 语句来判断是否需要阻塞线程。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int MAX, count = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t fill = PTHREAD_COND_INITIALIZER;

void* Producer(void *arg) {
  while (1) {
    pthread_mutex_lock(&mutex);
    while (count == MAX) {
      pthread_cond_wait(&empty, &mutex);
    }
    printf("(");
    count++;
    pthread_cond_signal(&fill);
    pthread_mutex_unlock(&mutex);
  }
}

void* Consumer(void *arg) {
  while (1) {
    pthread_mutex_lock(&mutex);
    while (count == 0) {
      pthread_cond_wait(&fill, &mutex);
    }
    printf(")");
    count--;
    pthread_cond_signal(&empty);
    pthread_mutex_unlock(&mutex);
  }
}

int main(int argc, char *argv[]) {
  if (argc != 3) {
    fprintf(stderr, "Usage: %s [MAX of buffer] [Number of producer-consmer pairs]\n", argv[0]);
    return -1;
  }
  MAX = atoi(argv[1]);
  int num = atoi(argv[2]);

  pthread_t threads[64];
  for (int i = 0; i < num; i++) {
    pthread_create(&threads[i * 2], NULL, Producer, NULL);
    pthread_create(&threads[i * 2 + 1], NULL, Consumer, NULL);
  }

  for (int i = 0; i < num * 2; i++)
    pthread_join(threads[i], NULL);

  return 0;
}

信号量 (Semaphores)

在使用条件变量解决同步问题时,需要手动维护系统状态,并使用锁进行保护。信号量通过在内部维护一个 “整数” 状态来决定阻塞和唤醒的行为,其非常适合于解决计数类型的同步问题 (例如,生产者-消费者问题)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

int MAX = 0;
sem_t buffer, product; 

void* Producer(void *arg) {
  while (1) {
    sem_wait(&buffer);   // P(buffer)
    printf("(");
    sem_post(&product);  // V(product)
  }
}

void* Consumer(void *arg) {
  while (1) {
    sem_wait(&product);  // P(product)
    printf(")");
    sem_post(&buffer);   // V(buffer)
  }
}

int main(int argc, char *argv[]) {
  if (argc != 3) {
    fprintf(stderr, "Usage: %s [MAX of buffer] [Number of producer-consmer pairs]\n", argv[0]);
    return -1;
  }
  MAX = atoi(argv[1]);
  int num = atoi(argv[2]);

  sem_init(&buffer, 0, MAX); 
  sem_init(&product, 0, 0); 

  pthread_t threads[64];
  for (int i = 0; i < num; i++) {
    pthread_create(&threads[i * 2], NULL, Producer, NULL);
    pthread_create(&threads[i * 2 + 1], NULL, Consumer, NULL);
  }

  for (int i = 0; i < num * 2; i++)
    pthread_join(threads[i], NULL);

  return 0;
}

死锁 (Dealock)

死锁是一种典型的并发程序 Bug,其代表了一组线程中的每一个都在等待另一个线程采取行动 (例如,发送消息或释放锁) 而无法继续推进执行的状态。哲学家就餐问题 (Dining Philosophers Problem) 中如果每个哲学家都执行了 P(left) 而阻塞在各自的 P(right) 时就产生了死锁。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

#define N 5
sem_t forks[N];

void* Philosopher(void *arg) {
  int id = *((int*)arg);
  int left = id ;
  int right = (id + 1) % N;

  while (1) {
    sem_wait(&forks[left]);
    printf("+ fork %d by T%d\n", left, id);
    sem_wait(&forks[right]);
    printf("+ fork %d by T%d\n", right, id);
    // Eat
    sem_post(&forks[left]);
    printf("- fork %d by T%d\n", left, id);
    sem_post(&forks[right]);
    printf("- fork %d by T%d\n", right, id);
  }
}

int main() {
  for (int i = 0 ; i < N ; i ++)
    sem_init(&forks[i], 0, 1);

  pthread_t threads[N];
  int thread_ids[5] = {0, 1, 2, 3, 4}; 
  for (int i = 0 ; i < N; i++)
    pthread_create(&threads[i], NULL, Philosopher, &thread_ids[i]);
  
  for (int i = 0; i < N; i++)
    pthread_join(threads[i], NULL);
  return 0;
}