关于无锁数据结构有很多介绍了,如果想获得开箱即用的方案可以直接找liblfds,不过考虑到liblfds常年没维护,所以很多时候还得自己写,这里记录一下如何用原子操作实现无锁队列。

队列

简单队列

真的是很简单了,就是一个链表,我们使用结构体定义该链表的节点和链表本身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct queue_node {
struct queue_node* next;
void* data;
};

struct queue {
struct queue_node dummy;
struct queue_node* tail;
};

void init_queue(struct queue* queue) {
queue->dummy.next = NULL;
queue->tail = &queue->dummy;
}

有两种操作

  • 入队,就是把节点添加到尾部

    1
    2
    3
    4
    5
    void enqueue(struct queue* queue, struct queue_node* node) {
    node->next = NULL;
    queue->tail->next = node;
    queue->tail = node;
    }
  • 出队,就是将头部节点移出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    struct queue_node* dequeue(struct queue* queue) {
    if (&queue->dummy == queue->tail) {
    return NULL;
    }

    struct queue_node* old_head = queue->dummy.next;
    queue->dummy.next = old_head->next;
    return old_head;
    }

当然也可以反过来,入队从头部插入,出队从尾部移出,本文不介绍这种方法。

并行/并发环境

我们可以发现,入队和出队都至少需要两条指令,所以这样的代码在并行或者并发环境下就可能会出现一致性问题,所以我们通常会引入同步机制(例如互斥量)来保证一致性,但互斥量不是免费的午餐,加锁和释放都有一定开销,能不能借助CPU的原子指令来完成这些操作呢?答案是可以的。

原子指令

所谓原子,即不可分割的意思(当然我们知道实际上原子还能分为原子核和电子),在计算机中通常表示指令要么完成要么失败,不会发生只完成一半的情况。

并行和并发

一个很常见的例子就是如何给变量自增,很多大聪明可能觉得这怎么会是问题呢?直接i++不就完事了?当然,对于单线程的情况下,确实很简单,但是如果有多个线程同时对同一个变量进行操作,那结果可能就不正确了。

例如现在x=0,有两个线程A和B同时执行x++,正确的结果应该是2,但是很有可能结果是1,原因是一般来说自增运算并不是原子的。这里要引入内存和寄存器的概念,在CPU中,访问寄存器是最快的,通常只需要一个时钟周期,但是访问内存通常需要几百个时钟周期(缓存未命中),而我们的变量通常是放在内存里的,所以对于自增操作,首先需要从内存读取到寄存器,然后对寄存器中的值+1,最后再写回内存,如果从汇编语言来看就很清楚了,比如函数中x++的汇编可能是这样的

1
2
3
mov	-4(%rbp), %eax
add $1, %eax
mov %eax, -4(%rbp)

可以发现总共有三条汇编指令,假设A和B线程执行顺序

1
2
3
4
5
6
7
A B
1 -> A线程访存,A.eax=0
1 -> B线程访存,B.eax=0
2 -> A线程执行自增1,A.eax=1
2 -> B线程执行自增1,B.eax=1
3 -> A线程写存,由于A.eax=1,所以内存中的值变成1
3 -> B线程写存,由于B.eax=1,所以内存中的值还是1

互斥量

聪明的小伙伴应该已经想到了,我们可以用互斥量(mutex,也可以叫互斥锁)保护x++这个语句,这样在某个线程完成自增前,另一个线程会被阻塞,使得自增操作“变成”原子操作。

同理对于任何需要防止被中断的操作,都可以使用互斥量来完成。

CPU提供的原子操作

互斥量并不是免费的午餐,加锁和释放都有一定的开销,实际上大多数CPU提供了一些指令可以帮助我们同时完成多项本来需要多条指令才能完成操作,我们将其称为原子指令,GCC编译器提供了一大票原子指令,你可以点击这里查看,对于无锁队列,涉及的主要是两个

  • 交换(Exchange),gcc中是__atomic_exchange_n__atomic_exchange

    • __atomic_exchange_n接收一个指针ptr和值val,会对*ptr和val进行交换,相当于将

      1
      2
      3
      type tmp = *ptr;
      *ptr = val;
      return tmp;

      合并为一条指令。

    • __atomic_exchange与上面那个相同,但是返回值通过指针传入。

  • 比较并交换(Compare And Swap,简称CAS),gcc中是__atomic_compare_exchange_n__atomic_compare_exchange

    • __atomic_compare_exchange_n接收两个指针ptr和expected和一个值desired,会比较*ptr与*expected是否相等,如果相等则将desired写入*ptr,否则将*ptr写入*expected,相当于将

      1
      2
      3
      4
      5
      if (*ptr == *expected) {
      *ptr = desired;
      } else {
      *expected = *ptr;
      }

      合并为一条指令。

    • __atomic_compare_exchange与上面那个等同,但desired通过指针传入。

