DPDK Hash

本文主要介绍DPDK哈希库的设计原理和主要实现代码。DPDK版本为18.05,哈希库的路径在dpdk/lib/librte_hash/。

主要涉及到的实现文件为:

  • rte_hash.h:声明了创建哈希表所需要的参数信息(一个结构体)、以及对外提供的库函数接口
  • rte_cuckoo_hash.h:cuckoo哈希库实现涉及的接口声明和结构体声明以及定义
  • rte_cuckoo_hash.c:cuckoo哈希库实现函数

本文不会细究所有程序的实现,会忽略库中支持并发而添加的额外机制比如锁、rcu等等,主要分析哈希库使用到的管理数据结构、哈希库的创建和释放、哈希库的查找、插入和删除操作实现的大致过程。有兴趣可以直接查看源代码实现。

Introduction

DPDK提供了哈希库,供用户创建哈希表以进行快速查找。不同于教科书上常见的“数组+链表”形式的哈希表,以及数据库系统中使用到的扩展哈希和线性哈希,DPDK采用CuckooHash算法设计哈希库。

CuckooHash

cuckoo从不自己筑巢哺育雏鸟,而是产卵在别人的巢穴,被霸占巢穴的鸟会被cuckoo雏鸟驱逐

cuckoo哈希的思路在于,每个待插入的key可以通过两个哈希函数分别计算出两个哈希值,一个为primary哈希,另一个成为secondary哈希,即主哈希和从哈希。这两个哈希可以计算出两个索引,用于索引哈希桶,这两个哈希桶也被称为主哈希桶和从哈希桶,key可以存放在这两个桶之一,但是优先存放在主哈希桶。因此每次查找的时候需要在两个桶中搜索。删除操作也是,需要在两个桶中检查是否存在待删除的key。

如果试图插入一个新的key,步骤如下:

  • 检查主哈希桶中是否有空闲位置,若有则插入并返回
  • 检查从哈希桶中是否有空闲位置,若有则插入并返回
  • 检查主哈希桶中的每个元素,如果其从哈希桶中有空闲位置,则将其移动到其从哈希桶,并将待插入的key插入主哈希桶,返回
  • 执行一个递归的过程,尝试“驱逐”一个主桶中的元素,类似地,这个被驱逐的元素也尝试驱逐它的次桶中的元素,被驱逐的元素总是尝试去驱逐其次桶中的元素(如果次桶为满)或,次桶中有位置则移动到次桶,以次递归,尝试在主桶中找到一个位置插入新的key。当然递归是有次数限制的,如果尝试了指定次数还未成功,则返回失败,无法插入key

大部分情况下,key可以成功在主桶中找到位置,而随着哈希表利用率升高,从桶中的key数量也会占比越来越多。如下是来自dpdk官网的统计数据:

image-20240228092113761

dpdk哈希表

在介绍dpdk哈希表数据结构和接口实现之前,有必要对dpdk哈希表有一个整体的逻辑概念。

首先哈希表使用两个数组存放信息:

  • 哈希桶数组,每个元素是一个哈希桶,哈希桶中可以存放若干(DPDK中是8)个entry,每个entry保存<主哈希值,从哈希值,第二个数组的索引>
  • key数组:存放<key,数据指针或数据本身>。

主哈希值和从哈希值的大小都是4byte,为了效率,查找的时候先比较哈希桶内的entry中哈希值和待搜索或删除的key的哈希值,如果相等,再取出该entry对应的key数组中的key和当前key比较。

如果需要插入key,不仅需要在主/从哈希桶中找到一个空闲的位置,还需要在key数组中找到一个位置。

哈希桶数组中找位置是通过哈希值&bucket_mask,每个哈希桶中有八个位置。而key存放在哪个key数组则没有限制。只需要找到一个空闲的位置,放入<key,数据或者指向数据的指针>,然后把该位置的下标,存放在哈希桶中的对应的entry就行,当然该entry还需要保存key的主/从哈希值。

因此,初始化时,所有key数组中的下标都是空闲的,放入ring中,并且哈希表有一个lcore_cache,其中每个core对应下标有一个本地缓存,存放的是可用的key下标,这里的做法和mempool如出一辙。

下图是哈希表逻辑上的结构图:

image-20240228100048365

Data Structure

rte_hash_parameters

用于创建哈希表的参数结构体,包括哈希表的名字、哈希表中条目的数量、哈希表key的长度和哈希表计算主哈希值的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Parameters used when creating the hash table.
*/
struct rte_hash_parameters {
const char *name; /**< 哈希表的名字 */
uint32_t entries; /**< 哈希表的条目数. */
uint32_t reserved; /**< Unused field. Should be set to 0 */
uint32_t key_len; /**< 待存储key的长度. */
rte_hash_function hash_func; /**< 用于计算主哈希的函数. */
uint32_t hash_func_init_val; /**< Init value used by hash_func. */
int socket_id; /**< 分配内存时使用的numa id */
uint8_t extra_flag; /**< Indicate if additional parameters are present. */
};

lcore_cache

per-cpu cache,即每个cpu核心对应的本地缓存结构,包括一个数组和数组当前有效长度,数组中保存的是void*,一般是指向某个空闲可用的对象,每个线程需要获取空闲对象或释放对象时优先从线程所在cpu对应的cache中获取或释放。

对应哈希表结构图中深绿色的部分。

1
2
3
4
5
6

struct lcore_cache {
unsigned len; /**< Cache len */
void *objs[LCORE_CACHE_SIZE]; /**< Cache objects */
} __rte_cache_aligned;

rte_hash_key

哈希表存储的<key, value>键值对。值的大小是固定的8byte,可用于存储不超过8 byte的对象或大对象的指针;key是变长的,其长度需要在创建哈希表时设定。

对应哈希表结构图中黄色的部分。

1
2
3
4
5
6
7
8
9
/* Structure that stores key-value pair */
struct rte_hash_key {
union {
uintptr_t idata;
void *pdata;
};
/* Variable key size */
char key[0];
} __attribute__((aligned(KEY_ALIGNMENT)));

创建哈希表时,需要结合key的长度和rte_hash_key的大小,分配足够大的key数组空间。

rte_hash_bucket

哈希桶结构体,代码中宏定义RTE_HASH_BUCKET_ENTRIES的值是8,也就是每个桶中可以存放8个entry。dpdk没有定义“bucket entry”这个概念的结构体,而是将entry中的值分为若干数组存储。sig_current、sig_alt即主、从哈希值的数组,而key_idx为同索引的sig_current和sig_alt对应的key数组中的key的下标。

至于flag数组,其表明当前entry是否是从主桶驱逐出到从桶的。后续分析接口实现的时候会看到其用处。

