36 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
44 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
50 if (pthread_mutexattr_init (&attr))
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
54 if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
56 if (pthread_mutex_init (&q->mutex, &attr))
58 if (pthread_mutexattr_destroy (&attr))
60 if (pthread_condattr_init (&cattr))
63 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
65 if (pthread_cond_init (&q->condvar, &cattr))
67 if (pthread_condattr_destroy (&cattr))
82 q->consumer_pid = consumer_pid;
93 (void) pthread_mutex_destroy (&q->mutex);
94 (void) pthread_cond_destroy (&q->condvar);
101 int rv = pthread_mutex_lock (&q->mutex);
103 pthread_mutex_consistent (&q->mutex);
109 int rv = pthread_mutex_trylock (&q->mutex);
111 rv = pthread_mutex_consistent (&q->mutex);
118 pthread_mutex_unlock (&q->mutex);
124 return q->cursize == q->maxsize;
130 if (q->producer_evtfd == -1)
132 (void) pthread_cond_broadcast (&q->condvar);
136 int __clib_unused rv, fd;
138 ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
139 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
140 rv = write (fd, &data,
sizeof (data));
155 if (q->producer_evtfd == -1)
157 pthread_cond_wait (&q->condvar, &q->mutex);
163 u32 cursize = q->cursize;
165 while (q->cursize == cursize)
182 ts.tv_nsec = (timeout - (
u32) timeout) * 1e9;
184 if (q->producer_evtfd == -1)
186 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
191 u32 cursize = q->cursize;
216 int need_broadcast = 0;
220 while (q->cursize == q->maxsize)
224 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
230 need_broadcast = (q->cursize == 1);
232 if (q->tail == q->maxsize)
245 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
248 q->tail = (q->tail + 1) % q->maxsize;
263 int need_broadcast = 0;
283 while (q->cursize == q->maxsize)
287 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
293 need_broadcast = (q->cursize == 1);
295 if (q->tail == q->maxsize)
313 int need_broadcast = 0;
333 while (q->cursize + 1 == q->maxsize)
337 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
343 if (q->tail == q->maxsize)
346 need_broadcast = (q->cursize == 1);
348 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
354 if (q->tail == q->maxsize)
373 int need_broadcast = 0;
396 while (q->cursize == 0 && rc == 0)
407 while (q->cursize == 0)
412 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
417 if (q->cursize == q->maxsize)
422 if (q->head == q->maxsize)
446 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
450 need_broadcast = (q->cursize == q->maxsize / 2);
471 while (q->cursize == 0)
475 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
478 need_broadcast = q->cursize == q->maxsize;
480 q->head = (q->head + 1) % q->maxsize;
492 q->producer_evtfd = fd;
498 q->consumer_evtfd = fd;
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
int svm_queue_is_full(svm_queue_t *q)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Optimized string handling code, including c11-compliant "safe C library" variants.
#define clib_memcpy_fast(a, b, c)
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer's event fd.
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
void svm_queue_unlock(svm_queue_t *q)
blocking call, returns on signal or time-out - best used in combination with condvars, with eventfds we don't yield the cpu
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
static f64 unix_time_now(void)
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
void svm_queue_lock(svm_queue_t *q)
static void svm_queue_send_signal_inline(svm_queue_t *q, u8 is_prod)
static void svm_queue_wait_inline(svm_queue_t *q)
void svm_queue_free(svm_queue_t *q)
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer's event fd.
static int svm_queue_trylock(svm_queue_t *q)
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
static void clib_mem_free(void *p)
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
#define clib_unix_warning(format, args...)
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
#define CLIB_CACHE_LINE_BYTES
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
non-blocking call - works with both condvar and eventfd signaling