无锁队列

终于进入正题了,现在我们尝试用原子指令实现不需要互斥量的入队和出队函数。

入队

观察入队函数可以发现,只要执行修改的两条指令中间不被其他线程的入队函数打断就不会出现一致性问题,在看二、三条指令,其实就是将new_node与tail进行了一次交换,并将原tail的next设置为new_node,我们可以用原子指令中的交换来完成。改写如下

1
2
3
4
5
void enqueue(struct queue* queue, struct queue_node* node) {
node->next = NULL;
// let queue->tail = node,and let the old queue->tail->next point to node
__atomic_exchange_n(&queue->tail, node, __ATOMIC_RELAXED)->next = node;
}

疑问

如果不用原子指令?

有大聪明可能会问,那如果直接先queue->tail = node;,再回去找父节点让它的next指向node不也可以吗?也就是

1
2
3
struct queue_node* old_tail = queue->tail;
queue->tail = node;
old_tail->next = node;

这里会出现一个问题,那就是old_tail是不是真的是原来那个old_tail,在并行/并发的情况下,很可能执行完old_tail = queue->tail;后其他线程刚好执行完入队,那么就会导致queue->tail发生改变,于是old_tail就不是原来那个old_tail了。

__atomic_exchange_n(xxx)->next = node; 并不是原子的,为什么没有影响?

大聪明们也许注意到了,确实,这并不是在原子操作中的,但也确实不会发生一致性问题。注意到原子交换返回的一定是交换出来的那个queue->tail,不会是其他节点,所以即使中间被其他线程打断,这个赋值也不会有问题。

出队

如果不检查队列非空,那么与入队一样可以直接用交换实现出队,但显然我们不应该允许空队列出队,所以出队函数稍微复杂一些。

我们首先要获得头节点的指针,并将其与尾节点指针比较,如果两者不相等(非空)那么就将头节点与头节点的下一个节点进行交换并返回头节点,否则返回NULL。

然而遗憾的是,CAS和交换都并不能直接做这么一件事,我们只能缓存head,然后在判断完成非空时再比较head是不是原来那个head,如果是则交换出去,不是则继续尝试出队或返回NULL。改写如下

1
2
3
4
5
6
7
8
9
10
11
node* dequeue(struct list* queue) {
struct list_node* head = queue->head;
do {
if ((volatile struct list_node*)head == (volatile struct list_node*)queue->tail) {
// 空队列,返回NULL
return NULL;
}
// 如果head还是queue->head则queue->head = head->next,否则head = queue->head并重新判断
} while (!__atomic_compare_exchange_n(&queue->head, &head, head->next, __ATOMIC_RELAXED));
return head;
}

volatile?

还记得刚才寄存器与内存的同步问题吗?我们的编译器很聪明,编译器当然知道寄存器比内存快,所以编译器并不会在每次赋值的时候都真的去写存,因为编译器发现后面可能还会用到这个变量,所以编译器会等到真的需要写存的时候才会去写存。

但是编译器可能不知道tail这个变量会同时被多个线程访问,有可能别的线程入队时修改了内存中的tail,但是寄存器里的tail还是原来那个,所以我们需要告诉编译器别自作聪明,每次判断head==queue->tail的时候都给我从内存里拿,于是我们就可以用volatile关键字来标记tail,这样编译器会取消这一步的常量优化了。

ABA问题

TODO:

没有银弹

自旋锁

有聪明的小伙伴可能发现了,出队的那个循环不就相当于自旋锁吗?正确的喵,就是自旋锁。