对应哈希表结构图中浅紫色的部分。

1
2
3
4
5
6
7
8
9
10
/** Bucket structure */
struct rte_hash_bucket {
hash_sig_t sig_current[RTE_HASH_BUCKET_ENTRIES];

uint32_t key_idx[RTE_HASH_BUCKET_ENTRIES];

hash_sig_t sig_alt[RTE_HASH_BUCKET_ENTRIES];

uint8_t flag[RTE_HASH_BUCKET_ENTRIES];
} __rte_cache_aligned;

rte_hash

哈希表结构体。我们主要关注其中重要的数据成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/** A hash table structure. */
struct rte_hash {
char name[RTE_HASH_NAMESIZE]; /**< 哈希表的名字 */
uint32_t entries; /**< 哈希表的容量,可以容纳的条目数 */
uint32_t num_buckets; /**< 哈希桶数 */

struct rte_ring *free_slots;
/**< ring,存放key数组中空闲位置的下标 */
uint8_t hw_trans_mem_support;
/**< Hardware transactional memory support */
struct lcore_cache *local_free_slots;
/**< per-cpu cache,也是存放key数组中空闲位置的下标 */
enum add_key_case add_key; /**< Multi-writer hash add behavior */

rte_spinlock_t *multiwriter_lock; /**< Multi-writer spinlock for w/o TM */

/* Fields used in lookup */

uint32_t key_len __rte_cache_aligned;
/**< Length of hash key. */
rte_hash_function hash_func; /**< 计算主哈希值的函数. */
uint32_t hash_func_init_val; /**< Init value used by hash_func. */
rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
/**< Custom function used to compare keys. */
enum cmp_jump_table_case cmp_jump_table_idx;
/**< Indicates which compare function to use. */
enum rte_hash_sig_compare_function sig_cmp_fn;
/**< Indicates which signature compare function to use. */
uint32_t bucket_bitmask;
/**< 和主哈希进行&运算获得对应哈希桶的下标 */
uint32_t key_entry_size; /**< 键值对的大小 */

void *key_store; /**< 存放键值对的数组,即上述key数组 */
struct rte_hash_bucket *buckets;
/**< Table with buckets storing all the hash values and key indexes
* to the key table.
*/
} __rte_cache_aligned;

除了上述结构图中提到的哈希桶数组、ring、per-cpu cache、键值对数组外,还需要一些属性,比如哈希表的名字、键值对大小、条目数、哈希函数等等。

Operations

rte_hash_create

创建哈希表。传入rte_hash_parameters参数以描述哈希表的各项数值成员。

  1. 检查哈希表条目总数是否合法,不能超过RTE_HASH_ENTRIES_MAX(1<<30)或小于8(一个哈希桶条目的数量也是8),key的长度不能为0

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    if (params == NULL) {
    RTE_LOG(ERR, HASH, "rte_hash_create has no parameters\n");
    return NULL;
    }

    /* Check for valid parameters */
    if ((params->entries > RTE_HASH_ENTRIES_MAX) ||
    (params->entries < RTE_HASH_BUCKET_ENTRIES) ||
    !rte_is_power_of_2(RTE_HASH_BUCKET_ENTRIES) ||
    (params->key_len == 0)) {
    rte_errno = EINVAL;
    RTE_LOG(ERR, HASH, "rte_hash_create has invalid parameters\n");
    return NULL;
    }
  2. 计算键值对的数量,如果需要用到per-cpu,则键值对的数量会更多一些,并且需要分配per-cpu数组

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /* Check extra flags field to check extra options. */
    if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
    hw_trans_mem_support = 1;

    /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
    if (hw_trans_mem_support)
    /*
    * Increase number of slots by total number of indices
    * that can be stored in the lcore caches
    * except for the first cache
    */
    num_key_slots = params->entries + (RTE_MAX_LCORE - 1) *
    (LCORE_CACHE_SIZE - 1) + 1;
    else
    num_key_slots = params->entries + 1;
    ...
    if (hw_trans_mem_support) {
    h->local_free_slots = rte_zmalloc_socket(NULL,
    sizeof(struct lcore_cache) * RTE_MAX_LCORE,
    RTE_CACHE_LINE_SIZE, params->socket_id);
    }

  3. 创建ring

  4. 计算哈希桶的数量并分配哈希桶数组。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    const uint32_t num_buckets = rte_align32pow2(params->entries)
    / RTE_HASH_BUCKET_ENTRIES;

    buckets = rte_zmalloc_socket(NULL,
    num_buckets * sizeof(struct rte_hash_bucket),
    RTE_CACHE_LINE_SIZE, params->socket_id);

    if (buckets == NULL) {
    RTE_LOG(ERR, HASH, "memory allocation failed\n");
    goto err_unlock;
    }
    ...
    h->bucket_bitmask = h->num_buckets - 1;// 哈希桶数量对应的mask
  5. 计算键值对大小并分配键值对数组

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
    const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;

    k = rte_zmalloc_socket(NULL, key_tbl_size,
    RTE_CACHE_LINE_SIZE, params->socket_id);

    if (k == NULL) {
    RTE_LOG(ERR, HASH, "memory allocation failed\n");
    goto err_unlock;
    }
  6. 初始化ring,此时所有key数组中的位置都是空闲的,将所有下标(除了0)入队

    1
    2
    3
    /* Populate free slots ring. Entry zero is reserved for key misses. */
    for (i = 1; i < num_key_slots; i++)
    rte_ring_sp_enqueue(r, (void *)((uintptr_t) i));

rte_hash_free

释放哈希表中各个组件即可,包括哈希桶、ring、key数组和per-cpu cache数组

1
2
3
4
5
6
7
8
9
10
...
if (h->hw_trans_mem_support)
rte_free(h->local_free_slots);

if (h->add_key == ADD_KEY_MULTIWRITER)
rte_free(h->multiwriter_lock);
rte_ring_free(h->free_slots);
rte_free(h->key_store);
rte_free(h->buckets);
...

rte_hash_secondary_hash

从哈希值是由主哈希生成的,可以理解为再次哈希了一次:

1
2
3
4
5
6
7
8
9
10
11
/* Calc the secondary hash value from the primary hash value of a given key */
static inline hash_sig_t
rte_hash_secondary_hash(const hash_sig_t primary_hash)
{
static const unsigned all_bits_shift = 12;
static const unsigned alt_bits_xor = 0x5bd1e995;

uint32_t tag = primary_hash >> all_bits_shift;

return primary_hash ^ ((tag + 1) * alt_bits_xor);
}

由从哈希值再次调用一次secondary hash,得到的值并不是主哈希值。

