FD.io VPP  v21.10.1-2-g0a485f517
Vector Packet Processing
queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31 
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35  svm_queue_t *q;
36  pthread_mutexattr_t attr;
37  pthread_condattr_t cattr;
38 
39  q = (svm_queue_t *) base;
40  clib_memset (q, 0, sizeof (*q));
41 
42  q->elsize = elsize;
43  q->maxsize = nels;
44  q->producer_evtfd = -1;
45  q->consumer_evtfd = -1;
46 
47  clib_memset (&attr, 0, sizeof (attr));
48  clib_memset (&cattr, 0, sizeof (cattr));
49 
50  if (pthread_mutexattr_init (&attr))
51  clib_unix_warning ("mutexattr_init");
52  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53  clib_unix_warning ("pthread_mutexattr_setpshared");
54  if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55  clib_unix_warning ("setrobust");
56  if (pthread_mutex_init (&q->mutex, &attr))
57  clib_unix_warning ("mutex_init");
58  if (pthread_mutexattr_destroy (&attr))
59  clib_unix_warning ("mutexattr_destroy");
60  if (pthread_condattr_init (&cattr))
61  clib_unix_warning ("condattr_init");
62  /* prints funny-looking messages in the Linux target */
63  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64  clib_unix_warning ("condattr_setpshared");
65  if (pthread_cond_init (&q->condvar, &cattr))
66  clib_unix_warning ("cond_init1");
67  if (pthread_condattr_destroy (&cattr))
68  clib_unix_warning ("cond_init2");
69 
70  return (q);
71 }
72 
74 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
75 {
76  svm_queue_t *q;
77 
79  + nels * elsize, CLIB_CACHE_LINE_BYTES);
80  clib_memset (q, 0, sizeof (*q));
81  q = svm_queue_init (q, nels, elsize);
82  q->consumer_pid = consumer_pid;
83 
84  return q;
85 }
86 
87 /*
88  * svm_queue_free
89  */
90 void
92 {
93  (void) pthread_mutex_destroy (&q->mutex);
94  (void) pthread_cond_destroy (&q->condvar);
95  clib_mem_free (q);
96 }
97 
98 void
100 {
101  int rv = pthread_mutex_lock (&q->mutex);
102  if (PREDICT_FALSE (rv == EOWNERDEAD))
103  pthread_mutex_consistent (&q->mutex);
104 }
105 
106 static int
108 {
109  int rv = pthread_mutex_trylock (&q->mutex);
110  if (PREDICT_FALSE (rv == EOWNERDEAD))
111  rv = pthread_mutex_consistent (&q->mutex);
112  return rv;
113 }
114 
115 void
117 {
118  pthread_mutex_unlock (&q->mutex);
119 }
120 
121 int
123 {
124  return q->cursize == q->maxsize;
125 }
126 
127 static inline void
129 {
130  if (q->producer_evtfd == -1)
131  {
132  (void) pthread_cond_broadcast (&q->condvar);
133  }
134  else
135  {
136  int __clib_unused rv, fd;
137  u64 data = 1;
138  ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
139  fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
140  rv = write (fd, &data, sizeof (data));
141  if (PREDICT_FALSE (rv < 0))
142  clib_unix_warning ("signal write on %d returned %d", fd, rv);
143  }
144 }
145 
146 void
148 {
149  svm_queue_send_signal_inline (q, is_prod);
150 }
151 
152 static inline void
154 {
155  if (q->producer_evtfd == -1)
156  {
157  pthread_cond_wait (&q->condvar, &q->mutex);
158  }
159  else
160  {
161  /* Fake a wait for event. We could use epoll but that would mean
162  * using yet another fd. Should do for now */
163  u32 cursize = q->cursize;
164  svm_queue_unlock (q);
165  while (q->cursize == cursize)
166  CLIB_PAUSE ();
167  svm_queue_lock (q);
168  }
169 }
170 
171 void
173 {
175 }
176 
177 static inline int
179 {
180  struct timespec ts;
181  ts.tv_sec = unix_time_now () + (u32) timeout;
182  ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
183 
184  if (q->producer_evtfd == -1)
185  {
186  return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
187  }
188  else
189  {
190  double max_time = unix_time_now () + timeout;
191  u32 cursize = q->cursize;
192  int rv;
193 
194  svm_queue_unlock (q);
195  while (q->cursize == cursize && unix_time_now () < max_time)
196  CLIB_PAUSE ();
197  rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
198  svm_queue_lock (q);
199  return rv;
200  }
201 }
202 
203 int
204 svm_queue_timedwait (svm_queue_t * q, double timeout)
205 {
206  return svm_queue_timedwait_inline (q, timeout);
207 }
208 
209 /*
210  * svm_queue_add_nolock
211  */
212 int
214 {
215  i8 *tailp;
216  int need_broadcast = 0;
217 
218  if (PREDICT_FALSE (q->cursize == q->maxsize))
219  {
220  while (q->cursize == q->maxsize)
222  }
223 
224  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225  clib_memcpy_fast (tailp, elem, q->elsize);
226 
227  q->tail++;
228  q->cursize++;
229 
230  need_broadcast = (q->cursize == 1);
231 
232  if (q->tail == q->maxsize)
233  q->tail = 0;
234 
235  if (need_broadcast)
237  return 0;
238 }
239 
240 void
242 {
243  i8 *tailp;
244 
245  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
246  clib_memcpy_fast (tailp, elem, q->elsize);
247 
248  q->tail = (q->tail + 1) % q->maxsize;
249  q->cursize++;
250 
251  if (q->cursize == 1)
253 }
254 
255 
256 /*
257  * svm_queue_add
258  */
259 int
260 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
261 {
262  i8 *tailp;
263  int need_broadcast = 0;
264 
265  if (nowait)
266  {
267  /* zero on success */
268  if (svm_queue_trylock (q))
269  {
270  return (-1);
271  }
272  }
273  else
274  svm_queue_lock (q);
275 
276  if (PREDICT_FALSE (q->cursize == q->maxsize))
277  {
278  if (nowait)
279  {
280  svm_queue_unlock (q);
281  return (-2);
282  }
283  while (q->cursize == q->maxsize)
285  }
286 
287  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
288  clib_memcpy_fast (tailp, elem, q->elsize);
289 
290  q->tail++;
291  q->cursize++;
292 
293  need_broadcast = (q->cursize == 1);
294 
295  if (q->tail == q->maxsize)
296  q->tail = 0;
297 
298  if (need_broadcast)
300 
301  svm_queue_unlock (q);
302 
303  return 0;
304 }
305 
306 /*
307  * svm_queue_add2
308  */
309 int
310 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
311 {
312  i8 *tailp;
313  int need_broadcast = 0;
314 
315  if (nowait)
316  {
317  /* zero on success */
318  if (svm_queue_trylock (q))
319  {
320  return (-1);
321  }
322  }
323  else
324  svm_queue_lock (q);
325 
326  if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
327  {
328  if (nowait)
329  {
330  svm_queue_unlock (q);
331  return (-2);
332  }
333  while (q->cursize + 1 == q->maxsize)
335  }
336 
337  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
338  clib_memcpy_fast (tailp, elem, q->elsize);
339 
340  q->tail++;
341  q->cursize++;
342 
343  if (q->tail == q->maxsize)
344  q->tail = 0;
345 
346  need_broadcast = (q->cursize == 1);
347 
348  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
349  clib_memcpy_fast (tailp, elem2, q->elsize);
350 
351  q->tail++;
352  q->cursize++;
353 
354  if (q->tail == q->maxsize)
355  q->tail = 0;
356 
357  if (need_broadcast)
359 
360  svm_queue_unlock (q);
361 
362  return 0;
363 }
364 
365 /*
366  * svm_queue_sub
367  */
368 int
370  u32 time)
371 {
372  i8 *headp;
373  int need_broadcast = 0;
374  int rc = 0;
375 
376  if (cond == SVM_Q_NOWAIT)
377  {
378  /* zero on success */
379  if (svm_queue_trylock (q))
380  {
381  return (-1);
382  }
383  }
384  else
385  svm_queue_lock (q);
386 
387  if (PREDICT_FALSE (q->cursize == 0))
388  {
389  if (cond == SVM_Q_NOWAIT)
390  {
391  svm_queue_unlock (q);
392  return (-2);
393  }
394  else if (cond == SVM_Q_TIMEDWAIT)
395  {
396  while (q->cursize == 0 && rc == 0)
397  rc = svm_queue_timedwait_inline (q, time);
398 
399  if (rc == ETIMEDOUT)
400  {
401  svm_queue_unlock (q);
402  return ETIMEDOUT;
403  }
404  }
405  else
406  {
407  while (q->cursize == 0)
409  }
410  }
411 
412  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
413  clib_memcpy_fast (elem, headp, q->elsize);
414 
415  q->head++;
416  /* $$$$ JFC shouldn't this be == 0? */
417  if (q->cursize == q->maxsize)
418  need_broadcast = 1;
419 
420  q->cursize--;
421 
422  if (q->head == q->maxsize)
423  q->head = 0;
424 
425  if (need_broadcast)
427 
428  svm_queue_unlock (q);
429 
430  return 0;
431 }
432 
433 int
435 {
436  int need_broadcast;
437  i8 *headp;
438 
439  svm_queue_lock (q);
440  if (q->cursize == 0)
441  {
442  svm_queue_unlock (q);
443  return -1;
444  }
445 
446  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
447  clib_memcpy_fast (elem, headp, q->elsize);
448 
449  q->head++;
450  need_broadcast = (q->cursize == q->maxsize / 2);
451  q->cursize--;
452 
453  if (PREDICT_FALSE (q->head == q->maxsize))
454  q->head = 0;
455  svm_queue_unlock (q);
456 
457  if (need_broadcast)
459 
460  return 0;
461 }
462 
463 int
465 {
466  int need_broadcast;
467  i8 *headp;
468 
469  if (PREDICT_FALSE (q->cursize == 0))
470  {
471  while (q->cursize == 0)
472  ;
473  }
474 
475  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
476  clib_memcpy_fast (elem, headp, q->elsize);
477 
478  need_broadcast = q->cursize == q->maxsize;
479 
480  q->head = (q->head + 1) % q->maxsize;
481  q->cursize--;
482 
483  if (PREDICT_FALSE (need_broadcast))
485 
486  return 0;
487 }
488 
489 void
491 {
492  q->producer_evtfd = fd;
493 }
494 
495 void
497 {
498  q->consumer_evtfd = fd;
499 }
500 
501 /*
502  * fd.io coding-style-patch-verification: ON
503  *
504  * Local Variables:
505  * eval: (c-set-style "gnu")
506  * End:
507  */
unix_time_now
static f64 unix_time_now(void)
Definition: time.h:255
svm_queue_set_producer_event_fd
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer's event fd.
Definition: queue.c:490
svm_queue_trylock
static int svm_queue_trylock(svm_queue_t *q)
Definition: queue.c:107
svm_queue_send_signal_inline
static void svm_queue_send_signal_inline(svm_queue_t *q, u8 is_prod)
Definition: queue.c:128
svm_queue_lock
void svm_queue_lock(svm_queue_t *q)
Definition: queue.c:99
string.h
clib_mem_free
static void clib_mem_free(void *p)
Definition: mem.h:314
svm_queue_t
struct _svm_queue svm_queue_t
SVM_Q_NOWAIT
@ SVM_Q_NOWAIT
non-blocking call - works with both condvar and eventfd signaling
Definition: queue.h:44
svm_queue_timedwait_inline
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
Definition: queue.c:178
svm_queue_add_nolock
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
Definition: queue.c:213
clib_memcpy_fast
static_always_inline void * clib_memcpy_fast(void *restrict dst, const void *restrict src, size_t n)
Definition: string.h:92
clib_unix_warning
#define clib_unix_warning(format, args...)
Definition: error.h:68
svm_queue_wait
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
Definition: queue.c:172
svm_queue_timedwait
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
Definition: queue.c:204
svm_queue_free
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:91
svm_queue_sub_raw
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:464
svm_queue_set_consumer_event_fd
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer's event fd.
Definition: queue.c:496
lock.h
PREDICT_FALSE
#define PREDICT_FALSE(x)
Definition: clib.h:124
if
if(node->flags &VLIB_NODE_FLAG_TRACE) vnet_interface_output_trace(vm
time.h
format.h
svm_queue_init
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
CLIB_CACHE_LINE_BYTES
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:58
queue.h
svm_queue_unlock
void svm_queue_unlock(svm_queue_t *q)
Definition: queue.c:116
data
u8 data[128]
Definition: ipsec_types.api:95
svm_queue_sub2
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
Definition: queue.c:434
SVM_Q_TIMEDWAIT
@ SVM_Q_TIMEDWAIT
blocking call, returns on signal or time-out - best used in combination with condvars,...
Definition: queue.h:46
svm_queue_add
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:260
u64
unsigned long u64
Definition: types.h:89
i8
signed char i8
Definition: types.h:45
ASSERT
#define ASSERT(truth)
Definition: error_bootstrap.h:69
cache.h
svm_queue_is_full
int svm_queue_is_full(svm_queue_t *q)
Definition: queue.c:122
u32
unsigned int u32
Definition: types.h:88
svm_queue_add2
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
Definition: queue.c:310
CLIB_PAUSE
#define CLIB_PAUSE()
Definition: lock.h:23
svm_queue_wait_inline
static void svm_queue_wait_inline(svm_queue_t *q)
Definition: queue.c:153
clib_memset
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
u8
unsigned char u8
Definition: types.h:56
clib_mem_alloc_aligned
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:264
svm_queue_sub
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:369
rv
int __clib_unused rv
Definition: application.c:491
mem.h
svm_queue_send_signal
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
Definition: queue.c:147
svm_queue_add_raw
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:241
svm_queue_alloc_and_init
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
Definition: queue.c:74
svm_q_conditional_wait_t
svm_q_conditional_wait_t
Definition: queue.h:40