FD.io VPP  v20.01-48-g3e0dafb74
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 #include <vppinfra/format.h>
19 #include <sys/eventfd.h>
20 
21 static inline svm_msg_q_ring_t *
23 {
24  return vec_elt_at_index (mq->rings, ring_index);
25 }
26 
28 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
29 {
30  return svm_msg_q_ring_inline (mq, ring_index);
31 }
32 
33 static inline void *
35 {
36  ASSERT (elt_index < ring->nitems);
37  return (ring->data + elt_index * ring->elsize);
38 }
39 
42 {
43  svm_msg_q_ring_cfg_t *ring_cfg;
44  uword rings_sz = 0, mq_sz;
45  svm_msg_q_ring_t *ring;
46  u8 *base, *rings_ptr;
47  vec_header_t *vh;
48  u32 vec_sz, q_sz;
49  svm_msg_q_t *mq;
50  int i;
51 
52  ASSERT (cfg);
53 
54  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
55  for (i = 0; i < cfg->n_rings; i++)
56  {
57  if (cfg->ring_cfgs[i].data)
58  continue;
59  ring_cfg = &cfg->ring_cfgs[i];
60  rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
61  }
62 
63  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
64  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
66  if (!base)
67  return 0;
68 
69  mq = (svm_msg_q_t *) base;
70  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
71  sizeof (svm_msg_q_msg_t));
72  mq->q->consumer_pid = cfg->consumer_pid;
73  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
74  vh->len = cfg->n_rings;
75  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
76  rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
77  for (i = 0; i < cfg->n_rings; i++)
78  {
79  ring = &mq->rings[i];
80  ring->elsize = cfg->ring_cfgs[i].elsize;
81  ring->nitems = cfg->ring_cfgs[i].nitems;
82  ring->cursize = ring->head = ring->tail = 0;
83  if (cfg->ring_cfgs[i].data)
84  ring->data = cfg->ring_cfgs[i].data;
85  else
86  {
87  ring->data = rings_ptr;
88  rings_ptr += (uword) ring->nitems * ring->elsize;
89  }
90  }
91 
92  return mq;
93 }
94 
95 void
97 {
98  svm_queue_free (mq->q);
99  clib_mem_free (mq);
100 }
101 
104 {
105  svm_msg_q_msg_t msg;
106  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
107 
108  ASSERT (ring->cursize < ring->nitems);
109  msg.ring_index = ring - mq->rings;
110  msg.elt_index = ring->tail;
111  ring->tail = (ring->tail + 1) % ring->nitems;
112  clib_atomic_fetch_add (&ring->cursize, 1);
113  return msg;
114 }
115 
116 int
118  u8 noblock, svm_msg_q_msg_t * msg)
119 {
120  if (noblock)
121  {
122  if (svm_msg_q_try_lock (mq))
123  return -1;
125  || svm_msg_q_ring_is_full (mq, ring_index)))
126  {
127  svm_msg_q_unlock (mq);
128  return -2;
129  }
130  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
131  }
132  else
133  {
134  svm_msg_q_lock (mq);
135  while (svm_msg_q_is_full (mq)
136  || svm_msg_q_ring_is_full (mq, ring_index))
137  svm_msg_q_wait (mq);
138  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
139  }
140  return 0;
141 }
142 
145 {
146  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
147  svm_msg_q_ring_t *ring;
148 
149  vec_foreach (ring, mq->rings)
150  {
151  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
152  continue;
153  msg.ring_index = ring - mq->rings;
154  msg.elt_index = ring->tail;
155  ring->tail = (ring->tail + 1) % ring->nitems;
156  clib_atomic_fetch_add (&ring->cursize, 1);
157  break;
158  }
159  return msg;
160 }
161 
162 void *
164 {
166  return svm_msg_q_ring_data (ring, msg->elt_index);
167 }
168 
169 void
171 {
172  svm_msg_q_ring_t *ring;
173  int need_signal;
174 
175  ASSERT (vec_len (mq->rings) > msg->ring_index);
176  ring = &mq->rings[msg->ring_index];
177  if (msg->elt_index == ring->head)
178  {
179  ring->head = (ring->head + 1) % ring->nitems;
180  }
181  else
182  {
183  clib_warning ("message out of order");
184  /* for now, expect messages to be processed in order */
185  ASSERT (0);
186  }
187 
188  need_signal = ring->cursize == ring->nitems;
189  clib_atomic_fetch_sub (&ring->cursize, 1);
190 
191  if (PREDICT_FALSE (need_signal))
192  svm_queue_send_signal (mq->q, 0);
193 }
194 
195 static int
197 {
198  u32 dist1, dist2, tail, head;
199  svm_msg_q_ring_t *ring;
200 
201  if (vec_len (mq->rings) <= msg->ring_index)
202  return 0;
203  ring = &mq->rings[msg->ring_index];
204  tail = ring->tail;
205  head = ring->head;
206 
207  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
208  if (tail == head)
209  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
210  else
211  dist2 = ((ring->nitems + tail) - head) % ring->nitems;
212  return (dist1 < dist2);
213 }
214 
215 int
216 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
217 {
218  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
219  return svm_queue_add (mq->q, (u8 *) msg, nowait);
220 }
221 
222 void
224 {
225  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
226  svm_queue_add_raw (mq->q, (u8 *) msg);
227  svm_msg_q_unlock (mq);
228 }
229 
230 int
232  svm_q_conditional_wait_t cond, u32 time)
233 {
234  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
235 }
236 
237 void
239 {
240  svm_queue_sub_raw (mq->q, (u8 *) msg);
241 }
242 
243 void
245 {
246  mq->q->consumer_evtfd = fd;
247 }
248 
249 void
251 {
252  mq->q->producer_evtfd = fd;
253 }
254 
255 int
257 {
258  int fd;
259  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
260  return -1;
262  return 0;
263 }
264 
265 int
267 {
268  int fd;
269  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
270  return -1;
272  return 0;
273 }
274 
275 u8 *
276 format_svm_msg_q (u8 * s, va_list * args)
277 {
278  svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
279  s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
280  for (u32 i = 0; i < vec_len (mq->rings); i++)
281  {
282  s = format (s, " [R%d:%d/%d]", i, mq->rings[i].cursize,
283  mq->rings[i].nitems);
284  }
285  return s;
286 }
287 
288 /*
289  * fd.io coding-style-patch-verification: ON
290  *
291  * Local Variables:
292  * eval: (c-set-style "gnu")
293  * End:
294  */
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:228
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:247
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
u8 * format_svm_msg_q(u8 *s, va_list *args)
Format message queue, shows msg count for each ring.
#define clib_atomic_fetch_sub(a, b)
Definition: atomics.h:24
for(i=1;i<=collision_buckets;i++)
int i
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:424
unsigned char u8
Definition: types.h:56
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:22
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:356
static uword vec_header_bytes(uword header_bytes)
Definition: vec_bootstrap.h:80
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:41
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:96
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
void svm_msg_q_set_producer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue producer.
#define PREDICT_FALSE(x)
Definition: clib.h:111
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:28
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
static void svm_msg_q_wait(svm_msg_q_t *mq)
Wait for message queue event.
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_ svm_msg_q_ring_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:40
Unidirectional shared-memory multi-ring message queue.
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define clib_warning(format, args...)
Definition: error.h:59
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
u32 len
Number of elements in vector (NOT its allocated length).
Definition: vec_bootstrap.h:60
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static void clib_mem_free(void *p)
Definition: mem.h:226
vector header structure
Definition: vec_bootstrap.h:55
struct svm_msg_q_ svm_msg_q_t
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:451
void svm_msg_q_set_consumer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue consumer.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
#define clib_atomic_fetch_add(a, b)
Definition: atomics.h:23
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:161
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
int svm_msg_q_alloc_producer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue producer.
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:34
void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
Definition: queue.c:134
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
u32 nitems
max size of the ring
Definition: message_queue.h:30