rte_hash_lookup

有了对cuckoo哈希和dpdk哈希表整体结构的了解,就不难理解哈希表的查找、插入二号删除过程了。在执行这三种操作时,dpdk还支持传入一个部分哈希值以提高效率的方法,暂且忽略这些,而专注于普通的查、加、删的执行过程。

不论是否提供了部分hash值,dpdk的查找都是通过下述函数执行的,只不过把相应的hash值设为了空。

  1. 首先根据key的主哈希值,在主哈希桶中查找entry,如果找到匹配主哈希值的entry,且其对应的key数组下标不为空(0),则从key数组中找到对应的键值对,并判断key是否匹配。
  2. 根据key的从哈希值,在从桶中比较。这两个桶的比较稍微有些不同
    1. 主桶中比较时,需要比较主哈希值,以及判断是否对应的key下标合法
    2. 从桶中比较时,需要比较主哈希值,从哈希值,但是不需要判断key数组下标是否合法,这个下标一定合法,即不为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

static inline int32_t
__rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
hash_sig_t sig, void **data)
{
uint32_t bucket_idx;
hash_sig_t alt_hash;
unsigned i;
struct rte_hash_bucket *bkt;
struct rte_hash_key *k, *keys = h->key_store;

bucket_idx = sig & h->bucket_bitmask;
bkt = &h->buckets[bucket_idx];

/* Check if key is in primary location */
for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
if (bkt->sig_current[i] == sig &&
bkt->key_idx[i] != EMPTY_SLOT) {
k = (struct rte_hash_key *) ((char *)keys +
bkt->key_idx[i] * h->key_entry_size);
if (rte_hash_cmp_eq(key, k->key, h) == 0) {
if (data != NULL)
*data = k->pdata;
/*
* Return index where key is stored,
* subtracting the first dummy index
*/
return bkt->key_idx[i] - 1;
}
}
}

/* Calculate secondary hash */
alt_hash = rte_hash_secondary_hash(sig);
bucket_idx = alt_hash & h->bucket_bitmask;
bkt = &h->buckets[bucket_idx];

/* Check if key is in secondary location */
for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
if (bkt->sig_current[i] == alt_hash &&
bkt->sig_alt[i] == sig) {
k = (struct rte_hash_key *) ((char *)keys +
bkt->key_idx[i] * h->key_entry_size);
if (rte_hash_cmp_eq(key, k->key, h) == 0) {
if (data != NULL)
*data = k->pdata;
/*
* Return index where key is stored,
* subtracting the first dummy index
*/
return bkt->key_idx[i] - 1;
}
}
}

return -ENOENT;
}

rte_hash_add_key

哈希库中插入新的键值对,可以选择提供一个哈希值,目前仅考虑普通的插入操作流程。

  1. 首先,需要在key数组中获取一个空闲的位置。如果支持per-cpu cache,则优先考虑从当前cpu缓存中获取,如果缓存不够,则从ring中出队一些空闲的位置到当前cpu缓存中;如果没有per-cpu cache,则直接从ring中获取一个空闲的slot,注意ring中存放的全是key数组中空闲slot的下标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /* Get a new slot for storing the new key */
    if (h->hw_trans_mem_support) {
    lcore_id = rte_lcore_id();
    cached_free_slots = &h->local_free_slots[lcore_id];
    /* Try to get a free slot from the local cache */
    if (cached_free_slots->len == 0) {
    /* Need to get another burst of free slots from global ring */
    n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
    cached_free_slots->objs,
    LCORE_CACHE_SIZE, NULL);
    if (n_slots == 0) {
    ret = -ENOSPC;
    goto failure;
    }

    cached_free_slots->len += n_slots;
    }

    /* Get a free slot from the local cache */
    cached_free_slots->len--;
    slot_id = cached_free_slots->objs[cached_free_slots->len];
    } else {
    if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
    ret = -ENOSPC;
    goto failure;
    }
    }
  2. 在主桶和从桶中检查是否已经有插入当前待插入的key,如果找到则更新key对应的data

  3. 尝试在主桶中找到一个空闲的slot,如果没有,则尝试将主桶中某个元素push到其从桶中。此处有一个递归push的操作。

reference

[1] DPDK官方文档https://doc.dpdk.org/guides-19.11/prog_guide/ring_lib.html

DPDK memory management

本文主要介绍DPDK在使用和管理内存方面涉及到的技术和机制,以及如何利用这些机制优化网络报文的接收、处理和转发。

Feature

  • FIFO
  • maximum size是固定的,指针存放在一个table中
  • 无锁实现
  • 支持多消费者或单消费者出队
  • 支持多生产者或单生产者入队
  • Bulk operate:入队或出队指定数量的对象,或成功或失败
  • Burst operate:入队或出队指定数量的对象,或成功或返回最大可用对象

Advantages

  • Faster,操作很快只需要简单的CAS指令
  • 比完全无锁实现的队列简单
  • 适用于批量入队/出队操作(bulk operations)。通过操作一个table中的指针完成操作,不会造成大量cache miss。

Disadvantages

  • 大小固定不够灵活
  • 内存使用效率不如链表组织的队列 ,一个空的ring也需要占用内存

Use cases

  • DPDK中应用程序的通信
  • 用于内存池分配器(见mempool章)

Operations

Single Producer Enqueue

初始状态:

  • Step-1: 共享内存中的变量prod_head和cons_tail被复制到本地局部变量prod_head和cons_tail,局部变量prod_next指向表中接下来的一个或几个元素;
  • Step-2: 通过cons_tail判断是否没有足够的空闲空间;
  • Step-3: 取出prod_head处的元素,通过CAS指令判断是否轮到当前线程继续执行,若是则将全局prod_head的值指向局部变量prod_next,反之继续从第一步开始执行;
    中间状态:
  • Step-4: 修改全局prod_tail的值指向全局prod_head。

最终状态:

Single Consumer Dequeue

初始状态:

Steps:

  • Step-1: 将全局cons_head和全局prod_tail复制到本地局部变量cons_head和prod_tail,局部变量cons_next指向table中的下一个或几个元素,如果没有足够空间返回error;
  • Step-2: 通过prod_tail判断是否没有足够的空闲空间;
  • Step-3: 通过CAS指令判断是否由当前线程执行,若是则修改全局cons_head的值执行局部变量cons_next,反之则重新开始循环,执行第一步;
    中间状态:
  • Step-4: 修改全局cons_tail指向cons_head。
    最终状态:

