FD.io VPP  v18.07.1-19-g511ce25
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 
21 {
22  svm_msg_q_ring_t *ring;
23  svm_msg_q_t *mq;
24  uword size;
25  int i;
26 
27  if (!cfg)
28  return 0;
29 
31  memset (mq, 0, sizeof (*mq));
32  mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t),
33  cfg->consumer_pid, 0);
34  vec_validate (mq->rings, cfg->n_rings - 1);
35  for (i = 0; i < cfg->n_rings; i++)
36  {
37  ring = &mq->rings[i];
38  ring->elsize = cfg->ring_cfgs[i].elsize;
39  ring->nitems = cfg->ring_cfgs[i].nitems;
40  if (cfg->ring_cfgs[i].data)
41  ring->data = cfg->ring_cfgs[i].data;
42  else
43  {
44  size = (uword) ring->nitems * ring->elsize;
46  }
47  }
48 
49  return mq;
50 }
51 
52 void
54 {
55  svm_msg_q_ring_t *ring;
56 
57  vec_foreach (ring, mq->rings)
58  {
59  clib_mem_free (ring->data);
60  }
61  vec_free (mq->rings);
62  clib_mem_free (mq);
63 }
64 
67 {
68  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
69  svm_msg_q_ring_t *ring;
70 
71  vec_foreach (ring, mq->rings)
72  {
73  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
74  continue;
75  msg.ring_index = ring - mq->rings;
76  msg.elt_index = ring->tail;
77  ring->tail = (ring->tail + 1) % ring->nitems;
78  __sync_fetch_and_add (&ring->cursize, 1);
79  break;
80  }
81  return msg;
82 }
83 
84 static inline svm_msg_q_ring_t *
86 {
87  return vec_elt_at_index (mq->rings, ring_index);
88 }
89 
90 static inline void *
92 {
93  ASSERT (elt_index < ring->nitems);
94  return (ring->data + elt_index * ring->elsize);
95 }
96 
97 void *
99 {
101  return svm_msg_q_ring_data (ring, msg->elt_index);
102 }
103 
104 void
106 {
107  svm_msg_q_ring_t *ring;
108 
109  if (vec_len (mq->rings) <= msg->ring_index)
110  return;
111  ring = &mq->rings[msg->ring_index];
112  if (msg->elt_index == ring->head)
113  {
114  ring->head = (ring->head + 1) % ring->nitems;
115  }
116  else
117  {
118  /* for now, expect messages to be processed in order */
119  ASSERT (0);
120  }
121  __sync_fetch_and_sub (&ring->cursize, 1);
122 }
123 
124 static int
126 {
127  svm_msg_q_ring_t *ring;
128  u32 dist1, dist2;
129 
130  if (vec_len (mq->rings) <= msg->ring_index)
131  return 0;
132  ring = &mq->rings[msg->ring_index];
133 
134  dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems;
135  if (ring->tail == ring->head)
136  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
137  else
138  dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
139  return (dist1 < dist2);
140 }
141 
142 int
144 {
145  ASSERT (svm_msq_q_msg_is_valid (mq, &msg));
146  return svm_queue_add (mq->q, (u8 *) & msg, nowait);
147 }
148 
149 int
151  svm_q_conditional_wait_t cond, u32 time)
152 {
153  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
154 }
155 
156 /*
157  * fd.io coding-style-patch-verification: ON
158  *
159  * Local Variables:
160  * eval: (c-set-style "gnu")
161  * End:
162  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:437
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:39
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
Definition: message_queue.c:98
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:184
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
Definition: message_queue.c:66
int i
unsigned char u8
Definition: types.h:56
svm_queue_t * svm_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
Definition: queue.c:51
static svm_msg_q_ring_t * svm_msg_q_get_ring(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:85
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:303
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:20
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:53
uword size
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
volatile u32 cursize
current size of the ring
Definition: message_queue.h:28
u32 n_rings
number of msg rings
Definition: message_queue.h:53
svm_q_conditional_wait_t
Definition: queue.h:39
u32 head
current head (for dequeue)
Definition: message_queue.h:30
Unidirectional shared-memory multi-ring message queue.
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:339
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:38
u32 elt_index
index in ring
Definition: message_queue.h:62
u32 ring_index
ring index, could be u8
Definition: message_queue.h:61
#define ASSERT(truth)
static void clib_mem_free(void *p)
Definition: mem.h:179
u8 * data
chunk of memory for msg data
Definition: message_queue.h:33
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:54
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:52
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
u32 elsize
size of an element
Definition: message_queue.h:32
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:120
u32 tail
current tail (for enqueue)
Definition: message_queue.h:31
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:51
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:62
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t msg, int nowait)
Producer enqueue one message to queue.
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:91
u32 nitems
max size of the ring
Definition: message_queue.h:29