Concurrency

多线程编程

多线程并发执行时,由于处理器调度的不确定性,各线程指令会以任何可能的方式交叠执行。此时,当多个线程并发访问共享变量或资源时,由于缺乏操作的原子性、顺序性和一致性等问题,就可能产生预料之外的后果。

无法保证原子性

当两个线程一起调用 pay(100) 时,会对共享变量 balance 产生预料之外的影响 (代码的交替执行)。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

unsigned long balance = 100;

void *pay(void *arg) {
  int amount = *((int*) arg);
  if (balance >= amount) {
    usleep(1);  // some unexpected delays
    balance -= amount;
  }
  return NULL;
}

int main(void) {
  pthread_t card, phone;
  int num = 100;
  pthread_create(&card, NULL, buy_meals, (void*)&num);
  pthread_create(&phone, NULL, buy_meals, (void*)&num);

  pthread_join(card, NULL);
  pthread_join(phone, NULL);

  printf("Balance: %lu\n", balance);
  return 0;
}

当两个线程一起对共享变量 count 进行 count++ 操作时,最终输出的 count 不会如预期一样等于 2000000 (指令的交替执行)。 此时,即使把 count++ 替换为一条指令 asm volatile("incq %0" : "+m" (count)),也无法得到正确的预期结果 (在多核下多个线程的指令根本就是并行执行的)。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000
int count = 0;

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) { 
    count++;
  }  
  return NULL;
}

int main(void) {
  pthread_t thread1, thread2;
  pthread_create(&thread1, NULL, func, NULL);
  pthread_create(&thread2, NULL, func, NULL);

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);

  printf("Count: %d\n", count);
  return 0;
}

无法保证顺序性

使用不同的编译优化选项会对代码行为产生影响。例如,使用 -O1 编译上述 count++ 代码会观察到 1000000;而使用 -O2 编译上述代码则会观察到 2000000 (此时程序行为一定是正确的吗?)。

此外,对于下述代码 (尝试实现一种 T1 等待 T2 的行为),在不进行编译优化和使用 -O2 进行优化时,程序也会表现出不同的行为。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int flag = 0;
int x = 0;

void *T1(void* arg) {
  while (flag == 0)
    ;
  printf("%d\n", x);
  return NULL;
}

void *T2(void* arg) {
  x = 10;
  flag = 1;
  return NULL;
}

int main() {
  pthread_t thread1, thread2;
  pthread_create(&thread1, NULL, T1, NULL);
  pthread_create(&thread2, NULL, T2, NULL);

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);  
  return 0;
}

无法保证一致性

在不满足顺序一致性 (Sequential Consistency) 的内存模型下,对变量的 load 和 store 也可能会按非预期的执行顺序被观察到。 更多的细节和解释可参考 Hardware Memory ModelsHardware Memory Reordering

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h> 

volatile int x = 0, y = 0;
volatile int r1, r2;
sem_t s1, s2, e; 

void* T1(void *arg) {
  while(1) {
    sem_wait(&s1);
    x = 1;   // store(x) 
    r1 = y;  // load(y)
    sem_post(&e);
  }
}

void* T2(void *arg) {
  while(1) {
    sem_wait(&s2);
    y = 1;   // store(y)
    r2 = x;  // load(x)
    sem_post(&e);
  }  
}

int main(void) {
  sem_init(&s1, 0, 0); 
  sem_init(&s2, 0, 0); 
  sem_init(&e, 0, 0); 

  pthread_t thread1, thread2;
  pthread_create(&thread1, NULL, T1, NULL);
  pthread_create(&thread2, NULL, T2, NULL);

  int iteration = 0;
  while (1) {
    x = y = 0;
    // signal the threads to start
    sem_post(&s1);
    sem_post(&s2); 
    // wait for them to finish 
    sem_wait(&e);
    sem_wait(&e);

    printf("%d (%d, %d)\n", iteration, r1, r2);
    fflush(stdout);
    if (r1 == 0 && r2 == 0) {
      exit(0);
    }
    iteration++;
  }
  return 0;
}

互斥 (Mutual Exclusion) 和锁 (Lock)

临界区和锁的引入为程序员正确实现并发程序的互斥提供了基本能力。但是,锁的使用会带来性能开销,因此在实践中应以合适的粒度和位置来使用锁。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM 1000000
int count = 0;
pthread_mutex_t mutex;  // a lock is a variable

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
    pthread_mutex_lock(&mutex);
    count++;
    pthread_mutex_unlock(&mutex);
  }
  return NULL;
}

