Mempool

Introduction

A memory pool is an allocator of fixed-size objects. In DPDK it is identified by a unique name and uses a mempool handler to store the free objects; the default mempool handler is ring-based. It also provides optional services such as a **per-core object cache** and alignment helpers, which pad objects so that they are spread evenly across all DRAM or DDR3 channels.

Structure

A mempool allocates fixed-size objects and is organized around three parts:

  • mempool object node: every created mempool is hooked onto a global tailq list (so it can later be looked up by name), declared as:

    ```c
    static struct rte_tailq_elem rte_mempool_tailq = {
        .name = "RTE_MEMPOOL",
    };
    ```
  • mempool element area: the contiguous memory actually allocated for the pool. When memory is needed for an object, it comes from here. Think of it as an array of objects of the specified type, where each slot is a usable block of memory for one object;
  • ring: a lock-free ring queue holding pointers to the free memory blocks, all of which lie inside the mempool. The ring manages the mempool's free memory: the mempool is a series of usable memory blocks, each able to hold one object, while the ring stores the addresses of the blocks that are currently free.

struct

Two main data structures represent a mempool.

rte_mempool

Holds the mempool's name, its ring, the private data size, the per-core caches, and other bookkeeping.

```c
struct rte_mempool {
	/*
	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
	 * compatibility requirements, it could be changed to
	 * RTE_MEMPOOL_NAMESIZE next time the ABI changes
	 */
	char name[RTE_MEMZONE_NAMESIZE]; /**< Name of mempool. */
	RTE_STD_C11
	union {
		void *pool_data;         /**< Ring or pool to store objects. */
		uint64_t pool_id;        /**< External mempool identifier. */
	};
	void *pool_config;               /**< optional args for ops alloc. */
	const struct rte_memzone *mz;    /**< Memzone where pool is alloc'd. */
	int flags;                       /**< Flags of the mempool. */
	int socket_id;                   /**< Socket id passed at create. */
	uint32_t size;                   /**< Max size of the mempool. */
	uint32_t cache_size;
	/**< Size of per-lcore default local cache. */

	uint32_t elt_size;               /**< Size of an element. */
	uint32_t header_size;            /**< Size of header (before elt). */
	uint32_t trailer_size;           /**< Size of trailer (after elt). */

	unsigned private_data_size;      /**< Size of private data. */
	/**
	 * Index into rte_mempool_ops_table array of mempool ops
	 * structs, which contain callback function pointers.
	 * We're using an index here rather than pointers to the callbacks
	 * to facilitate any secondary processes that may want to use
	 * this mempool.
	 */
	int32_t ops_index;

	struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */

	uint32_t populated_size;         /**< Number of populated objects. */
	struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
	uint32_t nb_mem_chunks;          /**< Number of memory chunks */
	struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
	/** Per-lcore statistics. */
	struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
} __rte_cache_aligned;
```

Each object stored in the mempool is laid out in three parts: a header, the data area, and a trailer. Each part is padded for byte alignment, and in debug mode a cookie can be added to the header and trailer to help catch buffer overflows.
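A rough sketch of one object slot (illustrative; the exact header/trailer contents depend on the DPDK version and on whether debug mode is enabled):

```
+----------------------+--------------------+----------------------+
|        header        |     object data    |       trailer        |
| objhdr [+ cookie]    |     (elt_size)     |  [cookie in debug]   |
+----------------------+--------------------+----------------------+
<---- header_size ----><----- elt_size ----><---- trailer_size --->
```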

rte_mempool_cache

The per-core cache. Each lcore has one, holding pointers to objects that currently belong to that core.

```c
/**
 * A structure that stores a per-core object cache.
 */
struct rte_mempool_cache {
	unsigned len; /**< Cache len */
	/*
	 * Cache is allocated to this size to allow it to overflow in certain
	 * cases to avoid needless emptying of cache.
	 */
	void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
} __rte_cache_aligned;
```

The overall structure is as follows.
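A rough ASCII sketch of how the pieces relate, standing in for the original figure (layout and field order are illustrative):

```
          struct rte_mempool
          +--------------------+
          | name, size, ...    |
          | local_cache -------+----> per-lcore caches: objs[] of pointers
          | ring / pool_data --+--+
          +--------------------+  |
                                  v
          ring of free-object pointers
          +------+------+------+------+
          | &o3  | &o7  | &o1  | ...  |
          +---|------|------|--------+
              v      v      v
          element area (contiguous memory)
          +------+------+------+------+------+
          | obj0 | obj1 | obj2 | obj3 | ...  |
          +------+------+------+------+------+
```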

Local Cache

To avoid the concurrency-control overhead of multiple cores contending on the mempool's ring, the allocator maintains a per-core cache array. Each core accesses its own object cache freely, without interference from other cores; the pool's ring is touched only when that cache is empty, and when the cache grows past a certain threshold the excess objects are returned to the ring.
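For reference, the threshold mentioned above is derived from the configured cache size. In DPDK sources of this era it is computed roughly as follows (treat the exact multiplier as version-dependent):

```c
/* The local cache is flushed back to the ring once it grows past
 * 1.5x the configured cache size (stored as mp->cache_flushthresh). */
#define CACHE_FLUSHTHRESH_MULTIPLIER 1.5
#define CALC_CACHE_FLUSHTHRESH(c)	\
	((typeof(c))((c) * CACHE_FLUSHTHRESH_MULTIPLIER))

/* set at pool-creation time: */
mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
```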

Operation

Get objects

The core implementation function is __mempool_get_bulk. Its parameter n is the number of objects requested; cache_size is the configured cache size and cache->len is the current number of objects in the cache. The steps are:

  • Step-1: If the configured cache_size is 0, the caller is a single consumer, or n >= cache_size, go to Step-4
  • Step-2: If cache->len < n, dequeue enough objects from the ring to serve the request and refill the cache (n + (cache_size - cache->len) objects) and place them in the cache
  • Step-3: Take the n objects from the cache; done
  • Step-4: Dequeue the objects directly from the ring

```c
/**
 * @internal Get several objects from the mempool; used internally.
 * @param mp
 *   A pointer to the mempool structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to get, must be strictly positive.
 * @param is_mc
 *   Mono-consumer (0) or multi-consumers (1).
 * @return
 *   - >=0: Success; number of objects supplied.
 *   - <0: Error; code of ring dequeue function.
 */
static inline int __attribute__((always_inline))
__mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
		   unsigned n, int is_mc)
{
	int ret;
	struct rte_mempool_cache *cache;
	uint32_t index, len;
	void **cache_objs;
	unsigned lcore_id = rte_lcore_id();
	uint32_t cache_size = mp->cache_size;

	/* cache is not enabled or single consumer */
	if (unlikely(cache_size == 0 || is_mc == 0 ||
		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
		goto ring_dequeue;

	cache = &mp->local_cache[lcore_id];
	cache_objs = cache->objs;

	/* Can this be satisfied from the cache? */
	if (cache->len < n) {
		/* No. Backfill the cache first, and then fill from it */
		uint32_t req = n + (cache_size - cache->len);

		/* How many do we require i.e. number to fill the cache + the request */
		ret = rte_ring_mc_dequeue_bulk(mp->ring,
					       &cache->objs[cache->len], req);
		if (unlikely(ret < 0)) {
			/*
			 * In the offchance that we are buffer constrained,
			 * where we are not able to allocate cache + n, go to
			 * the ring directly. If that fails, we are truly out of
			 * buffers.
			 */
			goto ring_dequeue;
		}

		cache->len += req;
	}

	/* Now fill in the response ... */
	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
		*obj_table = cache_objs[len];

	cache->len -= n;

	__MEMPOOL_STAT_ADD(mp, get_success, n);

	return 0;

ring_dequeue:

	/* get remaining objects from ring */
	if (is_mc)
		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
	else
		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);

	if (ret < 0)
		__MEMPOOL_STAT_ADD(mp, get_fail, n);
	else
		__MEMPOOL_STAT_ADD(mp, get_success, n);

	return ret;
}
```
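__mempool_get_bulk is internal; applications reach it through public wrappers such as rte_mempool_get_bulk and rte_mempool_get. A minimal usage sketch (mp is assumed to be a pool created earlier):

```c
void *objs[32];

/* Grab 32 objects; returns 0 on success, <0 if the pool is exhausted. */
if (rte_mempool_get_bulk(mp, objs, 32) == 0) {
	/* ... use the objects ... */
	rte_mempool_put_bulk(mp, objs, 32); /* hand them back */
}
```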

Put objects back

The core implementation function is __mempool_put_bulk. The steps are:

  • Step-1: If cache_size is 0, the caller is a single producer, or n exceeds the cache's allocation limit (RTE_MEMPOOL_CACHE_MAX_SIZE), go to Step-3
  • Step-2: Append the objects to the cache; if cache->len reaches the flush threshold, return cache->len - cache_size objects to the ring; done
  • Step-3: Enqueue the objects directly onto the ring
```c
/**
 * @internal Put several objects back in the mempool; used internally.
 * @param mp
 *   A pointer to the mempool structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to store back in the mempool, must be strictly
 *   positive.
 * @param is_mp
 *   Mono-producer (0) or multi-producers (1).
 */
static inline void __attribute__((always_inline))
__mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
		   unsigned n, int is_mp)
{
	struct rte_mempool_cache *cache;
	uint32_t index;
	void **cache_objs;
	unsigned lcore_id = rte_lcore_id();
	uint32_t cache_size = mp->cache_size;
	uint32_t flushthresh = mp->cache_flushthresh;

	/* increment stat now, adding in mempool always success */
	__MEMPOOL_STAT_ADD(mp, put, n);

	/* cache is not enabled or single producer or non-EAL thread */
	if (unlikely(cache_size == 0 || is_mp == 0 ||
		     lcore_id >= RTE_MAX_LCORE))
		goto ring_enqueue;

	/* Go straight to ring if put would overflow mem allocated for cache */
	if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
		goto ring_enqueue;

	cache = &mp->local_cache[lcore_id];
	cache_objs = &cache->objs[cache->len];

	/*
	 * The cache follows the following algorithm
	 *   1. Add the objects to the cache
	 *   2. Anything greater than the cache min value (if it crosses the
	 *      cache flush threshold) is flushed to the ring.
	 */

	/* Add elements back into the cache */
	for (index = 0; index < n; ++index, obj_table++)
		cache_objs[index] = *obj_table;

	cache->len += n;

	if (cache->len >= flushthresh) {
		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
					 cache->len - cache_size);
		cache->len = cache_size;
	}

	return;

ring_enqueue:

	/* push remaining objects in ring */
	if (is_mp) {
		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
			rte_panic("cannot put objects in mempool\n");
	}
	else {
		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
			rte_panic("cannot put objects in mempool\n");
	}
}
```

Create

A pool is created with rte_mempool_create, which completes the job by calling one of two core functions:

```c
#ifdef RTE_LIBRTE_XEN_DOM0
	return rte_dom0_mempool_create(name, n, elt_size,
				       cache_size, private_data_size,
				       mp_init, mp_init_arg,
				       obj_init, obj_init_arg,
				       socket_id, flags);
#else
	return rte_mempool_xmem_create(name, n, elt_size,
				       cache_size, private_data_size,
				       mp_init, mp_init_arg,
				       obj_init, obj_init_arg,
				       socket_id, flags,
				       NULL, NULL, MEMPOOL_PG_NUM_DEFAULT,
				       MEMPOOL_PG_SHIFT_MAX);
#endif
```

The core function is rte_mempool_xmem_create, which mainly does the following (a usage sketch follows the list):

  • Sanity checks, e.g. compile-time structure checks and a check that the requested cache size is not too large
  • Compute the size of each object in the mempool
  • Create a ring to store the address of every object
  • Reserve space for the private data
  • Allocate a tailq entry and insert it into the list that tracks all mempools
  • Allocate the mempool's memory, initialize it, and run the object initializer on each object
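A minimal creation sketch, assuming no pool or object constructors are needed (the pool name and sizes below are illustrative):

```c
#include <stdio.h>
#include <rte_mempool.h>
#include <rte_lcore.h>
#include <rte_errno.h>

#define MY_NUM_OBJS 8191   /* ring-backed pools favor 2^n - 1 sizes */
#define MY_ELT_SIZE 2048   /* fixed size of each object, in bytes */
#define MY_CACHE_SZ 256    /* per-lcore cache size */

static struct rte_mempool *
create_example_pool(void)
{
	/* NULL constructors: no per-pool or per-object initialization */
	struct rte_mempool *mp = rte_mempool_create("example_pool",
			MY_NUM_OBJS, MY_ELT_SIZE, MY_CACHE_SZ,
			0,               /* private_data_size */
			NULL, NULL,      /* mp_init, mp_init_arg */
			NULL, NULL,      /* obj_init, obj_init_arg */
			rte_socket_id(), /* allocate on the caller's socket */
			0);              /* flags */
	if (mp == NULL)
		printf("mempool creation failed: %s\n",
		       rte_strerror(rte_errno));
	return mp;
}
```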

Free

Calling rte_mempool_free releases the mempool together with the memzone that holds its objects.

```c
/* free a mempool */
void
rte_mempool_free(struct rte_mempool *mp)
{
	struct rte_mempool_list *mempool_list = NULL;
	struct rte_tailq_entry *te;

	if (mp == NULL)
		return;

	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
	/* find out tailq entry */
	TAILQ_FOREACH(te, mempool_list, next) {
		if (te->data == (void *)mp)
			break;
	}

	if (te != NULL) {
		TAILQ_REMOVE(mempool_list, te, next);
		rte_free(te);
	}
	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

	rte_mempool_free_memchunks(mp);
	rte_mempool_ops_free(mp);
	rte_memzone_free(mp->mz);
}
```

The main steps are:

  • Step-1: Remove the mempool's entry from rte_mempool_tailq
  • Step-2: Call rte_mempool_free_memchunks, rte_mempool_ops_free, and rte_memzone_free to release the objects stored in the mempool and the memzone allocated to it
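A short teardown sketch to match the creation example above, using rte_mempool_lookup to find a pool by its unique name (the name is illustrative):

```c
#include <rte_mempool.h>

/* Look up the pool created earlier by name and release it. */
struct rte_mempool *mp = rte_mempool_lookup("example_pool");
if (mp != NULL)
	rte_mempool_free(mp);
```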