那么这个自旋锁会长时间阻塞吗?答案是不会,原子操作还是很快的,只要没有其他线程在出队就能成功,注意到如果队列为空会直接返回NULL,所以这并不是阻塞队列哦。

原子指令开销

既然原子指令这么好用,为什么大多数能用到的逻辑不使用原子指令呢?

其实原子指令也不是免费的午餐,但比互斥量便宜点。从CPU实现来说,如果遇到原子指令,那么CPU通常会阻塞其他核心访存,本质上也是锁,不过是硬件锁,相对互斥量来说开销会小一些。

阻塞队列

对于生产者/消费者模型,生产者入队,消费者出队,我们更需要的可能是阻塞队列,也就是当消费速度大于生产速度时,队列为空,那么消费者应该阻塞直到生产者生产,而不是直接返回NULL。

众所周知wxWidgets是一个跨平台的GUI库,支持Linux(GTK/X11),Windows和macOS(cocoa),与Qt不同的是,他足够轻量,不需要魔改C++语言本身(qmake说的就是你),而且它调用平台原生API实现GUI,而Qt在Windows上实际上是绘制的界面,所以我打算用wxWidgets来做毕设。

众所周知Windows是现阶段的主流操作系统,而我用来写代码的电脑是mac,所以最终我可能仍然需要提供一个Windows版本的程序,但是我实在是不想在虚拟机上再折腾一套开发环境,所以就想着能不能直接在macOS或者Linux上编译出exe呢?答案是可以的,那就是mingw。

阅读全文 »

项目地址

后端框架跑分

Go有Gin,Java有Tomcat/Spring,Python有Django,PHP本身就是为后端而生,Node.js有http模块,而C/C++……有nginx(?)! 但是众所周知,nginx插件并不好写,而且nginx还要配置文件,实际上也没多少人用nginx插件来写Web后端。

所以我决定来做这件事,我对这个框架的目标是

  • 高性能,既然是用C写,性能一定是优先地位,争取暴打nginx
  • 易用,类似于FaaS的开发和使用体验,如果还能有一个开发用的cli就更好了
  • 灵活,低耦合,可插拔,高度自定义
  • 轻量,少依赖
  • 跨平台,至少支持Linux和macOS
阅读全文 »

也就是优先队列,我们知道STL中提供了一个好用的优先队列priority_queue<T>,C语言中起码有不少库(比如librock)提供了这玩意,所以写这玩意纯属造无意义的轮子。但是我写这玩意主要是因为最近在面试,至少遇到了两次让手写一个堆(不过当时都没写出来,毕竟不会真的有人要手写STL提供的算法和数据结构吧?不会吧?不会吧?),闲着也是闲着,不如锻炼一下,堆还是很好写的。

我写的这个版本用void*模拟泛型,同时能自动扩容,但是缺点也很多,就我自己的感觉是时间复杂度的常数肯定很大,虽然是$O(\log_2 n)$​​​​​,但是有大量的判断,而且插入位置也没有优化,只写了个简单的测试例,总之不推荐在生产环境用(STL不香吗?)。

阅读全文 »

是的,现在api.js成为模块啦!但是我改了下名字,现在他叫restful-proxy,因为我实际上是对RESTful风格的API进行封装,而且用了Proxy,本来是想叫做restful.js的,无奈这个名字已经有人用了(虽然万年没更新,而且也没有我的抽象)。

因为大多数题目的字符串都是小写字母,所以就按照这个设计了,共26个英文字母,每个节点占用26个char。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TrieNode {
public:
array<TrieNode *, 26> node;
TrieNode() : node({nullptr}) {}
void insert(string &s) {
TrieNode *ptr = this;
for (auto c: s) {
c -= 'a';
if (ptr->node[c] == nullptr) {
ptr->node[c] = new TrieNode;
}
ptr = ptr->node[c];
}
}
bool find(string &s) {
TrieNode* ptr = this;
for (auto c: s) {
c -= 'a';
if (ptr->node[c] == nullptr) {
return false;
} else {
ptr = ptr->node[c];
}
}
return true;
}
};

两个小时,三道题,第二题实在是粗心了,正确率应该能提到100%的,忘记对结果进行特判。

结果

A题

第一题可以选择中文或者英文题面,不过题目好像不太一样,我做的中文题。

阅读全文 »