/*
 * Descriptor for a ring buffer of opaque pointers.
 *
 * Producer-side and consumer-side fields are kept in separate, padded
 * groups so that each group starts on its own cache-line boundary.  The
 * slot array br_ring[] trails the descriptor, so the structure has to be
 * allocated with room for the desired number of slots.
 */
struct buf_ring {
	/* Producer state. */
	volatile uint32_t	br_prod_head;	/* next slot a producer claims */
	volatile uint32_t	br_prod_tail;	/* slots before this are published */
	int			br_prod_size;	/* presumably the slot count -- TODO confirm */
	int			br_prod_mask;	/* index wrap mask used by producers */
	uint64_t		br_drops;	/* presumably a drop counter; not updated
						 * by the code visible here */
	uint64_t		br_prod_bufs;	/* number of successful enqueues */
	uint64_t		br_prod_bytes;	/* sum of nbytes over successful enqueues */
	/*
	 * Pad out to next L2 cache line
	 * (40 bytes of fields + 88 bytes of padding = 128, with 4-byte int).
	 */
	uint64_t		_pad0[11];

	/* Consumer state. */
	volatile uint32_t	br_cons_head;	/* next slot a consumer claims */
	volatile uint32_t	br_cons_tail;	/* slots before this are released */
	int			br_cons_size;	/* presumably the slot count -- TODO confirm */
	int			br_cons_mask;	/* index wrap mask used by consumers */
	/*
	 * Pad out to next L2 cache line
	 * (16 bytes of fields + 112 bytes of padding = 128, with 4-byte int).
	 */
	uint64_t		_pad1[14];
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;	/* checked with mtx_owned() by the
						 * debug single-consumer dequeue */
#endif
	void			*br_ring[];	/* slot storage (flexible array member) */
};
#ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; if (!mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); if (br->br_cons_tail != cons_head) panic("inconsistent list cons_tail=%d cons_head=%d", br->br_cons_tail, cons_head); #endif // 更新全局cons_tail br->br_cons_tail = cons_next; return (buf); }
/*
 * multi-producer safe lock-free ring buffer enqueue
 *
 * Claims the slot at the current producer head with a compare-and-set,
 * stores buf in it, waits for producers that claimed earlier slots to
 * publish, and then publishes this slot by advancing br_prod_tail.
 *
 * br     - ring to enqueue into
 * buf    - pointer value stored in the claimed slot
 * nbytes - amount added to br_prod_bytes on success
 *
 * Returns 0 on success, or ENOBUFS when advancing the producer head
 * would make it equal to the consumer tail (the ring is treated as full,
 * so one slot always stays unused).
 */
static __inline int
buf_ring_enqueue_bytes(struct buf_ring *br, void *buf, int nbytes)
{
	/* Local snapshots of the shared indices. */
	uint32_t prod_head, prod_next;
	uint32_t cons_tail;
	int success;
#ifdef DEBUG_BUFRING
	int i;

	/* Check whether buf is already present between cons_head and prod_head. */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if(br->br_ring[i] == buf)
			panic("buf=%p already enqueue at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	/* The whole claim/store/publish sequence runs inside a critical section. */
	critical_enter();
	do {
		/* Read the current shared producer head and consumer tail. */
		prod_head = br->br_prod_head;
		cons_tail = br->br_cons_tail;

		prod_next = (prod_head + 1) & br->br_prod_mask;

		/* Check whether there is any free space left. */
		if (prod_next == cons_tail) {
			critical_exit();
			return (ENOBUFS);
		}

		/*
		 * If the shared prod_head still equals our snapshot, advance
		 * it to prod_next; otherwise another producer got there first
		 * and we retry with fresh values.
		 */
		success = atomic_cmpset_int(&br->br_prod_head, prod_head,
		    prod_next);
	} while (success == 0);
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	/* Enqueue: store buf in the slot we claimed. */
	br->br_ring[prod_head] = buf;
	/* Write barrier between the slot store and the tail update below. */
	wmb();

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	/* Counters are updated after the wait and before the tail moves. */
	br->br_prod_bufs++;
	br->br_prod_bytes += nbytes;
	/* Publish our slot by advancing the producer tail. */
	br->br_prod_tail = prod_next;
	/* Leave the critical section. */
	critical_exit();
	return (0);
}
buf = br->br_ring[cons_head]; #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; #endif rmb(); /* * If there are other dequeues in progress * that preceeded us, we need to wait for them * to complete */ // 等待在当前线程更新全局cons_head之前的线程执行完毕 while (br->br_cons_tail != cons_head) cpu_spinwait();
// update global cons_next and exit br->br_cons_tail = cons_next; critical_exit();