36 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
44 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
50 if (pthread_mutexattr_init (&attr))
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
54 if (pthread_mutex_init (&q->mutex, &attr))
56 if (pthread_mutexattr_destroy (&attr))
58 if (pthread_condattr_init (&cattr))
61 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
63 if (pthread_cond_init (&q->condvar, &cattr))
65 if (pthread_condattr_destroy (&cattr))
80 q->consumer_pid = consumer_pid;
91 (void) pthread_mutex_destroy (&q->mutex);
92 (void) pthread_cond_destroy (&q->condvar);
99 pthread_mutex_lock (&q->mutex);
105 pthread_mutex_unlock (&q->mutex);
111 return q->cursize == q->maxsize;
117 if (q->producer_evtfd == -1)
119 (void) pthread_cond_broadcast (&q->condvar);
123 int __clib_unused rv, fd;
125 ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127 rv = write (fd, &data,
sizeof (data));
142 if (q->producer_evtfd == -1)
144 pthread_cond_wait (&q->condvar, &q->mutex);
150 u32 cursize = q->cursize;
152 while (q->cursize == cursize)
169 ts.tv_nsec = (timeout - (
u32) timeout) * 1e9;
171 if (q->producer_evtfd == -1)
173 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
178 u32 cursize = q->cursize;
203 int need_broadcast = 0;
207 while (q->cursize == q->maxsize)
211 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
217 need_broadcast = (q->cursize == 1);
219 if (q->tail == q->maxsize)
232 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
235 q->tail = (q->tail + 1) % q->maxsize;
250 int need_broadcast = 0;
255 if (pthread_mutex_trylock (&q->mutex))
270 while (q->cursize == q->maxsize)
274 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
280 need_broadcast = (q->cursize == 1);
282 if (q->tail == q->maxsize)
300 int need_broadcast = 0;
305 if (pthread_mutex_trylock (&q->mutex))
320 while (q->cursize + 1 == q->maxsize)
324 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
330 if (q->tail == q->maxsize)
333 need_broadcast = (q->cursize == 1);
335 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
341 if (q->tail == q->maxsize)
360 int need_broadcast = 0;
366 if (pthread_mutex_trylock (&q->mutex))
383 while (q->cursize == 0 && rc == 0)
394 while (q->cursize == 0)
399 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
404 if (q->cursize == q->maxsize)
409 if (q->head == q->maxsize)
433 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
437 need_broadcast = (q->cursize == q->maxsize / 2);
458 while (q->cursize == 0)
462 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
465 need_broadcast = q->cursize == q->maxsize;
467 q->head = (q->head + 1) % q->maxsize;
479 q->producer_evtfd = fd;
485 q->consumer_evtfd = fd;
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
int svm_queue_is_full(svm_queue_t *q)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Optimized string handling code, including c11-compliant "safe C library" variants.
#define clib_memcpy_fast(a, b, c)
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer's event fd.
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
void svm_queue_unlock(svm_queue_t *q)
blocking call, returns on signal or time-out - best used in combination with condvars, with eventfds we don't yield the cpu
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
static f64 unix_time_now(void)
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
void svm_queue_lock(svm_queue_t *q)
static void svm_queue_send_signal_inline(svm_queue_t *q, u8 is_prod)
static void svm_queue_wait_inline(svm_queue_t *q)
void svm_queue_free(svm_queue_t *q)
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer's event fd.
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
static void clib_mem_free(void *p)
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
#define clib_unix_warning(format, args...)
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
#define CLIB_CACHE_LINE_BYTES
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
non-blocking call - works with both condvar and eventfd signaling