Multiple Producer Enqueue

  • Step-1: 将全局prod_head和全局cons_tail复制到本地局部变量prod_head和cons_tail,局部变量prod_next指向table中的下一个或几个元素,如果没有足够空间返回error;
    初始状态:
  • Step-2: 修改全局prod_head的值,使其指向prod_next,通过CAS指令,原子执行如下操作:
    • 如果全局prod_head和当前局部变量prod_head不同,则CAS指令执行失败,重新执行第一步;
    • 否则,更新全局prod_head指向prod_next,继续执行。

中间状态:

  • Step-3: 直到全局的prod_tail等于当前局部变量prod_tail时,更新全局的prod_tail等于全局的prod_head。

最终状态:

Multiple Consumer Dequeue

  • Step-1: 将全局prod_tail和全局cons_head复制到本地局部变量prod_tail和cons_head,局部变量cons_next指向table中的下一个或几个元素,如果没有足够空间返回error;
  • Step-2: 修改全局cons_head的值,使其指向cons_next,通过CAS指令,原子执行如下操作:
    • 如果全局cons_head和当前局部变量cons_head不同,则CAS指令执行失败,重新执行第一步;
    • 否则,更新全局cons_head指向cons_next,继续执行。
  • Step-3: 直到全局的cons_tail等于当前局部变量cons_tail时,更新全局的cons_tail等于全局的cons_head。

Implementation

struct

两对<head,tail>,包括生产者相关的br_prod_head和br_prod_tail,以及消费者相关的br_cons_head和br_cons_tail,均用volatile修饰,保证每次读取时都是内存中的最新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
uint64_t br_prod_bufs;
uint64_t br_prod_bytes;
/*
* Pad out to next L2 cache line
*/
uint64_t _pad0[11];

volatile uint32_t br_cons_head;
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;

/*
* Pad out to next L2 cache line
*/
uint64_t _pad1[14];
#ifdef DEBUG_BUFRING
struct mtx *br_lock;
#endif
void *br_ring[0];
};

Single Producer Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
```
## Single Consumer Dequeue
因为没有其他的consumer,每次唯一的consumer只需要判断当前是否存在可用element即可。
```c

/*
* single-consumer dequeue
* use where dequeue is protected by a lock
* e.g. a network driver's tx queue lock
*/
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
// local variable, copy from br_cons_head和prod_tail
uint32_t cons_head, cons_next, cons_next_next;
uint32_t prod_tail;
void *buf;

cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;

// 下一个和下下一个元素位置
cons_next = (cons_head + 1) & br->br_cons_mask;
cons_next_next = (cons_head + 2) & br->br_cons_mask;

// 没有元素,cons_head指向的是下一个需要消费的"对象"
// prod_tail指向下一个加入的对象的位置
if (cons_head == prod_tail)
return (NULL);

#ifdef PREFETCH_DEFINED
// profetch
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
prefetch(br->br_ring[cons_next_next]);
}
#endif
// 更新全局cnns_head
br->br_cons_head = cons_next;
buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
// 更新全局cons_tail
br->br_cons_tail = cons_next;
return (buf);
}

Multiple Producer Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*
* multi-producer safe lock-free ring buffer enqueue
*
*/
static __inline int
buf_ring_enqueue_bytes(struct buf_ring *br, void *buf, int nbytes)
{
// local variable
uint32_t prod_head, prod_next;
uint32_t cons_tail;
int success;
#ifdef DEBUG_BUFRING
int i;
// Check whether buf has enqueued
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
if(br->br_ring[i] == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
do {
// 读取最新的全局prod_head和prod_tail
prod_head = br->br_prod_head;
cons_tail = br->br_cons_tail;

prod_next = (prod_head + 1) & br->br_prod_mask;

// check是否还有空闲空间
if (prod_next == cons_tail) {
critical_exit();
return (ENOBUFS);
}

// 如果当前全局prod_head和本地局部prod_head相同,则更新全局prod_head为prod_next
success = atomic_cmpset_int(&br->br_prod_head, prod_head,
prod_next);
} while (success == 0);
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
#endif
// 入队,保存buf
br->br_ring[prod_head] = buf;
wmb();

/*
* If there are other enqueues in progress
* that preceeded us, we need to wait for them
* to complete
*/
// 等待已经更新完的线程执行完
while (br->br_prod_tail != prod_head)
cpu_spinwait();
br->br_prod_bufs++;
br->br_prod_bytes += nbytes;
br->br_prod_tail = prod_next;
// 退出临界区
critical_exit();
return (0);
}

Multiple Consumer Dequeue

存在多个consumer,通过实现CAS指令功能的atomic_cmpset_int,同步对全局变量的更新操作。有可能存在一个线程,它在当前线程之前已经完成对全局cons_head更新,该线程的局部cons_next的值比当前线程的局部cons_next小,因此该线程需要先执行完毕对全局cons_tail的修改(否则当前线程更新完全局cons_tail后,这个值会被另一位线程修改为一个更小的值)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 * multi-consumer safe dequeue 
*
*/
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
// local variable
uint32_t cons_head, cons_next;
uint32_t prod_tail;
void *buf;
int success;

// 进入临界区
critical_enter();
do {
// 读取最新的全局cons_head和cons_tail
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;

// update cons_next
cons_next = (cons_head + 1) & br->br_cons_mask;

// 没有可用元素,退出
if (cons_head == prod_tail) {
critical_exit();
return (NULL);
}

// 更新全局cons_head
success = atomic_cmpset_int(&br->br_cons_head, cons_head,
cons_next);
} while (success == 0);

buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
rmb();

/*
* If there are other dequeues in progress
* that preceeded us, we need to wait for them
* to complete
*/
// 等待在当前线程更新全局cons_head之前的线程执行完毕
while (br->br_cons_tail != cons_head)
cpu_spinwait();

// update global cons_next and exit
br->br_cons_tail = cons_next;
critical_exit();

return (buf);
}

Another

为什么同时需要cons_head、cons_tail、prod_head和prod_tail?只需要一个消费者头部cons_head和生产者头部prod_head会有什么问题?

生产者线程首先修改prod_head,但是不会修改prod_tail,可以理解为,生产者“预定”了这个位置,然后生产者在该位置写入指向待入队的对象的地址(指针),最后修改prod_tail可以理解为,生产者线程完成了入队操作。因此,仅仅查看prod_head是否移动无法得知生产者是否真正完成了入队操作。尤其对于消费者线程来说,如果消费者线程通过判断prod_head查看是否对内有元素可使用,则会尝试出队一个,生产者还未写入实际对象指针的位置处的指针。

如下是多消费者场景下的出队操作最后更新cons_tail的代码,为什么需要这个while循环?

