FD.io VPP  v19.01.3-6-g70449b9b9
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 #include <sys/eventfd.h>
19 
20 static inline svm_msg_q_ring_t *
22 {
23  return vec_elt_at_index (mq->rings, ring_index);
24 }
25 
27 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
28 {
29  return svm_msg_q_ring_inline (mq, ring_index);
30 }
31 
32 static inline void *
34 {
35  ASSERT (elt_index < ring->nitems);
36  return (ring->data + elt_index * ring->elsize);
37 }
38 
41 {
42  svm_msg_q_ring_cfg_t *ring_cfg;
43  uword rings_sz = 0, mq_sz;
44  svm_msg_q_ring_t *ring;
45  u8 *base, *rings_ptr;
46  vec_header_t *vh;
47  u32 vec_sz, q_sz;
48  svm_msg_q_t *mq;
49  int i;
50 
51  ASSERT (cfg);
52 
53  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
54  for (i = 0; i < cfg->n_rings; i++)
55  {
56  if (cfg->ring_cfgs[i].data)
57  continue;
58  ring_cfg = &cfg->ring_cfgs[i];
59  rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
60  }
61 
62  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
63  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
65  if (!base)
66  return 0;
67 
68  mq = (svm_msg_q_t *) base;
69  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
70  sizeof (svm_msg_q_msg_t));
71  mq->q->consumer_pid = cfg->consumer_pid;
72  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
73  vh->len = cfg->n_rings;
74  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
75  rings_ptr = (u8 *) mq->rings + vec_sz;
76  for (i = 0; i < cfg->n_rings; i++)
77  {
78  ring = &mq->rings[i];
79  ring->elsize = cfg->ring_cfgs[i].elsize;
80  ring->nitems = cfg->ring_cfgs[i].nitems;
81  ring->cursize = ring->head = ring->tail = 0;
82  if (cfg->ring_cfgs[i].data)
83  ring->data = cfg->ring_cfgs[i].data;
84  else
85  {
86  ring->data = rings_ptr;
87  rings_ptr += (uword) ring->nitems * ring->elsize;
88  }
89  }
90 
91  return mq;
92 }
93 
94 void
96 {
97  svm_queue_free (mq->q);
98  clib_mem_free (mq);
99 }
100 
103 {
104  svm_msg_q_msg_t msg;
105  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
106 
107  ASSERT (ring->cursize < ring->nitems);
108  msg.ring_index = ring - mq->rings;
109  msg.elt_index = ring->tail;
110  ring->tail = (ring->tail + 1) % ring->nitems;
111  clib_atomic_fetch_add (&ring->cursize, 1);
112  return msg;
113 }
114 
115 int
117  u8 noblock, svm_msg_q_msg_t * msg)
118 {
119  if (noblock)
120  {
121  if (svm_msg_q_try_lock (mq))
122  return -1;
123  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
124  {
125  svm_msg_q_unlock (mq);
126  return -2;
127  }
128  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
130  {
131  svm_msg_q_unlock (mq);
132  return -2;
133  }
134  }
135  else
136  {
137  svm_msg_q_lock (mq);
138  while (svm_msg_q_ring_is_full (mq, ring_index))
139  svm_msg_q_wait (mq);
140  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
141  }
142  return 0;
143 }
144 
147 {
148  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
149  svm_msg_q_ring_t *ring;
150 
151  vec_foreach (ring, mq->rings)
152  {
153  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
154  continue;
155  msg.ring_index = ring - mq->rings;
156  msg.elt_index = ring->tail;
157  ring->tail = (ring->tail + 1) % ring->nitems;
158  clib_atomic_fetch_add (&ring->cursize, 1);
159  break;
160  }
161  return msg;
162 }
163 
164 void *
166 {
168  return svm_msg_q_ring_data (ring, msg->elt_index);
169 }
170 
171 void
173 {
174  svm_msg_q_ring_t *ring;
175 
176  ASSERT (vec_len (mq->rings) > msg->ring_index);
177  ring = &mq->rings[msg->ring_index];
178  if (msg->elt_index == ring->head)
179  {
180  ring->head = (ring->head + 1) % ring->nitems;
181  }
182  else
183  {
184  clib_warning ("message out of order");
185  /* for now, expect messages to be processed in order */
186  ASSERT (0);
187  }
188  clib_atomic_fetch_sub (&ring->cursize, 1);
189 }
190 
191 static int
193 {
194  u32 dist1, dist2, tail, head;
195  svm_msg_q_ring_t *ring;
196 
197  if (vec_len (mq->rings) <= msg->ring_index)
198  return 0;
199  ring = &mq->rings[msg->ring_index];
200  tail = ring->tail;
201  head = ring->head;
202 
203  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
204  if (tail == head)
205  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
206  else
207  dist2 = ((ring->nitems + tail) - head) % ring->nitems;
208  return (dist1 < dist2);
209 }
210 
211 int
212 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
213 {
214  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
215  return svm_queue_add (mq->q, (u8 *) msg, nowait);
216 }
217 
218 void
220 {
221  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
222  svm_queue_add_raw (mq->q, (u8 *) msg);
223  svm_msg_q_unlock (mq);
224 }
225 
226 int
228  svm_q_conditional_wait_t cond, u32 time)
229 {
230  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
231 }
232 
233 void
235 {
236  svm_queue_sub_raw (mq->q, (u8 *) msg);
237 }
238 
239 void
241 {
242  mq->q->consumer_evtfd = fd;
243 }
244 
245 void
247 {
248  mq->q->producer_evtfd = fd;
249 }
250 
251 int
253 {
254  int fd;
255  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
256  return -1;
258  return 0;
259 }
260 
261 int
263 {
264  int fd;
265  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
266  return -1;
268  return 0;
269 }
270 
271 /*
272  * fd.io coding-style-patch-verification: ON
273  *
274  * Local Variables:
275  * eval: (c-set-style "gnu")
276  * End:
277  */
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:220
static u8 svm_msg_q_msg_is_invalid(svm_msg_q_msg_t *msg)
Check if message is invalid.
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:239
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
#define clib_atomic_fetch_sub(a, b)
Definition: atomics.h:24
for(i=1;i<=collision_buckets;i++)
int i
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
unsigned char u8
Definition: types.h:56
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:21
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:348
static uword vec_header_bytes(uword header_bytes)
Definition: vec_bootstrap.h:80
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:40
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:95
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
void svm_msg_q_set_producer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue producer.
#define PREDICT_FALSE(x)
Definition: clib.h:111
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:27
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
static void svm_msg_q_wait(svm_msg_q_t *mq)
Wait for message queue event.
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_ svm_msg_q_ring_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:40
Unidirectional shared-memory multi-ring message queue.
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define clib_warning(format, args...)
Definition: error.h:59
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
u32 len
Number of elements in vector (NOT its allocated length).
Definition: vec_bootstrap.h:60
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static void clib_mem_free(void *p)
Definition: mem.h:205
vector header structure
Definition: vec_bootstrap.h:55
struct svm_msg_q_ svm_msg_q_t
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:443
void svm_msg_q_set_consumer_eventfd(svm_msg_q_t *mq, int fd)
Set event fd for queue consumer.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
#define clib_atomic_fetch_add(a, b)
Definition: atomics.h:23
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:140
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
int svm_msg_q_alloc_producer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:33
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
u32 nitems
max size of the ring
Definition: message_queue.h:30