36 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
40 memset (q, 0,
sizeof (*q));
44 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
47 memset (&attr, 0,
sizeof (attr));
48 memset (&cattr, 0,
sizeof (cattr));
50 if (pthread_mutexattr_init (&attr))
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
54 if (pthread_mutex_init (&q->mutex, &attr))
56 if (pthread_mutexattr_destroy (&attr))
58 if (pthread_condattr_init (&cattr))
61 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
63 if (pthread_cond_init (&q->condvar, &cattr))
65 if (pthread_condattr_destroy (&cattr))
78 memset (q, 0,
sizeof (*q));
80 q->consumer_pid = consumer_pid;
91 (void) pthread_mutex_destroy (&q->mutex);
92 (void) pthread_cond_destroy (&q->condvar);
99 pthread_mutex_lock (&q->mutex);
105 pthread_mutex_unlock (&q->mutex);
111 return q->cursize == q->maxsize;
117 if (q->producer_evtfd == -1)
119 (void) pthread_cond_broadcast (&q->condvar);
123 int __clib_unused rv, fd;
125 ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127 rv = write (fd, &data,
sizeof (data));
134 if (q->producer_evtfd == -1)
136 pthread_cond_wait (&q->condvar, &q->mutex);
142 u32 cursize = q->cursize;
143 pthread_mutex_unlock (&q->mutex);
144 while (q->cursize == cursize)
146 pthread_mutex_lock (&q->mutex);
161 ts.tv_nsec = (timeout - (
u32) timeout) * 1e9;
163 if (q->producer_evtfd == -1)
165 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
170 u32 cursize = q->cursize;
173 pthread_mutex_unlock (&q->mutex);
177 pthread_mutex_lock (&q->mutex);
195 int need_broadcast = 0;
199 while (q->cursize == q->maxsize)
203 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
209 need_broadcast = (q->cursize == 1);
211 if (q->tail == q->maxsize)
224 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
227 q->tail = (q->tail + 1) % q->maxsize;
242 int need_broadcast = 0;
247 if (pthread_mutex_trylock (&q->mutex))
253 pthread_mutex_lock (&q->mutex);
259 pthread_mutex_unlock (&q->mutex);
262 while (q->cursize == q->maxsize)
266 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
272 need_broadcast = (q->cursize == 1);
274 if (q->tail == q->maxsize)
280 pthread_mutex_unlock (&q->mutex);
292 int need_broadcast = 0;
297 if (pthread_mutex_trylock (&q->mutex))
303 pthread_mutex_lock (&q->mutex);
309 pthread_mutex_unlock (&q->mutex);
312 while (q->cursize + 1 == q->maxsize)
316 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
322 if (q->tail == q->maxsize)
325 need_broadcast = (q->cursize == 1);
327 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
333 if (q->tail == q->maxsize)
339 pthread_mutex_unlock (&q->mutex);
352 int need_broadcast = 0;
358 if (pthread_mutex_trylock (&q->mutex))
364 pthread_mutex_lock (&q->mutex);
370 pthread_mutex_unlock (&q->mutex);
375 while (q->cursize == 0 && rc == 0)
380 pthread_mutex_unlock (&q->mutex);
386 while (q->cursize == 0)
391 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
396 if (q->cursize == q->maxsize)
401 if (q->head == q->maxsize)
407 pthread_mutex_unlock (&q->mutex);
418 pthread_mutex_lock (&q->mutex);
421 pthread_mutex_unlock (&q->mutex);
425 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
429 need_broadcast = (q->cursize == q->maxsize / 2);
434 pthread_mutex_unlock (&q->mutex);
449 while (q->cursize == 0)
453 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
456 q->head = (q->head + 1) % q->maxsize;
465 q->producer_evtfd = fd;
471 q->consumer_evtfd = fd;
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
int svm_queue_is_full(svm_queue_t *q)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
static void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer's event fd.
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
void svm_queue_unlock(svm_queue_t *q)
Blocking call; returns on signal or time-out. Best used in combination with condvars: with eventfds we don't yield the CPU.
memset(h->entries, 0, sizeof(h->entries[0])*entries)
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
static f64 unix_time_now(void)
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
void svm_queue_lock(svm_queue_t *q)
#define clib_memcpy(a, b, c)
static void svm_queue_wait_inline(svm_queue_t *q)
void svm_queue_free(svm_queue_t *q)
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer's event fd.
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
static void clib_mem_free(void *p)
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
#define clib_unix_warning(format, args...)
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
#define CLIB_CACHE_LINE_BYTES
non-blocking call - works with both condvar and eventfd signaling