FD.io VPP  v21.10.1-2-g0a485f517
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 #include <vppinfra/format.h>
19 #include <vppinfra/time.h>
20 #include <sys/eventfd.h>
21 #include <sys/socket.h>
22 
23 static inline svm_msg_q_ring_t *
25 {
26  return vec_elt_at_index (mq->rings, ring_index);
27 }
28 
30 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
31 {
32  return svm_msg_q_ring_inline (mq, ring_index);
33 }
34 
35 static inline void *
37 {
38  ASSERT (elt_index < ring->nitems);
39  return (ring->shr->data + elt_index * ring->elsize);
40 }
41 
42 static void
44 {
45  pthread_mutexattr_t attr;
46  pthread_condattr_t cattr;
47 
48  clib_memset (&attr, 0, sizeof (attr));
49  clib_memset (&cattr, 0, sizeof (cattr));
50 
51  if (pthread_mutexattr_init (&attr))
52  clib_unix_warning ("mutexattr_init");
53  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
54  clib_unix_warning ("pthread_mutexattr_setpshared");
55  if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
56  clib_unix_warning ("setrobust");
57  if (pthread_mutex_init (&sq->mutex, &attr))
58  clib_unix_warning ("mutex_init");
59  if (pthread_mutexattr_destroy (&attr))
60  clib_unix_warning ("mutexattr_destroy");
61  if (pthread_condattr_init (&cattr))
62  clib_unix_warning ("condattr_init");
63  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64  clib_unix_warning ("condattr_setpshared");
65  if (pthread_cond_init (&sq->condvar, &cattr))
66  clib_unix_warning ("cond_init1");
67  if (pthread_condattr_destroy (&cattr))
68  clib_unix_warning ("cond_init2");
69 }
70 
72 svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
73 {
76  svm_msg_q_shared_t *smq;
77  u32 q_sz, offset;
78  int i;
79 
80  q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
81 
82  smq = (svm_msg_q_shared_t *) base;
83  sq = smq->q;
84  clib_memset (sq, 0, sizeof (*sq));
85  sq->elsize = sizeof (svm_msg_q_msg_t);
86  sq->maxsize = cfg->q_nitems;
87  smq->n_rings = cfg->n_rings;
88  ring = (void *) ((u8 *) smq->q + q_sz);
89  for (i = 0; i < cfg->n_rings; i++)
90  {
91  ring->elsize = cfg->ring_cfgs[i].elsize;
92  ring->nitems = cfg->ring_cfgs[i].nitems;
93  ring->cursize = ring->head = ring->tail = 0;
94  offset = sizeof (*ring) + ring->nitems * ring->elsize;
95  ring = (void *) ((u8 *) ring + offset);
96  }
97 
99 
100  return smq;
101 }
102 
103 uword
105 {
106  svm_msg_q_ring_cfg_t *ring_cfg;
107  uword rings_sz = 0, mq_sz;
108  u32 q_sz;
109  int i;
110 
111  ASSERT (cfg);
112 
113  rings_sz = sizeof (svm_msg_q_ring_shared_t) * cfg->n_rings;
114  for (i = 0; i < cfg->n_rings; i++)
115  {
116  if (cfg->ring_cfgs[i].data)
117  continue;
118  ring_cfg = &cfg->ring_cfgs[i];
119  rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
120  }
121 
122  q_sz = sizeof (svm_msg_q_shared_queue_t) +
123  cfg->q_nitems * sizeof (svm_msg_q_msg_t);
124  mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
125 
126  return mq_sz;
127 }
128 
131 {
132  uword mq_sz;
133  u8 *base;
134 
135  mq_sz = svm_msg_q_size_to_alloc (cfg);
137  if (!base)
138  return 0;
139 
140  return svm_msg_q_init (base, cfg);
141 }
142 
143 void
144 svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
145 {
147  svm_msg_q_shared_t *smq;
148  u32 i, n_rings, q_sz, offset;
149 
150  smq = (svm_msg_q_shared_t *) smq_base;
151  mq->q.shr = smq->q;
152  mq->q.evtfd = -1;
153  n_rings = smq->n_rings;
154  vec_validate (mq->rings, n_rings - 1);
155  q_sz = sizeof (svm_msg_q_shared_queue_t) +
156  mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
157  ring = (void *) ((u8 *) smq->q + q_sz);
158  for (i = 0; i < n_rings; i++)
159  {
160  mq->rings[i].nitems = ring->nitems;
161  mq->rings[i].elsize = ring->elsize;
162  mq->rings[i].shr = ring;
163  offset = sizeof (*ring) + ring->nitems * ring->elsize;
164  ring = (void *) ((u8 *) ring + offset);
165  }
166  clib_spinlock_init (&mq->q.lock);
167 }
168 
169 void
171 {
172  vec_free (mq->rings);
173  clib_spinlock_free (&mq->q.lock);
174  if (mq->q.evtfd != -1)
175  close (mq->q.evtfd);
176 }
177 
178 void
180 {
181  svm_msg_q_cleanup (mq);
182  clib_mem_free (mq->q.shr);
183  clib_mem_free (mq);
184 }
185 
186 static void
188 {
189  if (mq->q.evtfd == -1)
190  {
191  if (is_consumer)
192  {
193  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
194  if (PREDICT_FALSE (rv == EOWNERDEAD))
195  {
196  rv = pthread_mutex_consistent (&mq->q.shr->mutex);
197  return;
198  }
199  }
200 
201  (void) pthread_cond_broadcast (&mq->q.shr->condvar);
202 
203  if (is_consumer)
204  pthread_mutex_unlock (&mq->q.shr->mutex);
205  }
206  else
207  {
208  int __clib_unused rv;
209  u64 data = 1;
210 
211  if (mq->q.evtfd < 0)
212  return;
213 
214  rv = write (mq->q.evtfd, &data, sizeof (data));
215  if (PREDICT_FALSE (rv < 0))
216  clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
217  }
218 }
219 
222 {
224  svm_msg_q_ring_t *ring;
225  svm_msg_q_msg_t msg;
226 
227  ring = svm_msg_q_ring_inline (mq, ring_index);
228  sr = ring->shr;
229 
230  ASSERT (sr->cursize < ring->nitems);
231  msg.ring_index = ring - mq->rings;
232  msg.elt_index = sr->tail;
233  sr->tail = (sr->tail + 1) % ring->nitems;
235  return msg;
236 }
237 
238 int
240  u8 noblock, svm_msg_q_msg_t * msg)
241 {
242  if (noblock)
243  {
244  if (svm_msg_q_try_lock (mq))
245  return -1;
247  || svm_msg_q_ring_is_full (mq, ring_index)))
248  {
249  svm_msg_q_unlock (mq);
250  return -2;
251  }
252  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
253  }
254  else
255  {
256  svm_msg_q_lock (mq);
257  while (svm_msg_q_is_full (mq)
258  || svm_msg_q_ring_is_full (mq, ring_index))
259  svm_msg_q_wait_prod (mq);
260  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
261  }
262  return 0;
263 }
264 
267 {
268  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
270  svm_msg_q_ring_t *ring;
271 
272  vec_foreach (ring, mq->rings)
273  {
274  sr = ring->shr;
275  if (ring->elsize < nbytes || sr->cursize == ring->nitems)
276  continue;
277  msg.ring_index = ring - mq->rings;
278  msg.elt_index = sr->tail;
279  sr->tail = (sr->tail + 1) % ring->nitems;
281  break;
282  }
283  return msg;
284 }
285 
286 void *
288 {
290  return svm_msg_q_ring_data (ring, msg->elt_index);
291 }
292 
293 void
295 {
297  svm_msg_q_ring_t *ring;
298  u32 need_signal;
299 
300  ASSERT (vec_len (mq->rings) > msg->ring_index);
301  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
302  sr = ring->shr;
303  if (msg->elt_index == sr->head)
304  {
305  sr->head = (sr->head + 1) % ring->nitems;
306  }
307  else
308  {
309  clib_warning ("message out of order: elt %u head %u ring %u",
310  msg->elt_index, sr->head, msg->ring_index);
311  /* for now, expect messages to be processed in order */
312  ASSERT (0);
313  }
314 
315  need_signal = clib_atomic_load_relax_n (&sr->cursize) == ring->nitems;
317 
318  if (PREDICT_FALSE (need_signal))
319  svm_msg_q_send_signal (mq, 1 /* is consumer */);
320 }
321 
322 static int
324 {
325  u32 dist1, dist2, tail, head;
327  svm_msg_q_ring_t *ring;
328 
329  if (vec_len (mq->rings) <= msg->ring_index)
330  return 0;
331 
332  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
333  sr = ring->shr;
334  tail = sr->tail;
335  head = sr->head;
336 
337  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
338  if (tail == head)
339  dist2 = (sr->cursize == 0) ? 0 : ring->nitems;
340  else
341  dist2 = ((ring->nitems + tail) - head) % ring->nitems;
342  return (dist1 < dist2);
343 }
344 
345 static void
347 {
348  svm_msg_q_shared_queue_t *sq = mq->q.shr;
349  i8 *tailp;
350  u32 sz;
351 
352  tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
353  clib_memcpy_fast (tailp, elem, sq->elsize);
354 
355  sq->tail = (sq->tail + 1) % sq->maxsize;
356 
357  sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
358  if (!sz)
359  svm_msg_q_send_signal (mq, 0 /* is consumer */);
360 }
361 
362 int
363 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
364 {
365  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
366 
367  if (nowait)
368  {
369  /* zero on success */
370  if (svm_msg_q_try_lock (mq))
371  {
372  return (-1);
373  }
374  }
375  else
376  svm_msg_q_lock (mq);
377 
378  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
379  {
380  if (nowait)
381  return (-2);
382  while (svm_msg_q_is_full (mq))
383  svm_msg_q_wait_prod (mq);
384  }
385 
386  svm_msg_q_add_raw (mq, (u8 *) msg);
387 
388  svm_msg_q_unlock (mq);
389 
390  return 0;
391 }
392 
393 void
395 {
396  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
397  svm_msg_q_add_raw (mq, (u8 *) msg);
398  svm_msg_q_unlock (mq);
399 }
400 
401 int
403 {
404  svm_msg_q_shared_queue_t *sq = mq->q.shr;
405  i8 *headp;
406  u32 sz;
407 
408  ASSERT (!svm_msg_q_is_empty (mq));
409 
410  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
411  clib_memcpy_fast (elem, headp, sq->elsize);
412 
413  sq->head = (sq->head + 1) % sq->maxsize;
414 
415  sz = clib_atomic_fetch_sub_relax (&sq->cursize, 1);
416  if (PREDICT_FALSE (sz == sq->maxsize))
417  svm_msg_q_send_signal (mq, 1 /* is consumer */);
418 
419  return 0;
420 }
421 
422 int
424 {
425  svm_msg_q_shared_queue_t *sq = mq->q.shr;
426  u32 sz, to_deq;
427  i8 *headp;
428 
429  sz = svm_msg_q_size (mq);
430  ASSERT (sz);
431  to_deq = clib_min (sz, n_msgs);
432 
433  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
434 
435  if (sq->head + to_deq < sq->maxsize)
436  {
437  clib_memcpy_fast (msg_buf, headp, sq->elsize * to_deq);
438  sq->head += to_deq;
439  }
440  else
441  {
442  u32 first_batch = sq->maxsize - sq->head;
443  clib_memcpy_fast (msg_buf, headp, sq->elsize * first_batch);
444  clib_memcpy_fast (msg_buf + first_batch, sq->data,
445  sq->elsize * (to_deq - first_batch));
446  sq->head = (sq->head + to_deq) % sq->maxsize;
447  }
448 
449  clib_atomic_fetch_sub_relax (&sq->cursize, to_deq);
450  if (PREDICT_FALSE (sz == sq->maxsize))
451  svm_msg_q_send_signal (mq, 1 /* is consumer */);
452 
453  return to_deq;
454 }
455 
456 int
458  svm_q_conditional_wait_t cond, u32 time)
459 {
460  int rc = 0;
461 
462  if (svm_msg_q_is_empty (mq))
463  {
464  if (cond == SVM_Q_NOWAIT)
465  {
466  return (-2);
467  }
468  else if (cond == SVM_Q_TIMEDWAIT)
469  {
470  if ((rc = svm_msg_q_timedwait (mq, time)))
471  return rc;
472  }
473  else
474  {
476  }
477  }
478 
479  svm_msg_q_sub_raw (mq, msg);
480 
481  return 0;
482 }
483 
484 void
486 {
487  mq->q.evtfd = fd;
488 }
489 
490 int
492 {
493  int fd;
494  if ((fd = eventfd (0, 0)) < 0)
495  return -1;
496  svm_msg_q_set_eventfd (mq, fd);
497  return 0;
498 }
499 
500 int
502 {
503  u8 (*fn) (svm_msg_q_t *);
504  int rv;
505 
507 
508  if (mq->q.evtfd == -1)
509  {
510  rv = pthread_mutex_lock (&mq->q.shr->mutex);
511  if (PREDICT_FALSE (rv == EOWNERDEAD))
512  {
513  rv = pthread_mutex_consistent (&mq->q.shr->mutex);
514  return rv;
515  }
516 
517  while (fn (mq))
518  pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
519 
520  pthread_mutex_unlock (&mq->q.shr->mutex);
521  }
522  else
523  {
524  u64 buf;
525 
526  while (fn (mq))
527  {
528  while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
529  {
530  if (errno != EAGAIN)
531  {
532  clib_unix_warning ("read error");
533  return rv;
534  }
535  }
536  }
537  }
538 
539  return 0;
540 }
541 
542 int
544 {
545  if (mq->q.evtfd == -1)
546  {
547  while (svm_msg_q_is_full (mq))
548  pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
549  }
550  else
551  {
552  u64 buf;
553  int rv;
554 
555  while (svm_msg_q_is_full (mq))
556  {
557  while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
558  {
559  if (errno != EAGAIN)
560  {
561  clib_unix_warning ("read error");
562  return rv;
563  }
564  }
565  }
566  }
567 
568  return 0;
569 }
570 
571 int
572 svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
573 {
574  if (mq->q.evtfd == -1)
575  {
576  svm_msg_q_shared_queue_t *sq = mq->q.shr;
577  struct timespec ts;
578  u32 sz;
579  int rv;
580 
581  rv = pthread_mutex_lock (&sq->mutex);
582  if (PREDICT_FALSE (rv == EOWNERDEAD))
583  {
584  rv = pthread_mutex_consistent (&sq->mutex);
585  return rv;
586  }
587 
588  /* check if we're still in a signalable state after grabbing lock */
589  sz = svm_msg_q_size (mq);
590  if (sz != 0 && sz != sq->maxsize)
591  {
592  pthread_mutex_unlock (&sq->mutex);
593  return 0;
594  }
595 
596  ts.tv_sec = unix_time_now () + (u32) timeout;
597  ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
598  rv = pthread_cond_timedwait (&sq->condvar, &sq->mutex, &ts);
599 
600  pthread_mutex_unlock (&sq->mutex);
601  return rv;
602  }
603  else
604  {
605  struct timeval tv;
606  u64 buf;
607  int rv;
608 
609  tv.tv_sec = (u64) timeout;
610  tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
611  rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
612  (const char *) &tv, sizeof tv);
613  if (rv < 0)
614  {
615  clib_unix_warning ("setsockopt");
616  return -1;
617  }
618 
619  rv = read (mq->q.evtfd, &buf, sizeof (buf));
620  if (rv < 0)
621  clib_warning ("read %u", errno);
622 
623  return rv < 0 ? errno : 0;
624  }
625 }
626 
627 u8 *
628 format_svm_msg_q (u8 * s, va_list * args)
629 {
630  svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
631  s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
632  for (u32 i = 0; i < vec_len (mq->rings); i++)
633  {
634  s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
635  mq->rings[i].nitems);
636  }
637  return s;
638 }
639 
640 /*
641  * fd.io coding-style-patch-verification: ON
642  *
643  * Local Variables:
644  * eval: (c-set-style "gnu")
645  * End:
646  */
svm_msg_q_size
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
Definition: message_queue.h:310
clib_spinlock_init
static void clib_spinlock_init(clib_spinlock_t *p)
Definition: lock.h:65
svm_msg_q_shared_
Definition: message_queue.h:65
svm_msg_q_cfg_::n_rings
u32 n_rings
number of msg rings
Definition: message_queue.h:89
unix_time_now
static f64 unix_time_now(void)
Definition: time.h:255
svm_msg_q_shr_queue_::mutex
pthread_mutex_t mutex
Definition: message_queue.h:30
svm_msg_q_msg_t::as_u64
u64 as_u64
Definition: message_queue.h:100
svm_msg_q_ring_shared_
Definition: message_queue.h:48
svm_msg_q_sub_raw_batch
int svm_msg_q_sub_raw_batch(svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf, u32 n_msgs)
Consumer dequeue multiple messages from queue.
Definition: message_queue.c:423
svm_msg_q_ring_shared_::nitems
u32 nitems
max size of the ring
Definition: message_queue.h:51
svm_msg_q_sub
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
Definition: message_queue.c:457
svm_msg_q_add_and_unlock
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
Definition: message_queue.c:394
svm_msg_q_shr_queue_::elsize
u32 elsize
Definition: message_queue.h:36
svm_msg_q_alloc_msg_w_ring
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
Definition: message_queue.c:221
svm_msg_q_try_lock
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
Definition: message_queue.h:353
svm_msg_q_init
svm_msg_q_shared_t * svm_msg_q_init(void *base, svm_msg_q_cfg_t *cfg)
Definition: message_queue.c:72
svm_msg_q_ring
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:30
svm_msg_q_msg_data
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
Definition: message_queue.c:287
clib_mem_free
static void clib_mem_free(void *p)
Definition: mem.h:314
svm_msg_q_ring_cfg_::data
void * data
Definition: message_queue.h:82
svm_msg_q_ring_shared_::tail
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:53
u8
#define u8
Padding.
Definition: clib.h:121
svm_msg_q_cfg_::q_nitems
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:88
SVM_Q_NOWAIT
@ SVM_Q_NOWAIT
non-blocking call - works with both condvar and eventfd signaling
Definition: queue.h:44
svm_msg_q_lock
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
Definition: message_queue.h:372
svm_msg_q_msg_t::elt_index
u32 elt_index
index in ring
Definition: message_queue.h:98
svm_msg_q_msg_t
Definition: message_queue.h:93
message_queue.h
Unidirectional shared-memory multi-ring message queue.
svm_msg_q_alloc_eventfd
int svm_msg_q_alloc_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue.
Definition: message_queue.c:491
svm_msg_q_alloc
svm_msg_q_shared_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:130
svm_msg_q_ring_
Definition: message_queue.h:58
clib_memcpy_fast
static_always_inline void * clib_memcpy_fast(void *restrict dst, const void *restrict src, size_t n)
Definition: string.h:92
svm_msg_q_free
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:179
clib_unix_warning
#define clib_unix_warning(format, args...)
Definition: error.h:68
svm_msg_q_shared_::q
svm_msg_q_shared_queue_t q[0]
queue for exchanging messages
Definition: message_queue.h:69
svm_msg_q_ring_cfg_
Definition: message_queue.h:78
format_svm_msg_q
u8 * format_svm_msg_q(u8 *s, va_list *args)
Format message queue, shows msg count for each ring.
Definition: message_queue.c:628
svm_msg_q_wait_prod
int svm_msg_q_wait_prod(svm_msg_q_t *mq)
Wait for message queue event as producer.
Definition: message_queue.c:543
svm_msg_q_is_empty
static u8 svm_msg_q_is_empty(svm_msg_q_t *mq)
Check if message queue is empty.
Definition: message_queue.h:335
svm_msg_q_init_mutex
static void svm_msg_q_init_mutex(svm_msg_q_shared_queue_t *sq)
Definition: message_queue.c:43
svm_msg_q_shr_queue_::data
u8 data[0]
Definition: message_queue.h:38
vec_len
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
Definition: vec_bootstrap.h:142
svm_msg_q_shr_queue_::tail
u32 tail
Definition: message_queue.h:33
svm_msg_q_shared_t
struct svm_msg_q_shared_ svm_msg_q_shared_t
svm_msg_q_cleanup
void svm_msg_q_cleanup(svm_msg_q_t *mq)
Cleanup mq's private data.
Definition: message_queue.c:170
svm_msg_q_is_full
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
Definition: message_queue.h:319
svm_msg_q_shr_queue_::head
u32 head
Definition: message_queue.h:32
svm_msg_q_shr_queue_::maxsize
u32 maxsize
Definition: message_queue.h:35
svm_msg_q_shr_queue_::condvar
pthread_cond_t condvar
Definition: message_queue.h:31
svm_msg_q_shared_queue_t
struct svm_msg_q_shr_queue_ svm_msg_q_shared_queue_t
svm_msg_q_ring_inline
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:24
vec_elt_at_index
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
Definition: vec_bootstrap.h:203
svm_msg_q_ring_shared_::data
u8 data[0]
chunk of memory for msg data
Definition: message_queue.h:55
offset
struct clib_bihash_value offset
template key/value backing page structure
svm_msg_q_queue_::shr
svm_msg_q_shared_queue_t * shr
pointer to shared queue
Definition: message_queue.h:43
PREDICT_FALSE
#define PREDICT_FALSE(x)
Definition: clib.h:124
svm_msg_q_add_raw
static void svm_msg_q_add_raw(svm_msg_q_t *mq, u8 *elem)
Definition: message_queue.c:346
svm_msg_q_msg_t::ring_index
u32 ring_index
ring index, could be u8
Definition: message_queue.h:97
svm_msg_q_ring_cfg_::nitems
u32 nitems
Definition: message_queue.h:80
uword
u64 uword
Definition: types.h:112
svm_msg_q_ring_shared_t
struct svm_msg_q_ring_shared_ svm_msg_q_ring_shared_t
svm_msg_q_queue_::evtfd
int evtfd
producer/consumer eventfd
Definition: message_queue.h:44
time.h
clib_spinlock_free
static void clib_spinlock_free(clib_spinlock_t *p)
Definition: lock.h:72
svm_msg_q_cfg_::ring_cfgs
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:90
n_msgs
n_msgs
Definition: application.c:506
svm_msg_q_shared_::n_rings
u32 n_rings
number of rings after q
Definition: message_queue.h:67
svm_msg_q_ring_shared_::elsize
u32 elsize
size of an element
Definition: message_queue.h:54
vec_validate
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment)
Definition: vec.h:523
format.h
svm_msg_q_attach
void svm_msg_q_attach(svm_msg_q_t *mq, void *smq_base)
Definition: message_queue.c:144
clib_min
#define clib_min(x, y)
Definition: clib.h:342
svm_msg_q_ring_::nitems
u32 nitems
max size of the ring
Definition: message_queue.h:60
clib_atomic_fetch_add_rel
#define clib_atomic_fetch_add_rel(a, b)
Definition: atomics.h:60
CLIB_CACHE_LINE_BYTES
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:58
svm_msg_q_
Definition: message_queue.h:72
svm_msg_q_ring_::shr
svm_msg_q_ring_shared_t * shr
ring in shared memory
Definition: message_queue.h:62
svm_msg_q_send_signal
static void svm_msg_q_send_signal(svm_msg_q_t *mq, u8 is_consumer)
Definition: message_queue.c:187
svm_msg_q_size_to_alloc
uword svm_msg_q_size_to_alloc(svm_msg_q_cfg_t *cfg)
Definition: message_queue.c:104
data
u8 data[128]
Definition: ipsec_types.api:95
svm_msg_q_ring_data
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:36
clib_atomic_fetch_add_relax
#define clib_atomic_fetch_add_relax(a, b)
Definition: atomics.h:63
vec_free
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:395
svm_msg_q_shr_queue_
Definition: message_queue.h:28
svm_msg_q_set_eventfd
void svm_msg_q_set_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue.
Definition: message_queue.c:485
svm_msg_q_ring_cfg_::elsize
u32 elsize
Definition: message_queue.h:81
svm_msg_q_timedwait
int svm_msg_q_timedwait(svm_msg_q_t *mq, double timeout)
Timed wait for message queue event.
Definition: message_queue.c:572
svm_msg_q_free_msg
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
Definition: message_queue.c:294
clib_bihash_value
template key/value backing page structure
Definition: bihash_doc.h:44
SVM_Q_TIMEDWAIT
@ SVM_Q_TIMEDWAIT
blocking call, returns on signal or time-out - best used in combination with condvars,...
Definition: queue.h:46
u64
unsigned long u64
Definition: types.h:89
i8
signed char i8
Definition: types.h:45
format
description fragment has unexpected format
Definition: map.api:433
ASSERT
#define ASSERT(truth)
Definition: error_bootstrap.h:69
clib_atomic_fetch_sub_relax
#define clib_atomic_fetch_sub_relax(a, b)
Definition: atomics.h:64
svm_msg_q_ring_shared_::cursize
volatile u32 cursize
current size of the ring
Definition: message_queue.h:50
buf
u64 buf
Definition: application.c:493
u32
unsigned int u32
Definition: types.h:88
clib_atomic_load_relax_n
#define clib_atomic_load_relax_n(a)
Definition: atomics.h:50
vec_foreach
#define vec_foreach(var, vec)
Vector iterator.
Definition: vec_bootstrap.h:213
for
for(i=1;i<=collision_buckets;i++)
Definition: flowhash_template.h:378
svm_msg_q_ring_::elsize
u32 elsize
size of an element
Definition: message_queue.h:61
svm_msg_q_::rings
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:75
svm_msg_q_cfg_
Definition: message_queue.h:85
clib_memset
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
svm_msg_q_lock_and_alloc_msg_w_ring
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
Definition: message_queue.c:239
svm_msg_q_shr_queue_::cursize
volatile u32 cursize
Definition: message_queue.h:34
u8
unsigned char u8
Definition: types.h:56
i
int i
Definition: flowhash_template.h:376
clib_warning
#define clib_warning(format, args...)
Definition: error.h:59
clib_mem_alloc_aligned
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:264
rv
int __clib_unused rv
Definition: application.c:491
svm_msg_q_queue_::lock
clib_spinlock_t lock
private lock for multi-producer
Definition: message_queue.h:45
mem.h
svm_msg_q_wait_type_t
enum svm_msg_q_wait_type_ svm_msg_q_wait_type_t
svm_msg_q_ring_is_full
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.h:325
svm_msg_q_sub_raw
int svm_msg_q_sub_raw(svm_msg_q_t *mq, svm_msg_q_msg_t *elem)
Consumer dequeue one message from queue.
Definition: message_queue.c:402
SVM_MQ_WAIT_EMPTY
@ SVM_MQ_WAIT_EMPTY
Definition: message_queue.h:107
svm_msg_q_alloc_msg
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
Definition: message_queue.c:266
svm_msg_q_::q
svm_msg_q_queue_t q
queue for exchanging messages
Definition: message_queue.h:74
svm_msg_q_wait
int svm_msg_q_wait(svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
Wait for message queue event.
Definition: message_queue.c:501
type
vl_api_fib_path_type_t type
Definition: fib_types.api:123
svm_msq_q_msg_is_valid
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Definition: message_queue.c:323
svm_q_conditional_wait_t
svm_q_conditional_wait_t
Definition: queue.h:40
svm_msg_q_add
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
Definition: message_queue.c:363
svm_msg_q_ring_shared_::head
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:52
svm_msg_q_unlock
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
Definition: message_queue.h:392