|
FD.io VPP
v21.10.1-2-g0a485f517
Vector Packet Processing
|
Go to the documentation of this file.
20 #include <sys/eventfd.h>
21 #include <sys/socket.h>
38 ASSERT (elt_index < ring->nitems);
45 pthread_mutexattr_t attr;
46 pthread_condattr_t cattr;
51 if (pthread_mutexattr_init (&attr))
53 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
55 if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
57 if (pthread_mutex_init (&sq->
mutex, &attr))
59 if (pthread_mutexattr_destroy (&attr))
61 if (pthread_condattr_init (&cattr))
63 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
65 if (pthread_cond_init (&sq->
condvar, &cattr))
67 if (pthread_condattr_destroy (&cattr))
88 ring = (
void *) ((
u8 *) smq->
q + q_sz);
95 ring = (
void *) ((
u8 *) ring +
offset);
107 uword rings_sz = 0, mq_sz;
157 ring = (
void *) ((
u8 *) smq->
q + q_sz);
158 for (
i = 0;
i < n_rings;
i++)
164 ring = (
void *) ((
u8 *) ring +
offset);
201 (void) pthread_cond_broadcast (&mq->
q.
shr->
condvar);
204 pthread_mutex_unlock (&mq->
q.
shr->
mutex);
208 int __clib_unused
rv;
309 clib_warning (
"message out of order: elt %u head %u ring %u",
325 u32 dist1, dist2, tail, head;
342 return (dist1 < dist2);
435 if (sq->
head + to_deq < sq->maxsize)
445 sq->
elsize * (to_deq - first_batch));
494 if ((fd = eventfd (0, 0)) < 0)
520 pthread_mutex_unlock (&mq->
q.
shr->
mutex);
581 rv = pthread_mutex_lock (&sq->
mutex);
584 rv = pthread_mutex_consistent (&sq->
mutex);
590 if (sz != 0 && sz != sq->
maxsize)
592 pthread_mutex_unlock (&sq->
mutex);
597 ts.tv_nsec = (timeout - (
u32) timeout) * 1e9;
600 pthread_mutex_unlock (&sq->
mutex);
609 tv.tv_sec = (
u64) timeout;
610 tv.tv_usec = ((
u64) timeout - (
u64) timeout) * 1e9;
611 rv = setsockopt (mq->
q.
evtfd, SOL_SOCKET, SO_RCVTIMEO,
612 (
const char *) &tv,
sizeof tv);
623 return rv < 0 ? errno : 0;
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
static void clib_spinlock_init(clib_spinlock_t *p)
u32 n_rings
number of msg rings
static f64 unix_time_now(void)
int svm_msg_q_sub_raw_batch(svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf, u32 n_msgs)
Consumer dequeue multiple messages from queue.
u32 nitems
max size of the ring
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
svm_msg_q_shared_t * svm_msg_q_init(void *base, svm_msg_q_cfg_t *cfg)
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static void clib_mem_free(void *p)
volatile u32 tail
current tail (for enqueue)
u32 q_nitems
msg queue size (not rings)
@ SVM_Q_NOWAIT
non-blocking call - works with both condvar and eventfd signaling
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
u32 elt_index
index in ring
Unidirectional shared-memory multi-ring message queue.
int svm_msg_q_alloc_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue.
svm_msg_q_shared_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
static_always_inline void * clib_memcpy_fast(void *restrict dst, const void *restrict src, size_t n)
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
#define clib_unix_warning(format, args...)
svm_msg_q_shared_queue_t q[0]
queue for exchanging messages
u8 * format_svm_msg_q(u8 *s, va_list *args)
Format message queue, shows msg count for each ring.
int svm_msg_q_wait_prod(svm_msg_q_t *mq)
Wait for message queue event as producer.
static u8 svm_msg_q_is_empty(svm_msg_q_t *mq)
Check if message queue is empty.
static void svm_msg_q_init_mutex(svm_msg_q_shared_queue_t *sq)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
struct svm_msg_q_shared_ svm_msg_q_shared_t
void svm_msg_q_cleanup(svm_msg_q_t *mq)
Cleanup mq's private data.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
struct svm_msg_q_shr_queue_ svm_msg_q_shared_queue_t
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
u8 data[0]
chunk of memory for msg data
struct clib_bihash_value offset
template key/value backing page structure
svm_msg_q_shared_queue_t * shr
pointer to shared queue
static void svm_msg_q_add_raw(svm_msg_q_t *mq, u8 *elem)
u32 ring_index
ring index, could be u8
struct svm_msg_q_ring_shared_ svm_msg_q_ring_shared_t
int evtfd
producer/consumer eventfd
static void clib_spinlock_free(clib_spinlock_t *p)
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
u32 n_rings
number of rings after q
u32 elsize
size of an element
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment)
void svm_msg_q_attach(svm_msg_q_t *mq, void *smq_base)
u32 nitems
max size of the ring
#define clib_atomic_fetch_add_rel(a, b)
#define CLIB_CACHE_LINE_BYTES
svm_msg_q_ring_shared_t * shr
ring in shared memory
static void svm_msg_q_send_signal(svm_msg_q_t *mq, u8 is_consumer)
uword svm_msg_q_size_to_alloc(svm_msg_q_cfg_t *cfg)
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
#define clib_atomic_fetch_add_relax(a, b)
#define vec_free(V)
Free vector's memory (no header).
void svm_msg_q_set_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue.
int svm_msg_q_timedwait(svm_msg_q_t *mq, double timeout)
Timed wait for message queue event.
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
template key/value backing page structure
@ SVM_Q_TIMEDWAIT
blocking call, returns on signal or time-out - best used in combination with condvars,...
description fragment has unexpected format
#define clib_atomic_fetch_sub_relax(a, b)
volatile u32 cursize
current size of the ring
#define clib_atomic_load_relax_n(a)
#define vec_foreach(var, vec)
Vector iterator.
for(i=1;i<=collision_buckets;i++)
u32 elsize
size of an element
svm_msg_q_ring_t * rings
rings with message data
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
#define clib_warning(format, args...)
static void * clib_mem_alloc_aligned(uword size, uword align)
clib_spinlock_t lock
private lock for multi-producer
enum svm_msg_q_wait_type_ svm_msg_q_wait_type_t
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_msg_q_sub_raw(svm_msg_q_t *mq, svm_msg_q_msg_t *elem)
Consumer dequeue one message from queue.
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
svm_msg_q_queue_t q
queue for exchanging messages
int svm_msg_q_wait(svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
Wait for message queue event.
vl_api_fib_path_type_t type
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
volatile u32 head
current head (for dequeue)
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.