亲宝软件园·资讯

展开

dpdk中QSBR具体实现

可酷可乐 人气:2
[TOC] # dpdk-QSBR实现 dpdk19.01提供了qsbr模式的rcu库,其具体实现在`lib/librte_rcu`目录中。 `librte_rcu`是无锁线程安全的,这个库提供了读者报告静默状态的能力,让写者知道读者是否进入过静默状态。 # 初始化 初始化时,会用到一些通过的工具宏,定义在在`dpdk-master/lib/librte_eal/common/include/rte_common.h`中。如下: ```c #define RTE_CACHE_LINE_SIZE 64 #define RTE_ALIGN_MUL_CEIL(v, mul) \ ((v + 64 - 1)/64) * 64 // (64地板除 + 1)*64 #define RTE_ALIGN_FLOOR(val, 64) \ val & (~(64 - 1)) // 64的地板除 #define RTE_ALIGN_CEIL(val, 64) \ RTE_ALIGN_FLOOR(val + 64 - 1, 64) // 64的地板除 + 1 #define RTE_ALIGN(val, align) RTE_ALIGN_CEIL(val, align) // 64的地板除 + 1 ``` 在`dpdk-master\lib\librte_rcu\rte_rcu_qsbr.h`中,定义了初始化时用到的一些函数与宏。 ```c /* 工作线程计数器 */ struct rte_rcu_qsbr_cnt { uint64_t cnt; // 静默态计数器,0表示下线。使用64bits,防止计数溢出 uint32_t lock_cnt; // counter锁, 用于CONFIG_RTE_LIBRTE_RCU_DEBUG } __rte_cache_aligned; #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8) // 数组元素大小为64 B #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads)\ RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, 64) >> 3, RTE_CACHE_LINE_SIZE) // 计算得到线程数组的大小 /* * (struct rte_rcu_qsbr_cnt *)(v + 1): 获得 v中 rte_rcu_qsbr_cnt 的地址偏移,此时指针p变为 struct rte_rcu_qsbr_cnt *类型 * + v->max_threads: 得到 v中thread id_array的偏移, * + i */ #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) // 获得线程数组的第 i 个 ((uint64_t *) ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i) #define __RTE_QSBR_THRID_INDEX_SHIFT 6 #define __RTE_QSBR_THRID_MASK 0x3f #define RTE_QSBR_THRID_INVALID 0xffffffff /* * 获得QSBR变量的内存大小,包括rte_rcu_qsbr + thread ID bitmap array变量 */ size_t rte_rcu_qsbr_get_memsize(uint32_t max_threads) { size_t sz; // rcu_qsbr sz = sizeof(struct rte_rcu_qsbr); /* Add the size of quiescent state counter array */ sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads; /* Add the size of the registered thread ID bitmap array */ sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads); return sz; } ``` qsbr rcu真正的初始化在函数`rte_rcu_qsbr_init()`中,主要是初始化变量的值。 ```c int rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) { size_t sz; sz = rte_rcu_qsbr_get_memsize(max_threads); if (sz == 1) return 1; /* Set all the threads to offline */ memset(v, 0, sz); // 获得大小,初始化为零 v->max_threads = max_threads; v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads, __RTE_QSBR_THRID_ARRAY_ELM_SIZE) / __RTE_QSBR_THRID_ARRAY_ELM_SIZE; // 根据最大线程数,获得 thread_id array的元素个数 v->token = __RTE_QSBR_CNT_INIT; v->acked_token = __RTE_QSBR_CNT_INIT - 1; return 0; } ``` 其中, `rte_rcu_qsbr_init` 函数中的参数中,传入了全局变量`rte_rcu_qsbr`,其存储了静默期版本号,以及所有注册了的线程的thread_Id与局部静默期版本号。 此变量定义如下: ```c struct rte_rcu_qsbr { uint64_t token __rte_cache_aligned; // 允许多个并发静态查询的计数器 /**< Counter to allow for multiple concurrent quiescent state queries */ uint64_t acked_token; /**< Least token acked by all the threads in the last call to * rte_rcu_qsbr_check API. */ uint32_t num_elems __rte_cache_aligned; /**< Number of elements in the thread ID array */ uint32_t num_threads; /**< Number of threads currently using this QS variable */ uint32_t max_threads; /**< Maximum number of threads using this QS variable */ struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned; /**< Quiescent state counter array of 'max_threads' elements */ /**< Registered thread IDs are stored in a bitmap array, * after the quiescent state counter array. */ } __rte_cache_aligned; ``` # 注册与注销 通过`rte_rcu_qsbr_thread_register`函数,注册一个读者线程的thread_id到 全局变量 `rte_rcu_qsbr` 的 thread 数组位图中,并更新线程数`num_threads`。 ```c int rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id) { unsigned int i, id, success; uint64_t old_bmap, new_bmap; id = thread_id & __RTE_QSBR_THRID_MASK; // thread_id%64, 表示bits<64>中位图中的哪一位 i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT; // thread_id/64,表示uint64_t数组的索引 /* * 确保已注册线程的计数器不会不同步。因此,需要额外的检查。 */ old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i), __ATOMIC_RELAXED); // 获得 thread_id所在的 bits<64> if (old_bmap & 1UL << id) // bits<64>中的id位是否为1 return 0; // 等于1,表示已注册,则返回 do { // 若没有注册,则注册,并对num_threads + 1 new_bmap = old_bmap | (1UL << id); / success = __atomic_compare_exchange( __RTE_QSBR_THRID_ARRAY_ELM(v, i), &old_bmap, &new_bmap, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED); if (success) __atomic_fetch_add(&v->num_threads, // 加1 1, __ATOMIC_RELAXED); else if (old_bmap & (1UL << id)) // 抢注册 return 0; } while (success == 0); return 0; } ``` 通过`rte_rcu_qsbr_thread_unregister`函数将读线程的thread_id 从全局变量 `rte_rcu_qsbr` 的 thread数组位图中移除。 ```c int rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id) { unsigned int i, id, success; uint64_t old_bmap, new_bmap; __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n", v->qsbr_cnt[thread_id].lock_cnt); id = thread_id & __RTE_QSBR_THRID_MASK; i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT; /* Make sure that the counter for registered threads does not * go out of sync. Hence, additional checks are required. */ /* Check if the thread is already unregistered */ old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i), __ATOMIC_RELAXED); if (!(old_bmap & (1UL << id))) return 0; do { new_bmap = old_bmap & ~(1UL << id); /* Make sure any loads of the shared data structure are * completed before removal of the thread from the list of * reporting threads. */ success = __atomic_compare_exchange( __RTE_QSBR_THRID_ARRAY_ELM(v, i), &old_bmap, &new_bmap, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED); if (success) __atomic_fetch_sub(&v->num_threads, 1, __ATOMIC_RELAXED); else if (!(old_bmap & (1UL << id))) /* Someone else unregistered this thread. * Counter should not be incremented. */ return 0; } while (success == 0); return 0; } ``` # 上线与下线 线程的上线通过`rte_rcu_qsbr_thread_online()`函数将局部静默期版本号更新到全局版本。 `rte_rcu_qsbr_thread_online()`函数的简化版本如下: ```c static __rte_always_inline void rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id) { uint64_t t; t = __atomic_load_n(&v->token, __ATOMIC_RELAXED); // 获得全局版本号 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, // 更新本线程的局部静默期版本号 t, __ATOMIC_RELAXED); } ``` 线程的下线就是通过`rte_rcu_qsbr_thread_offline()`函数,将局部静默期版本号设置为0。 ```c __rte_experimental static __rte_always_inline void rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id) { __atomic_store_n(&v->qsbr_cnt[thread_id].cnt, 0, __ATOMIC_RELEASE); } ``` # 等待静默 通过`rte_rcu_qsbr_synchronize()`函数等待所有线程进入过静默期,其主要工作如下: - 首先,对全局的静默期的版本加1; - 然后,判断本线程局部静默期版本是否等于全局的,若不等于,则更新到最新; - 最后,遍历所有注册了的并且在线的线程的静默期版本号`cnt`的值,确定是否所有线程都进入过本次静默期,若没有,则等待所有读线程都进入过静默状态。 ```c void rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id) { uint64_t t; t = rte_rcu_qsbr_start(v); // 将 v->token 加1,并存储在局部变量中 /* 若当前线程还在临界区,更新其静默状态 */ if (thread_id != RTE_QSBR_THRID_INVALID) // 0xffffffff rte_rcu_qsbr_quiescent(v, thread_id); // 更新本线程的 v->qsbr_cnt[thread_id].cnt 到最新token /* 等待其他读者进入静默期 */ rte_rcu_qsbr_check(v, t, true); } ``` **注意:** 线程每调用一次`rte_rcu_qsbr_synchronize()`函数,全局的静默期版本号token就会加1。 因为多个线程同时调用此函数,线程的局部静默期版本号cnt一般会小于全局好几个版本。 事实上,若线程调用了一次`rte_rcu_qsbr_synchronize()`,其版本号就会大于存储在其他线程局部变量`t`中的全局版本号。 具体是通过`rte_rcu_qsbr_check()`判断所有线程是否都进行了本次静默。 ```c __rte_experimental static __rte_always_inline int rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) { /* 判断是否所有线程都进入过静默期 */ if (likely(t <= v->acked_token)) return 1; /* 若没有确认过,则遍历线程确认。 */ if (likely(v->num_threads == v->max_threads)) return __rte_rcu_qsbr_check_all(v, t, wait); else return __rte_rcu_qsbr_check_selective(v, t, wait); } ``` 其中,`__rte_rcu_qsbr_check_all()`函数与`__rte_rcu_qsbr_check_selective()`函数类似, 都是通过遍历注册在thread_id array中的所有线程的cnt,判断是否所有线程进入过静默期。下面,以函数`__rte_rcu_qsbr_check_all()`进行说明。 ```c static __rte_always_inline int __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) { uint32_t i, j, id; uint64_t bmap; uint64_t c; uint64_t *reg_thread_id; uint64_t acked_token = __RTE_QSBR_CNT_MAX; // ((uint64_t)~0) /* 遍历注册在thread_id array中的所有线程的版本,等待所有线程进入过静默期 */ for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0); // 获得第0个 thread_id array元素 i < v->num_elems; // thread_id array 元素个数 i++, reg_thread_id++) { /* 获得bmap所标识的所有线程id的公共前缀 */ bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE); id = i << __RTE_QSBR_THRID_INDEX_SHIFT; // while (bmap) { /* 获得线程的id,以及对应的计数器 */ j = __builtin_ctzl(bmap); // bmap中的第一个注册线程 c = __atomic_load_n( // 获得线程id的cnt &v->qsbr_cnt[id + j].cnt, // id + j = thread_id __ATOMIC_ACQUIRE); /* 若线程没有下线,并且静默期号小于t,则等待,直到其大于等于 */ if (unlikely(c != __RTE_QSBR_CNT_THR_OFFLINE && c < t)) { /* This thread is not in quiescent state */ if (!wait) // 若不等待则直接返回 return 0; rte_pause(); // 暂定CPU执行一小段时间 bmap = __atomic_load_n(reg_thread_id, // 重新查看未退出注册的线程,是否进入静默期 __ATOMIC_ACQUIRE); continue; } /* 更新acked_token到最新版本 */ if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c) acked_token = c; bmap &= ~(1UL << j); } } if (acked_token != __RTE_QSBR_CNT_MAX) __atomic_store_n(&v->acked_token, acked_token, // 若所有的读者都已经进入过静默期,则将最新的静默期版本更新 __ATOMIC_RELAXED); return 1; } ``` **示例:** 在`dpdk/app/test/test_rcu_qsbr.c`中, # 附录 1. `type __atomic_load_n (type *ptr, int memorder)`,GCC内建函数,实现原子的加载操作,返回`*ptr` 有限的 `memorder`有:`__ATOMIC_RELAXED, __ATOMIC_SEQ_CST, __ATOMIC_ACQUIRE, __ATOMIC_CONSUME` 目前最新版本的gcc、clang的原子操作实现均符合c++11定义的原子操作6种内存模型: `__ATOMIC_RELAXED` No barriers or synchronization. `__ATOMIC_CONSUME` Data dependency only for both barrier and synchronization with another thread. `__ATOMIC_ACQUIRE` Barrier to hoisting of code and synchronizes with release (or stronger) semantic stores from another thread. `__ATOMIC_RELEASE` Barrier to sinking of code and synchronizes with acquire (or stronger) semantic loads from another thread. `__ATOMIC_ACQ_REL` Full barrier in both directions and synchronizes with acquire loads and release stores in another thread. `__ATOMIC_SEQ_CST` Full barrier in both directions and synchronizes with acquire loads and release stores in all threads. 详见 http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync 2. `void __atomic_store_n (type *ptr, type val, int memorder)`,GCC内建函数,实现原子的存操作,将val的值写入*ptr。 3. `__builtin_ctz(x):` 计算器`x`二进制表示,末尾有多少个0。 例如,a = 16,其二进制表示是 00000000 00000000 00000000 00010000,输出为ctz = 4 类似的函数有`__builtin_ctzl(x)`与`__builtin_ctzll(x)`,分别用于long类型,与long long类型的数据。 4. `static void rte_pause(void)`: 暂停CPU执行一段时间, 此调用用于轮询共享资源或等待事件的紧循环。在回路中短暂的停顿可以降低功耗。 [原文阅读]( http://www.nfvschool.cn/?p=783) # 参考 - [gcc-docs](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html)

加载全部内容

相关教程
猜你喜欢
用户评论