1
2
3
4
5
6
7
// 等待在当前线程更新全局cons_head之前的线程执行完毕
while (br->br_cons_tail != cons_head)
cpu_spinwait();

// update global cons_next and exit
br->br_cons_tail = cons_next;
critical_exit();

该循环等待在当前线程出队元素之前已经尝试出队的线程,完成出队操作。

reference

[1] DPDK官方文档https://doc.dpdk.org/guides-19.11/prog_guide/ring_lib.html

自旋锁

本文讨论自旋锁以及其在x86-64平台下的实现。

spin lock

自旋锁,是操作系统实现互斥操作的一种手段,用于保护临界区的互斥访问,防止不同的执行流(即线程)同时访问临界区而造成资源状态不一致的情况。

implementation

抽象语法树

AST

image-20230504220248773

class definition

数据成员

对于一个ast对象,其具备三个子树根节点,以及当前节点的字符串类型符号,另外,再保存一个bool类型变量,指示当前节点是否为叶子节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* AST */
class AST
{
private:
const string val{};
AST * left_child{};
AST * mid_child{};
AST * right_child{};
bool is_leaf{};
public:
AST()=default;
explicit AST(const string & token) : val(token) , is_leaf(isTerminate(token)) {}
explicit AST(string && token) : val(std::move(token)) {}
};

定义两个全局函数,用于判断符号是否为终结符号,以及是否为运算符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

bool isTerminate(const string & str)
{
return !isupper(str[0]);
}

bool isOp(const string & str)
{
switch (str[0])
{
case '+':
case '-':
case '*':
case '/':
case '%':
return true;
default:return str == "minus";
}
}

方法

除了默认构造函数以及两个接收string类型(左值和右值)的构造函数外,我们在定义两个成员方法和一个静态方法:

  • built_ast:接受一个istream类型的参数,读取输入的ast文本表示信息,构建ast对象,该方法与特定的ast对象无关,因此用static修饰。
  • post_traverse:接收一个ast类型参数,打印其四元式序列,本质为后序遍历。
  • check:递归检查该ast的各层节点之间的“父子关系”,用于调试使用。

首先实现逻辑较为简单的check,我们一次打印出当前节点的符号以及子树的符号即可,如果某一个子树为空,则用#替代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void AST::check() const
{
cout << val << ": ";
if (!is_leaf)
{
cout << (left_child ? left_child->val : "#") << "\t" << (mid_child ? mid_child->val : "#") << "\t" << (right_child ? right_child->val : "#") << endl;
if (left_child)
left_child->check();
if (mid_child)
mid_child->check();
if (right_child)
right_child->check();
}
else
cout << endl;
}

再开post_traverse,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

void AST::post_traverse() const
{
if (!is_leaf)
{
if (left_child && !left_child->is_leaf)
left_child->post_traverse();
if (mid_child && !mid_child->is_leaf)
mid_child->post_traverse();
if (right_child && !right_child->is_leaf)
right_child->post_traverse();

string operand1, op, operand2;
// case 1. 一个左孩子,且为叶子节点
if (left_child && !mid_child && !right_child)
{
operand1 = left_child->val;
operand2 = "-";
op = ":=";
}
else if (mid_child && right_child && isOp(mid_child->val))
{
operand1 = left_child ? left_child->val : right_child->val;
operand2 = left_child ? right_child->val : "-";
op = mid_child->val;
}
else if (left_child && right_child && left_child->val == "(")
{
operand1 = mid_child->val;
operand2 = "-";
op = ":=";
}
cout << op << "," << operand1 << "," << operand2 << "," << val << endl;
}
}

build_ast的实现源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 AST *  AST::build_AST(istream & in)
{
string line;
stack<pair<uint32_t , AST*>> stk;
while (getline(in, line))
{
line.push_back(' ');
// adjust idx to the first
uint32_t idx = 0;
uint32_t next_idx;
while (idx < line.length() && line[idx] == ' ')
++idx;
while (idx < line.length())
{
next_idx = line.find(' ', idx);
string token = line.substr(idx, next_idx - idx);
AST *node = new AST(token);
while (!stk.empty() && stk.top().first >= idx)
stk.pop();
// 设置栈顶的节点的孩子节点
if (!stk.empty())
{
auto &dad = stk.top().second;

// token 是左右括号
if (token == "(")
{
assert(dad->left_child == nullptr);
dad->left_child = node;
}
else if (token == ")")
{
assert(dad->right_child == nullptr);
dad->right_child = node;
}
else
{
if (isOp(token) || (dad->left_child && dad->left_child->val == "("))
dad->mid_child = node;
else
{
if (!dad->mid_child)
dad->left_child = node;
else
dad->right_child = node;
}
}
}
stk.emplace(idx, node);

idx = next_idx + 1;
}
}
while (stk.size() > 1)
stk.pop();

return stk.top().second;
}

测试

测试文件:

1
2
3
4
5
6
7
8
9
10
11
12
E minus
T1 T0 K num
/
20
*
F (
P M x
%
6
+
j0
)

