FD.io VPP  v20.01-48-g3e0dafb74
Vector Packet Processing
queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31 
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35  svm_queue_t *q;
36  pthread_mutexattr_t attr;
37  pthread_condattr_t cattr;
38 
39  q = (svm_queue_t *) base;
40  clib_memset (q, 0, sizeof (*q));
41 
42  q->elsize = elsize;
43  q->maxsize = nels;
44  q->producer_evtfd = -1;
45  q->consumer_evtfd = -1;
46 
47  clib_memset (&attr, 0, sizeof (attr));
48  clib_memset (&cattr, 0, sizeof (cattr));
49 
50  if (pthread_mutexattr_init (&attr))
51  clib_unix_warning ("mutexattr_init");
52  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53  clib_unix_warning ("pthread_mutexattr_setpshared");
54  if (pthread_mutex_init (&q->mutex, &attr))
55  clib_unix_warning ("mutex_init");
56  if (pthread_mutexattr_destroy (&attr))
57  clib_unix_warning ("mutexattr_destroy");
58  if (pthread_condattr_init (&cattr))
59  clib_unix_warning ("condattr_init");
60  /* prints funny-looking messages in the Linux target */
61  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62  clib_unix_warning ("condattr_setpshared");
63  if (pthread_cond_init (&q->condvar, &cattr))
64  clib_unix_warning ("cond_init1");
65  if (pthread_condattr_destroy (&cattr))
66  clib_unix_warning ("cond_init2");
67 
68  return (q);
69 }
70 
72 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73 {
74  svm_queue_t *q;
75 
77  + nels * elsize, CLIB_CACHE_LINE_BYTES);
78  clib_memset (q, 0, sizeof (*q));
79  q = svm_queue_init (q, nels, elsize);
80  q->consumer_pid = consumer_pid;
81 
82  return q;
83 }
84 
85 /*
86  * svm_queue_free
87  */
88 void
90 {
91  (void) pthread_mutex_destroy (&q->mutex);
92  (void) pthread_cond_destroy (&q->condvar);
93  clib_mem_free (q);
94 }
95 
96 void
98 {
99  pthread_mutex_lock (&q->mutex);
100 }
101 
102 void
104 {
105  pthread_mutex_unlock (&q->mutex);
106 }
107 
108 int
110 {
111  return q->cursize == q->maxsize;
112 }
113 
114 static inline void
116 {
117  if (q->producer_evtfd == -1)
118  {
119  (void) pthread_cond_broadcast (&q->condvar);
120  }
121  else
122  {
123  int __clib_unused rv, fd;
124  u64 data = 1;
125  ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126  fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127  rv = write (fd, &data, sizeof (data));
128  if (PREDICT_FALSE (rv < 0))
129  clib_unix_warning ("signal write on %d returned %d", fd, rv);
130  }
131 }
132 
133 void
135 {
136  svm_queue_send_signal_inline (q, is_prod);
137 }
138 
139 static inline void
141 {
142  if (q->producer_evtfd == -1)
143  {
144  pthread_cond_wait (&q->condvar, &q->mutex);
145  }
146  else
147  {
148  /* Fake a wait for event. We could use epoll but that would mean
149  * using yet another fd. Should do for now */
150  u32 cursize = q->cursize;
151  svm_queue_unlock (q);
152  while (q->cursize == cursize)
153  CLIB_PAUSE ();
154  svm_queue_lock (q);
155  }
156 }
157 
158 void
160 {
162 }
163 
164 static inline int
166 {
167  struct timespec ts;
168  ts.tv_sec = unix_time_now () + (u32) timeout;
169  ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
170 
171  if (q->producer_evtfd == -1)
172  {
173  return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
174  }
175  else
176  {
177  double max_time = unix_time_now () + timeout;
178  u32 cursize = q->cursize;
179  int rv;
180 
181  svm_queue_unlock (q);
182  while (q->cursize == cursize && unix_time_now () < max_time)
183  CLIB_PAUSE ();
184  rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
185  svm_queue_lock (q);
186  return rv;
187  }
188 }
189 
190 int
191 svm_queue_timedwait (svm_queue_t * q, double timeout)
192 {
193  return svm_queue_timedwait_inline (q, timeout);
194 }
195 
196 /*
197  * svm_queue_add_nolock
198  */
199 int
201 {
202  i8 *tailp;
203  int need_broadcast = 0;
204 
205  if (PREDICT_FALSE (q->cursize == q->maxsize))
206  {
207  while (q->cursize == q->maxsize)
209  }
210 
211  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
212  clib_memcpy_fast (tailp, elem, q->elsize);
213 
214  q->tail++;
215  q->cursize++;
216 
217  need_broadcast = (q->cursize == 1);
218 
219  if (q->tail == q->maxsize)
220  q->tail = 0;
221 
222  if (need_broadcast)
224  return 0;
225 }
226 
227 void
229 {
230  i8 *tailp;
231 
232  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
233  clib_memcpy_fast (tailp, elem, q->elsize);
234 
235  q->tail = (q->tail + 1) % q->maxsize;
236  q->cursize++;
237 
238  if (q->cursize == 1)
240 }
241 
242 
243 /*
244  * svm_queue_add
245  */
246 int
247 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
248 {
249  i8 *tailp;
250  int need_broadcast = 0;
251 
252  if (nowait)
253  {
254  /* zero on success */
255  if (pthread_mutex_trylock (&q->mutex))
256  {
257  return (-1);
258  }
259  }
260  else
261  svm_queue_lock (q);
262 
263  if (PREDICT_FALSE (q->cursize == q->maxsize))
264  {
265  if (nowait)
266  {
267  svm_queue_unlock (q);
268  return (-2);
269  }
270  while (q->cursize == q->maxsize)
272  }
273 
274  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
275  clib_memcpy_fast (tailp, elem, q->elsize);
276 
277  q->tail++;
278  q->cursize++;
279 
280  need_broadcast = (q->cursize == 1);
281 
282  if (q->tail == q->maxsize)
283  q->tail = 0;
284 
285  if (need_broadcast)
287 
288  svm_queue_unlock (q);
289 
290  return 0;
291 }
292 
293 /*
294  * svm_queue_add2
295  */
296 int
297 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
298 {
299  i8 *tailp;
300  int need_broadcast = 0;
301 
302  if (nowait)
303  {
304  /* zero on success */
305  if (pthread_mutex_trylock (&q->mutex))
306  {
307  return (-1);
308  }
309  }
310  else
311  svm_queue_lock (q);
312 
313  if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
314  {
315  if (nowait)
316  {
317  svm_queue_unlock (q);
318  return (-2);
319  }
320  while (q->cursize + 1 == q->maxsize)
322  }
323 
324  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
325  clib_memcpy_fast (tailp, elem, q->elsize);
326 
327  q->tail++;
328  q->cursize++;
329 
330  if (q->tail == q->maxsize)
331  q->tail = 0;
332 
333  need_broadcast = (q->cursize == 1);
334 
335  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
336  clib_memcpy_fast (tailp, elem2, q->elsize);
337 
338  q->tail++;
339  q->cursize++;
340 
341  if (q->tail == q->maxsize)
342  q->tail = 0;
343 
344  if (need_broadcast)
346 
347  svm_queue_unlock (q);
348 
349  return 0;
350 }
351 
352 /*
353  * svm_queue_sub
354  */
355 int
357  u32 time)
358 {
359  i8 *headp;
360  int need_broadcast = 0;
361  int rc = 0;
362 
363  if (cond == SVM_Q_NOWAIT)
364  {
365  /* zero on success */
366  if (pthread_mutex_trylock (&q->mutex))
367  {
368  return (-1);
369  }
370  }
371  else
372  svm_queue_lock (q);
373 
374  if (PREDICT_FALSE (q->cursize == 0))
375  {
376  if (cond == SVM_Q_NOWAIT)
377  {
378  svm_queue_unlock (q);
379  return (-2);
380  }
381  else if (cond == SVM_Q_TIMEDWAIT)
382  {
383  while (q->cursize == 0 && rc == 0)
384  rc = svm_queue_timedwait_inline (q, time);
385 
386  if (rc == ETIMEDOUT)
387  {
388  svm_queue_unlock (q);
389  return ETIMEDOUT;
390  }
391  }
392  else
393  {
394  while (q->cursize == 0)
396  }
397  }
398 
399  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
400  clib_memcpy_fast (elem, headp, q->elsize);
401 
402  q->head++;
403  /* $$$$ JFC shouldn't this be == 0? */
404  if (q->cursize == q->maxsize)
405  need_broadcast = 1;
406 
407  q->cursize--;
408 
409  if (q->head == q->maxsize)
410  q->head = 0;
411 
412  if (need_broadcast)
414 
415  svm_queue_unlock (q);
416 
417  return 0;
418 }
419 
420 int
422 {
423  int need_broadcast;
424  i8 *headp;
425 
426  svm_queue_lock (q);
427  if (q->cursize == 0)
428  {
429  svm_queue_unlock (q);
430  return -1;
431  }
432 
433  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
434  clib_memcpy_fast (elem, headp, q->elsize);
435 
436  q->head++;
437  need_broadcast = (q->cursize == q->maxsize / 2);
438  q->cursize--;
439 
440  if (PREDICT_FALSE (q->head == q->maxsize))
441  q->head = 0;
442  svm_queue_unlock (q);
443 
444  if (need_broadcast)
446 
447  return 0;
448 }
449 
450 int
452 {
453  int need_broadcast;
454  i8 *headp;
455 
456  if (PREDICT_FALSE (q->cursize == 0))
457  {
458  while (q->cursize == 0)
459  ;
460  }
461 
462  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
463  clib_memcpy_fast (elem, headp, q->elsize);
464 
465  need_broadcast = q->cursize == q->maxsize;
466 
467  q->head = (q->head + 1) % q->maxsize;
468  q->cursize--;
469 
470  if (PREDICT_FALSE (need_broadcast))
472 
473  return 0;
474 }
475 
476 void
478 {
479  q->producer_evtfd = fd;
480 }
481 
482 void
484 {
485  q->consumer_evtfd = fd;
486 }
487 
488 /*
489  * fd.io coding-style-patch-verification: ON
490  *
491  * Local Variables:
492  * eval: (c-set-style "gnu")
493  * End:
494  */
#define CLIB_PAUSE()
Definition: lock.h:23
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:228
int svm_queue_is_full(svm_queue_t *q)
Definition: queue.c:109
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:247
Optimized string handling code, including c11-compliant "safe C library" variants.
unsigned long u64
Definition: types.h:89
#define clib_memcpy_fast(a, b, c)
Definition: string.h:81
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer's event fd.
Definition: queue.c:477
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
Definition: queue.c:165
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
Definition: queue.c:421
unsigned char u8
Definition: types.h:56
void svm_queue_unlock(svm_queue_t *q)
Definition: queue.c:103
blocking call, returns on signal or time-out - best used in combination with condvars, with eventfds we don't yield the cpu
Definition: queue.h:46
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:356
static f64 unix_time_now(void)
Definition: time.h:249
unsigned int u32
Definition: types.h:88
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
Definition: queue.c:159
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
Definition: queue.c:297
void svm_queue_lock(svm_queue_t *q)
Definition: queue.c:97
#define PREDICT_FALSE(x)
Definition: clib.h:111
signed char i8
Definition: types.h:45
static void svm_queue_send_signal_inline(svm_queue_t *q, u8 is_prod)
Definition: queue.c:115
svm_q_conditional_wait_t
Definition: queue.h:40
static void svm_queue_wait_inline(svm_queue_t *q)
Definition: queue.c:140
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer's event fd.
Definition: queue.c:483
#define ASSERT(truth)
u8 data[128]
Definition: ipsec_types.api:87
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
Definition: queue.c:72
static void clib_mem_free(void *p)
Definition: mem.h:226
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:451
#define clib_unix_warning(format, args...)
Definition: error.h:68
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
Definition: queue.c:200
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:161
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
Definition: queue.c:191
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
Definition: queue.c:134
non-blocking call - works with both condvar and eventfd signaling
Definition: queue.h:44