DPDK memory management

本文主要介绍DPDK在使用和管理内存方面涉及到的技术和机制,以及如何利用这些机制优化网络报文的接收、处理和转发。

Feature

  • FIFO
  • maximum size是固定的,指针存放在一个table中
  • 无锁实现
  • 支持多消费者或单消费者出队
  • 支持多生产者或单生产者入队
  • Bulk operate:入队或出队指定数量的对象,或成功或失败
  • Burst operate:入队或出队指定数量的对象,或成功或返回最大可用对象

Advantages

  • Faster,操作很快只需要简单的CAS指令
  • 比完全无锁实现的队列简单
  • 适用于批量入队/出队操作(bulk operations)。通过操作一个table中的指针完成操作,不会造成大量cache miss。

Disadvantages

  • 大小固定不够灵活
  • 内存使用效率不如链表组织的队列 ,一个空的ring也需要占用内存

Use cases

  • DPDK中应用程序的通信
  • 用于内存池分配器(见mempool章)

Operations

Single Producer Enqueue

初始状态:

  • Step-1: 共享内存中的变量prod_head和cons_tail被复制到本地局部变量prod_head和cons_tail,局部变量prod_next指向表中接下来的一个或几个元素;
  • Step-2: 通过cons_tail判断是否没有足够的空闲空间;
  • Step-3: 取出prod_head处的元素,通过CAS指令判断是否轮到当前线程继续执行,若是则将全局prod_head的值指向局部变量prod_next,反之继续从第一步开始执行;
    中间状态:
  • Step-4: 修改全局prod_tail的值指向全局prod_head。

最终状态:

Single Consumer Dequeue

初始状态:

Steps:

  • Step-1: 将全局cons_head和全局prod_tail复制到本地局部变量cons_head和prod_tail,局部变量cons_next指向table中的下一个或几个元素,如果没有足够空间返回error;
  • Step-2: 通过prod_tail判断是否没有足够的空闲空间;
  • Step-3: 通过CAS指令判断是否由当前线程执行,若是则修改全局cons_head的值执行局部变量cons_next,反之则重新开始循环,执行第一步;
    中间状态:
  • Step-4: 修改全局cons_tail指向cons_head。
    最终状态:

Multiple Producer Enqueue

  • Step-1: 将全局prod_head和全局cons_tail复制到本地局部变量prod_head和cons_tail,局部变量prod_next指向table中的下一个或几个元素,如果没有足够空间返回error;
    初始状态:
  • Step-2: 修改全局prod_head的值,使其指向prod_next,通过CAS指令,原子执行如下操作:
    • 如果全局prod_head和当前局部变量prod_head不同,则CAS指令执行失败,重新执行第一步;
    • 否则,更新全局prod_head指向prod_next,继续执行。

中间状态:

  • Step-3: 直到全局的prod_tail等于当前局部变量prod_tail时,更新全局的prod_tail等于全局的prod_head。

最终状态:

Multiple Consumer Dequeue

  • Step-1: 将全局prod_tail和全局cons_head复制到本地局部变量prod_tail和cons_head,局部变量cons_next指向table中的下一个或几个元素,如果没有足够空间返回error;
  • Step-2: 修改全局cons_head的值,使其指向cons_next,通过CAS指令,原子执行如下操作:
    • 如果全局cons_head和当前局部变量cons_head不同,则CAS指令执行失败,重新执行第一步;
    • 否则,更新全局cons_head指向cons_next,继续执行。
  • Step-3: 直到全局的cons_tail等于当前局部变量cons_tail时,更新全局的cons_tail等于全局的cons_head。

Implementation

struct

两对<head,tail>,包括生产者相关的br_prod_head和br_prod_tail,以及消费者相关的br_cons_head和br_cons_tail,均用volatile修饰,保证每次读取时都是内存中的最新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
uint64_t br_prod_bufs;
uint64_t br_prod_bytes;
/*
* Pad out to next L2 cache line
*/
uint64_t _pad0[11];

volatile uint32_t br_cons_head;
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;

/*
* Pad out to next L2 cache line
*/
uint64_t _pad1[14];
#ifdef DEBUG_BUFRING
struct mtx *br_lock;
#endif
void *br_ring[0];
};

Single Producer Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
```
## Single Consumer Dequeue
因为没有其他的consumer,每次唯一的consumer只需要判断当前是否存在可用element即可。
```c

/*
* single-consumer dequeue
* use where dequeue is protected by a lock
* e.g. a network driver's tx queue lock
*/
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
// local variable, copy from br_cons_head和prod_tail
uint32_t cons_head, cons_next, cons_next_next;
uint32_t prod_tail;
void *buf;

cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;