int main(void) {
  pthread_t thread1, thread2;
  pthread_mutex_init(&mutex, NULL);  // initialisation

  pthread_create(&thread1, NULL, func, NULL);
  pthread_create(&thread2, NULL, func, NULL);

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);

  printf("Count: %d\n", count);
  pthread_mutex_destroy(&mutex);    // destroy
  return 0;
}

自旋锁 (Spin Lock) 的实现

Peterson 算法给出了通过纯软件方式实现自旋锁 lock()unlock() 的一种方案,但为了让该算法在现代多核处理器下能正常工作,需要使用硬件内存屏障 (x86 中的 mfence) 来避免对 load 和 store 操作的重排序。

// Memory barrier to prevent the reordering of instructions beyond this barrier
#define BARRIER __sync_synchronize()

volatile int flags[2] = {0, 0};
volatile int turn = 0;

void lock(int self) {
  flags[self] = 1;                     BARRIER;  
  turn = 1 - self;                     BARRIER;
  while (1) {
    if (flags[1 - self] != 1 ) break;  BARRIER;  
    if (turn != 1 - self) break;       BARRIER;
  }
}

void unlock(int self) {
  flags[self] = 0;                     BARRIER;
}

而实现自旋锁 lock()unlock() 更为合适的方式是借助计算机硬件的帮助 (提供原子指令)。

1) 利用 xchgl 指令

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  // exchange the value at *ptr with new
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)  // result will store the output (the original value at *ptr)
    : "1"(new)                  // the value to be swapped
    : "memory"                  // compiler barrier
  );
  return result;
}

void lock() {
  while (xchg(&flag, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

2) 利用 cmpxchgl 指令

volatile int flag = 0;

int cmpxchg(int volatile *ptr, int expected, int new) {
  // if flag == expected, the value of new is stored in flag
  // the current value of flag is loaded into expected
  asm volatile ("lock cmpxchgl %2, %1"
    : "+a" (expected) // value for comparison
    : "m" (*ptr),     // memory location
      "r" (new)       // value to be written if flag == expected
    : "memory"        // compiler barrier
  );
  return expected;
}

void lock() {
  while (cmpxchg(&flag, 0, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

以上述方式实现的自旋锁虽然能实现互斥,但其不能保证公平性,并且在性能上可能会造成巨大开销:除了获得锁的线程能向前推进外,其它所有线程都将把 CPU 时间全部用来忙等待 (busy waiting),这在多核系统下会产生一个线程工作、其余线程围观的情况 (尤其在临界区执行时间较长的情况下)。更糟糕的是,如果当前持有锁的线程被切换出去,则会造成完全的 CPU 资源浪费。例如,可以通过 time 命令来统计如下代码在不同线程个数时的执行开销。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

volatile long NUM = 1000000;
volatile long count = 0;

volatile int flag = 0;

int xchg(int volatile *ptr, int new) {
  int result;
  asm volatile ("lock xchgl %0, %1"
    : "+m"(*ptr), "=a"(result)
    : "1"(new)
    : "memory"
  );
  return result;
}

void lock() {
  while (xchg(&flag, 1) == 1)
    ;
}

void unlock() {
  __sync_synchronize();
  flag = 0;
}

void* func(void *arg) {
  for (int i = 0; i < NUM; i++) {
    lock();
    for (int _ = 0; _ < 1000; _++)
      asm volatile("nop");
    count++;
    unlock();
  }
  return NULL;
}

int main(int argc, char *argv[]) {
  // Usage: ./a.out [number of threads created]
  assert(argc == 2);
  int nthread = atoi(argv[1]);

  pthread_t thread[64];
  assert(NUM % nthread == 0);
  NUM /= nthread;
  printf("Create %d threads, each add %ld times\n", nthread, NUM);

  for (int i = 0; i < nthread; i++)
    pthread_create(&thread[i], NULL, func, NULL);

  for (int i = 0; i < nthread; i++)
    pthread_join(thread[i], NULL);
  
  printf("Final count: %ld\n", count);
  return 0;
}

为了降低在用户程序中使用自旋锁的开销,可以借助操作系统的帮助,以此来在无法立即获得锁时将线程阻塞 (Sleep),并在锁释放时唤醒一个正在等待锁的线程 (Wakeup)。在此基础上,现代操作系统中还进一步采用两阶段锁的方式 (Two-Phase Locking) 来将 Spin 和 Block 实现的优势结合起来,在请求锁时先自旋一段时间,从而在对锁争用不激烈的情况下以低成本获得锁 (Fast Path);而在对锁有争用的情况下阻塞当前线程 (Slow Path)。pthread 库的 mutex lock 就是上述方式的一种实现。