编写main函数,使用测试文件测试ast:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main()
{
ifstream fin("tree.txt");
if (!fin.is_open())
{
cerr << "Fail to open file\n";
exit(1);
}
auto *ast = AST::build_AST(fin);
fin.close();

ast->post_traverse();
cout << "--------------------\n";
ast->check();

return 0;
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
:=,num,-,K
/,K,20,T0
%,x,6,M
+,M,j0,P
:=,P,-,F
*,T0,F,T1
minus,T1,-,E
--------------------
E: # minus T1
minus:
T1: T0 * F
T0: K / 20
K: num # #
num:
/:
20:
*:
F: ( P )
(:
P: M + j0
M: x % 6
x:
%:
6:
+:
j0:
):

Mempool

Intruduction

内存池(memory pool)是固定大小对象的分配器。在DPDK中,它由name唯一标识,并使用mempool handler存储空闲对象。默认的mempool handler是基于ring的。它提供了一些其他可选的服务,例如**每核心对象缓存(per-core object cache)**和对齐帮助器,以确保对象被填充,从而在所有DRAM或DDR3通道上均匀分布。

Structure

mempool可以分配固定大小的对象,通过三部分组织实现:

  • mempool对象节点:mempool的对象挂在在
    1
    2
    3
    static struct rte_tailq_elem rte_mempool_tailq = {
    .name = "RTE_MEMPOOL",
    };
  • mempool实际内存区:实际分配的连续内存空间,当需要为指定对象分配内存时,所申请的内存都位于这里,可以把它想象为一个指定类型对象的数组,每一个位置是一块可用内存,用来存放我们指定的对象;
  • ring:一个无锁环形队列,存储着指向可用内存的指针,这些指针所指的区域均位于mempool中。这个ring是用来管理mempool中的可用内存的,可以想象为,mempool是一系列可用的内存块,每块内存可用于存放指定的对象,而ring中则存放着这些可用对象的地址。

struct

有两个主要的数据结构,用于实现mempool的表示

rte_mempool

保存关于mempool的名字、ring、私有数据大小、核相关cache等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
struct rte_mempool {
/*
* Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
* compatibility requirements, it could be changed to
* RTE_MEMPOOL_NAMESIZE next time the ABI changes
*/
char name[RTE_MEMZONE_NAMESIZE]; /**< Name of mempool. */
RTE_STD_C11
union {
void *pool_data; /**< Ring or pool to store objects. */
uint64_t pool_id; /**< External mempool identifier. */
};
void *pool_config; /**< optional args for ops alloc. */
const struct rte_memzone *mz; /**< Memzone where pool is alloc'd. */
int flags; /**< Flags of the mempool. */
int socket_id; /**< Socket id passed at create. */
uint32_t size; /**< Max size of the mempool. */
uint32_t cache_size;
/**< Size of per-lcore default local cache. */

uint32_t elt_size; /**< Size of an element. */
uint32_t header_size; /**< Size of header (before elt). */
uint32_t trailer_size; /**< Size of trailer (after elt). */

unsigned private_data_size; /**< Size of private data. */
/**
* Index into rte_mempool_ops_table array of mempool ops
* structs, which contain callback function pointers.
* We're using an index here rather than pointers to the callbacks
* to facilitate any secondary processes that may want to use
* this mempool.
*/
int32_t ops_index;

struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */

uint32_t populated_size; /**< Number of populated objects. */
struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
uint32_t nb_mem_chunks; /**< Number of memory chunks */
struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
/** Per-lcore statistics. */
struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
} __rte_cache_aligned;

mempool中存储的每一个对象的结构分为3部分:首部、数据部和尾部,每一部分都按字节对齐填充,调试模式时首部和尾部还能加上cookie以帮助调试缓冲区溢出。

rte_mempool_cache

核相关cache,每个核对应有一个,保存着该核相关的对象指针

1
2
3
4
5
6
7
8
9
10
11
**
* A structure that stores a per-core object cache.
*/
struct rte_mempool_cache {
unsigned len; /**< Cache len */
/*
* Cache is allocated to this size to allow it to overflow in certain
* cases to avoid needless emptying of cache.
*/
void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
} __rte_cache_aligned;

整体结构如下:

Local Cache

为了避免多核访问内存池中的ring带来的并发控制开销,内存池分配器维护了一个关于每个核的cache数组,每个核可以自由访问自己的对象缓存而不会受到其他核的干扰,只有当该缓存为空时,才会访问内存池的ring,当该缓存达到某个阈值时会将对象还给池的ring。

Operation

取对象

核心实现函数是__mempool_put_bulk,参数n是所需要的对象数量,cache_size是cache的配置大小,cache_len是当前的cache大小,执行步骤为:

  • Step-1:如果配置的cache_size=0、或者是单消费者、或n>=cache_len,则执行step-4
  • Step-2:如果cache_len < n,则从ring中申请足够的对象放到cache中(n + (cache_size - cache->len))
  • Step-3:从cache取对象,结束
  • Step-4:ring执行出队,取对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

/**
* @internal Get several objects from the mempool; used internally.
* @param mp
* A pointer to the mempool structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to get, must be strictly positive.
* @param is_mc
* Mono-consumer (0) or multi-consumers (1).
* @return
* - >=0: Success; number of objects supplied.
* - <0: Error; code of ring dequeue function.
*/
static inline int __attribute__((always_inline))
__mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
unsigned n, int is_mc)
{
int ret;
struct rte_mempool_cache *cache;
uint32_t index, len;
void **cache_objs;
unsigned lcore_id = rte_lcore_id();
uint32_t cache_size = mp->cache_size;

/* cache is not enabled or single consumer */
if (unlikely(cache_size == 0 || is_mc == 0 ||
n >= cache_size || lcore_id >= RTE_MAX_LCORE))
goto ring_dequeue;

cache = &mp->local_cache[lcore_id];
cache_objs = cache->objs;

/* Can this be satisfied from the cache? */
if (cache->len < n) {
/* No. Backfill the cache first, and then fill from it */
uint32_t req = n + (cache_size - cache->len);

/* How many do we require i.e. number to fill the cache + the request */
ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
if (unlikely(ret < 0)) {
/*
* In the offchance that we are buffer constrained,
* where we are not able to allocate cache + n, go to
* the ring directly. If that fails, we are truly out of
* buffers.
*/
goto ring_dequeue;
}

cache->len += req;
}

/* Now fill in the response ... */
for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
*obj_table = cache_objs[len];

cache->len -= n;

__MEMPOOL_STAT_ADD(mp, get_success, n);

return 0;

ring_dequeue:

/* get remaining objects from ring */
if (is_mc)
ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
else
ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);

if (ret < 0)
__MEMPOOL_STAT_ADD(mp, get_fail, n);
else
__MEMPOOL_STAT_ADD(mp, get_success, n);

return ret;
}

还对象

核心执行函数为__mempool_put_bulk,执行步骤为:

  • Step-1: 如果cache_size = 0、是单生产者模式或n>cache最大限制,则执行step-3
  • Step-2:将对象添加到cache中, 如果cache_len达到了阈值,则将cache_len-cache_size个对象返回给ring,结束
  • Step-3:在ring上执行入队操作,还回对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* @internal Put several objects back in the mempool; used internally.
* @param mp
* A pointer to the mempool structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to store back in the mempool, must be strictly
* positive.
* @param is_mp
* Mono-producer (0) or multi-producers (1).
*/
static inline void __attribute__((always_inline))
__mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
unsigned n, int is_mp)
{
struct rte_mempool_cache *cache;
uint32_t index;
void **cache_objs;
unsigned lcore_id = rte_lcore_id();
uint32_t cache_size = mp->cache_size;
uint32_t flushthresh = mp->cache_flushthresh;

/* increment stat now, adding in mempool always success */
__MEMPOOL_STAT_ADD(mp, put, n);

/* cache is not enabled or single producer or non-EAL thread */
if (unlikely(cache_size == 0 || is_mp == 0 ||
lcore_id >= RTE_MAX_LCORE))
goto ring_enqueue;

/* Go straight to ring if put would overflow mem allocated for cache */
if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
goto ring_enqueue;

cache = &mp->local_cache[lcore_id];
cache_objs = &cache->objs[cache->len];

/*
* The cache follows the following algorithm
* 1. Add the objects to the cache
* 2. Anything greater than the cache min value (if it crosses the
* cache flush threshold) is flushed to the ring.
*/

/* Add elements back into the cache */
for (index = 0; index < n; ++index, obj_table++)
cache_objs[index] = *obj_table;

cache->len += n;

if (cache->len >= flushthresh) {
rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
cache->len - cache_size);
cache->len = cache_size;
}