// 下一个和下下一个元素位置
cons_next = (cons_head + 1) & br->br_cons_mask;
cons_next_next = (cons_head + 2) & br->br_cons_mask;

// 没有元素,cons_head指向的是下一个需要消费的"对象"
// prod_tail指向下一个加入的对象的位置
if (cons_head == prod_tail)
return (NULL);

#ifdef PREFETCH_DEFINED
// profetch
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
prefetch(br->br_ring[cons_next_next]);
}
#endif
// 更新全局cnns_head
br->br_cons_head = cons_next;
buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
// 更新全局cons_tail
br->br_cons_tail = cons_next;
return (buf);
}

Multiple Producer Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*
* multi-producer safe lock-free ring buffer enqueue
*
*/
static __inline int
buf_ring_enqueue_bytes(struct buf_ring *br, void *buf, int nbytes)
{
// local variable
uint32_t prod_head, prod_next;
uint32_t cons_tail;
int success;
#ifdef DEBUG_BUFRING
int i;
// Check whether buf has enqueued
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
if(br->br_ring[i] == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
do {
// 读取最新的全局prod_head和prod_tail
prod_head = br->br_prod_head;
cons_tail = br->br_cons_tail;

prod_next = (prod_head + 1) & br->br_prod_mask;

// check是否还有空闲空间
if (prod_next == cons_tail) {
critical_exit();
return (ENOBUFS);
}

// 如果当前全局prod_head和本地局部prod_head相同,则更新全局prod_head为prod_next
success = atomic_cmpset_int(&br->br_prod_head, prod_head,
prod_next);
} while (success == 0);
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
#endif
// 入队,保存buf
br->br_ring[prod_head] = buf;
wmb();

/*
* If there are other enqueues in progress
* that preceeded us, we need to wait for them
* to complete
*/
// 等待已经更新完的线程执行完
while (br->br_prod_tail != prod_head)
cpu_spinwait();
br->br_prod_bufs++;
br->br_prod_bytes += nbytes;
br->br_prod_tail = prod_next;
// 退出临界区
critical_exit();
return (0);
}

Multiple Consumer Dequeue

存在多个consumer,通过实现CAS指令功能的atomic_cmpset_int,同步对全局变量的更新操作。有可能存在一个线程,它在当前线程之前已经完成对全局cons_head更新,该线程的局部cons_next的值比当前线程的局部cons_next小,因此该线程需要先执行完毕对全局cons_tail的修改(否则当前线程更新完全局cons_tail后,这个值会被另一位线程修改为一个更小的值)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 * multi-consumer safe dequeue 
*
*/
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
// local variable
uint32_t cons_head, cons_next;
uint32_t prod_tail;
void *buf;
int success;

// 进入临界区
critical_enter();
do {
// 读取最新的全局cons_head和cons_tail
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;

// update cons_next
cons_next = (cons_head + 1) & br->br_cons_mask;

// 没有可用元素,退出
if (cons_head == prod_tail) {
critical_exit();
return (NULL);
}

// 更新全局cons_head
success = atomic_cmpset_int(&br->br_cons_head, cons_head,
cons_next);
} while (success == 0);

buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
rmb();

/*
* If there are other dequeues in progress
* that preceeded us, we need to wait for them
* to complete
*/
// 等待在当前线程更新全局cons_head之前的线程执行完毕
while (br->br_cons_tail != cons_head)
cpu_spinwait();

// update global cons_next and exit
br->br_cons_tail = cons_next;
critical_exit();

return (buf);
}

Another

为什么同时需要cons_head、cons_tail、prod_head和prod_tail?只需要一个消费者头部cons_head和生产者头部prod_head会有什么问题?

生产者线程首先修改prod_head,但是不会修改prod_tail,可以理解为,生产者“预定”了这个位置,然后生产者在该位置写入指向待入队的对象的地址(指针),最后修改prod_tail可以理解为,生产者线程完成了入队操作。因此,仅仅查看prod_head是否移动无法得知生产者是否真正完成了入队操作。尤其对于消费者线程来说,如果消费者线程通过判断prod_head查看是否对内有元素可使用,则会尝试出队一个,生产者还未写入实际对象指针的位置处的指针。

如下是多消费者场景下的出队操作最后更新cons_tail的代码,为什么需要这个while循环?

1
2
3
4
5
6
7
// 等待在当前线程更新全局cons_head之前的线程执行完毕
while (br->br_cons_tail != cons_head)
cpu_spinwait();

// update global cons_next and exit
br->br_cons_tail = cons_next;
critical_exit();

该循环等待在当前线程出队元素之前已经尝试出队的线程,完成出队操作。

reference

[1] DPDK官方文档https://doc.dpdk.org/guides-19.11/prog_guide/ring_lib.html