return;

ring_enqueue:

/* push remaining objects in ring */
if (is_mp) {
if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
rte_panic("cannot put objects in mempool\n");
}
else {
if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
rte_panic("cannot put objects in mempool\n");
}
}

Create

通过执行rte_mempool_create创建一个pool,该函数通过调用两个核心执行函数之一完成创建任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#ifdef RTE_LIBRTE_XEN_DOM0
return rte_dom0_mempool_create(name, n, elt_size,
cache_size, private_data_size,
mp_init, mp_init_arg,
obj_init, obj_init_arg,
socket_id, flags);
#else
return rte_mempool_xmem_create(name, n, elt_size,
cache_size, private_data_size,
mp_init, mp_init_arg,
obj_init, obj_init_arg,
socket_id, flags,
NULL, NULL, MEMPOOL_PG_NUM_DEFAULT, MEMPOOL_PG_SHIFT_MAX);
#endif

核心执行函数是rte_mempool_xmem_create,主要执行以下任务:

  • 编译时检查,检查是否cache的申请大小过大等
  • 计算mempool中每个对象的大小
  • 创建一个ring,用于存储每个对象的地址
  • 预留private data
  • 分配一个链表的表项,插入到管理mempool的链表中
  • 为mempool分配内存并初始化,调用初始化操作初始化对象

Free

通过调用rte_mempool_free释放mempool以及其保存对象的zone占用的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* free a mempool */
void
rte_mempool_free(struct rte_mempool *mp)
{
struct rte_mempool_list *mempool_list = NULL;
struct rte_tailq_entry *te;

if (mp == NULL)
return;

mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
/* find out tailq entry */
TAILQ_FOREACH(te, mempool_list, next) {
if (te->data == (void *)mp)
break;
}

if (te != NULL) {
TAILQ_REMOVE(mempool_list, te, next);
rte_free(te);
}
rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

rte_mempool_free_memchunks(mp);
rte_mempool_ops_free(mp);
rte_memzone_free(mp->mz);
}

执行的主要步骤:

  • Step-1: 从rte_mempool_tailq中删除mempool对应的表项
  • Step-2: 调用rte_mempool_free_memchunks、rte_mempool_ops_free、rte_memzone_free释放mempool中保存的对象以及分配给mempool的memzone。

type traits

本文阐述c++中的type_traits技术,即类型萃取。

示例代码

通过如下测试程序,练习type traits的使用。

1
2
3
4
5
template<typename T>
struct my_strait
{
using has_trivial_default_copy_constructor = __true_type;
};

设计两个类,分别具有nontrivial的copy-assignment和trivial的copy-assignment,很容易想到两个典型的类:String和Complex,我们写出他们的定义,其中String包含了自定义的nontrivial-copy-assignment:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class String
{
friend ostream &operator<<(ostream &os, const String &rhs)
{
cout << rhs.ps;
return os;
}
private:
char *ps{};
size_t len{};
public:
String() = default;

explicit String(const char *s)
{
len = strlen(s);
ps = new char[len + 1];
strncpy(ps, s, len);
ps[len] = '\0';
}

String(const String &s) : ps(new char[s.len + 1]), len(s.len)
{
strncpy(ps, s.ps, len);
ps[len] = '\0';
}

String &operator=(const String &s)
{
if (this != &s)
{
delete[] ps;
len = s.len;
ps = new char[len + 1];
strncpy(ps, s.ps, len);
ps[len] = '\0';
}
return *this;
}

~String()
{
delete[] ps;
}
};

class Complex
{
friend ostream &operator<<(ostream &os, const Complex &rhs)
{
cout << rhs.real << " + " << rhs.imag << "i";
return os;
}

private:
double real{};
double imag{};
public:
Complex() = default;

Complex(double r, double i) : real(r), imag(i)
{}
};

为这两个类添加了重载<<输出运算符友元函数,方便我们输出对象的内容。

提供两个类相关的特化版本my_trait:

1
2
3
4
5
6
7
8
9
10
11
template<>
struct my_strait<String>
{
using has_trivial_default_copy_assign = __false_type;
};

template<>
struct my_strait<Complex>
{
using has_trivial_default_copy_assign = __true_type;
};

给出一个模板函数copy,其会根据传入的参数实际类型,是否具有trivial的默认拷贝赋值运算符,调用不同的重载函数copy_aux:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename T>
void copy_aux(T &lhs, const T &rhs, __true_type)
{
cout << "this is true type copy\n";
lhs = rhs;
}

template <typename T>
void copy_aux(T &lhs, const T &rhs, __false_type)
{
cout << "this is false type copy\n";
lhs = rhs;
}

template <typename T>
void copy(T &lhs, const T &rhs)
{
using Temp = typename my_strait<T>::has_trivial_default_copy_assign;
copy_aux(lhs, rhs, Temp());
}

编写如下main函数测试,观察copy函数的执行情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main()
{
cout <<"=============String================\n";
String s1("hello");
String s2;
copy(s2, s1);
cout << s1 << endl;
cout << s2 << endl;

cout <<"=============Complex================\n";
Complex c1(1.2, 3.4);
Complex c2;
copy(c2, c1);

cout << c1 << endl;
cout << c2 << endl;

cout <<"=============double================\n";
double a = 13.2;
double b = 2.1;
copy(b, a);
cout << a << endl;
cout << b << endl;

return 0;
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
=============String================
this is false type copy
hello
hello
=============Complex================
this is true type copy
1.2 + 3.4i
1.2 + 3.4i
=============double================
this is true type copy
13.2
13.2

我的电脑环境是Windows11系统,clion 2022,在Linux平台上应该也是一样的。

最长回文子串

本文讨论求解字符串最长回文子串长度和位置的方法。

最长回文子串

对于一个字符串,顺着读和反着读是一样的字符序列,则称其为回文串,即反转后的字符串和原字符串相同。给定一个字符串,判断其是否为回文串的方法很简单,至少我们可以给出两种显而易见的方法:

  • 反转当前字符串,判断是否和原字符串相同
  • 分别从字符串的头和尾开始,逐一判断是否两边的字符相同

倘若给定一个字符串,如何求其最长的回文子串?

朴素/暴力

根据回文串的长度奇偶性,回文串中心有一个字符或者两个字符,对于每个位置的字符,求出以其为中心的奇数长度的最长回文串长度,以及以其为偏中心的偶数长度的最长回文串长度,也可以求出字符串的最长回文子串。暂且称之为朴素算法或者暴力算法。

过程

从i等于0到n-1,枚举每一个位置,分别求以其为中心的最长奇数长度回文子串的长度以及以其为左中心的最长偶数长度回文子串的长度。我们先求这两种情况下的字符串半径长度,分别使用odd和even数组保存这两种值,比如,对于如下示例字符串,两个数组的取值情况:

string a b a d d
odd 1 2 1 1 1
even 0 0 0 1 0

对于回文子串”aba”其半径为2,中心位于i = 1,半径为2,因此odd[1] = 2;对于回文子串”dd”,左中心位于i = 3,半径为1,因此even[i] = 1。

对于odd数组,当索引为i时,显然单个字符构成的子串肯定回文,因此odd[i] >= 1,然后不断往两边延伸判断是否字符相等,递增odd[i],当遇到不同的边界字符时停止。

对于even数组,当索引为i时,需要满足i < n - 1,因为i是偶数回文串的左中心,even[i] >= 0,我们从半径为0开始探测两边的字符是否相等,不断递增半径知道遇到两个不相等的字符。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
string violent(string str)
{
int n = str.length();
int odd[n];
int even[n];

for (int i = 0; i < n; ++i)
{
odd[i] = 1;
while (i - odd[i] >= 0 && i + odd[i] < n
&& str[i - odd[i]] == str[i + odd[i]])
++odd[i];
even[i] = 0;
while (i - even[i]>= 0 && i + even[i] + 1 < n
&& str[i - even[i]] == str[i + even[i] + 1])
++even[i];
}

// case 1: in odd len = 2 * r - 1
// case 2: in even len = 2 * r
auto idx1 = max_element(odd, odd + n) - odd;
auto idx2 = max_element(even, even + n) - even;
if (odd[idx1] <= even[idx2])
return str.substr(idx2 - even[idx2] + 1, 2 * even[idx2]);
else
return str.substr(idx1 - odd[idx1] + 1, 2 * odd[idx1] - 1);
}

动态规划

对于一个字符串str,假设从索引i到j的一段子串用σ(i, j)表示,则很容易联想到,如果σ(i + 1, j - 1)是一个回文子串,且str[i] = str[j],则σ(i, j)也是一个回文子串(暂时不考虑边界问题,如i+1是否大于j-1),这已经是一个最优子结构了,因此我们可以通过动态规划,不断枚举子串的长度,求出一个表格,包含所有子串是否是回文子串的信息。

使用dp[i][j]表示σ(i, j)是否是一个合法回文子串,n为字符串str长度:

  • 单个字符构成一个合法的回文子串,即dp[i][i] = true(0 <= i < n)
  • 双字符如果相同则也构成一个合法回文子串,即如果str[i] = str[i+1],则dp[i][i+1] = true(0 < i < n - 1)
  • 对于长度大于2的子串σ(i, j),dp[i][j] = dp[i+1][j-1] && str[i] == str[j]

实现

c++程序如下,时间复杂度为O(n^2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
string longestPalindrome(string str) {
size_t n = str.length();
bool dp[n][n];
memset(dp, 0, sizeof(dp));

size_t ans = 1;
size_t idx = 0;

for (size_t i = 0; i < n; ++i)
{
// case 1: 单个字符可以组成一个回文子串
dp[i][i] = true;

// case 2: 双字符如果相同,可以组成一个回文子串
if (i != n - 1 && str[i] == str[i+1])
{
dp[i][i+1] = true;
ans = 2;
idx = i;
}
}

for (size_t l = 3; l <= n; ++l)
{
for (size_t i = 0; i < n - l + 1; ++i)
{
size_t j = i + l - 1;
dp[i][j] = (dp[i + 1][j - 1] && str[i] == str[j]);
if (dp[i][j])
{
ans = l;
idx = i;
}
}
}

return str.substr(idx, ans);
}

Manacher

我们考虑一个例子:string = “abaacaabaefg”,按照朴素思想,枚举索引从i = 0到i = 4,显然最长的回文子串是起始处的”abaacaaba”,当我们枚举i = 5时,i落在下标为4,半径为5的子串中,因此i关于4的对称位置为3,而下标为3的最大回文子串半径我们已经求得了,i = 5时的最大回文子串肯定不小于i = 3时最大回文子串的半径。因此通过维护一个当前已知的最右边的边界,我们可以判断是否目前枚举的下标,在某个已经求出最大半径的回文子串范围中,通过对称关系,我们可以直接获取当前索引出最长回文子串的最大半径至少是多少,而不用从0开始暴力,同时,我们需要维持目前最右边界

过程

实现

根据上一节的描述,c++实现算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
string manacher(string str)
{
string estr;
size_t n = str.length();
estr.reserve(n * 2 + 1);

estr += "^#";
for (size_t i = 0; i < n - 1; ++i)
{
estr += str[i];
estr += '#';
}
estr += str.back();
estr += "#$";

n = estr.length();
size_t dp[n];
size_t mid = 0;
dp[0] = 1;

for (size_t i = 1; i < n; ++i)
{
dp[i] = i < mid + dp[mid] ? dp[2 * mid - i] : 1;

while (estr[i + dp[i]] == estr[i - dp[i]])
++dp[i];
if (mid + dp[mid] < i + dp[i])
mid = i;
}

auto iter = max_element(dp, dp + n);
size_t len = *iter;
size_t idx = iter - dp;
len /= 2;
idx = idx / 2 - 1;
idx -= len - 1;

return str.substr(idx, 2 * len - 1);
}

复杂度分析

根据最右边界所属子串的中心的更新,即mid,可以估算复杂度。

  • 首先,mid的值根据i更新,因此mid不会超过i,即总可以找到一个镜像j,其和i关于mid对称,且j < mid < i。

  • 如果i加上镜像j的半径dp[j]没有超过最有边界,即i + dp[j] - 1 < mid + dp[mid] - 1,此时以j为中心的最长回文子串完全位于mid为中心的子串中,i为中心的回文子串不可能再扩展,dp[i] = dp[j]

  • 如果上述情况下,i + dp[j] - 1 = mid + dp[mid] - 1,此时最右边的一个字符也是dp[i]当前可以探测到的回文子串最右边的字符,此时是有可能继续往右扩展的,例如”cbabcb”,i = 4,mid = 2时,j = 0,此时i可以继续往右扩展字符,探测到b,组成回文串”bcb”

因此,该算法的复杂度即最有边界不断往右移动的复杂度O(n)。