FD.io VPP  v16.06
Vector Packet Processing
unix_shared_memory_queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
28 #include <signal.h>
29 
30 /*
31  * unix_shared_memory_queue_init
32  *
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  * be popular choices.
36  * coid = consumer coid, from ChannelCreate
37  * pid = consumer pid
38  * pulse_code = pulse code consumer expects
39  * pulse_value = pulse value consumer expects
40  * consumer_prio = consumer's priority, so pulses won't change
41  * the consumer's priority.
42  *
43  * The idea is to call this function in the queue consumer,
44  * and e-mail the queue pointer to the producer(s).
45  *
46  * The spp process / main thread allocates one of these
47  * at startup; its main input queue. The spp main input queue
48  * has a pointer to it in the shared memory segment header.
49  *
50  * You probably want to be on an svm data heap before calling this
51  * function.
52  */
55  int elsize,
56  int consumer_pid,
57  int signal_when_queue_non_empty)
58 {
60  pthread_mutexattr_t attr;
61  pthread_condattr_t cattr;
62 
64  + nels*elsize, CLIB_CACHE_LINE_BYTES);
65  memset(q, 0, sizeof (*q));
66 
67  q->elsize = elsize;
68  q->maxsize = nels;
69  q->consumer_pid = consumer_pid;
70  q->signal_when_queue_non_empty = signal_when_queue_non_empty;
71 
72  memset(&attr,0,sizeof(attr));
73  memset(&cattr,0,sizeof(attr));
74 
75  if (pthread_mutexattr_init(&attr))
76  clib_unix_warning("mutexattr_init");
77  if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED))
78  clib_unix_warning("pthread_mutexattr_setpshared");
79  if (pthread_mutex_init(&q->mutex, &attr))
80  clib_unix_warning("mutex_init");
81  if (pthread_mutexattr_destroy(&attr))
82  clib_unix_warning("mutexattr_destroy");
83  if (pthread_condattr_init(&cattr))
84  clib_unix_warning("condattr_init");
85  /* prints funny-looking messages in the Linux target */
86  if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
87  clib_unix_warning("condattr_setpshared");
88  if (pthread_cond_init(&q->condvar, &cattr))
89  clib_unix_warning("cond_init1");
90  if(pthread_condattr_destroy(&cattr))
91  clib_unix_warning("cond_init2");
92 
93  return(q);
94 }
95 
96 /*
97  * unix_shared_memory_queue_free
98  */
100 {
101  (void) pthread_mutex_destroy(&q->mutex);
102  (void) pthread_cond_destroy(&q->condvar);
103  clib_mem_free(q);
104 }
105 
107 {
108  pthread_mutex_lock(&q->mutex);
109 }
110 
112 {
113  pthread_mutex_unlock(&q->mutex);
114 }
115 
117 {
118  return q->cursize == q->maxsize;
119 }
120 
121 /*
122  * unix_shared_memory_queue_add_nolock
123  */
125  u8 *elem)
126 {
127  i8 *tailp;
128  int need_broadcast=0;
129 
130  if (PREDICT_FALSE(q->cursize == q->maxsize)) {
131  while(q->cursize == q->maxsize) {
132  (void) pthread_cond_wait(&q->condvar, &q->mutex);
133  }
134  }
135 
136  tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
137  clib_memcpy(tailp, elem, q->elsize);
138 
139  q->tail++;
140  q->cursize++;
141 
142  need_broadcast = (q->cursize == 1);
143 
144  if (q->tail == q->maxsize)
145  q->tail = 0;
146 
147  if (need_broadcast) {
148  (void) pthread_cond_broadcast(&q->condvar);
149  if (q->signal_when_queue_non_empty)
150  kill (q->consumer_pid, q->signal_when_queue_non_empty);
151  }
152  return 0;
153 }
154 
156  u8 *elem)
157 {
158  i8 *tailp;
159 
160  if (PREDICT_FALSE(q->cursize == q->maxsize)) {
161  while(q->cursize == q->maxsize)
162  ;
163  }
164 
165  tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
166  clib_memcpy(tailp, elem, q->elsize);
167 
168  q->tail++;
169  q->cursize++;
170 
171  if (q->tail == q->maxsize)
172  q->tail = 0;
173  return 0;
174 }
175 
176 
177 /*
178  * unix_shared_memory_queue_add
179  */
181  u8 *elem, int nowait)
182 {
183  i8 *tailp;
184  int need_broadcast=0;
185 
186  if (nowait) {
187  /* zero on success */
188  if (pthread_mutex_trylock (&q->mutex)) {
189  return (-1);
190  }
191  } else
192  pthread_mutex_lock(&q->mutex);
193 
194  if (PREDICT_FALSE(q->cursize == q->maxsize)) {
195  if (nowait) {
196  pthread_mutex_unlock(&q->mutex);
197  return (-2);
198  }
199  while(q->cursize == q->maxsize) {
200  (void) pthread_cond_wait(&q->condvar, &q->mutex);
201  }
202  }
203 
204  tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
205  clib_memcpy(tailp, elem, q->elsize);
206 
207  q->tail++;
208  q->cursize++;
209 
210  need_broadcast = (q->cursize == 1);
211 
212  if (q->tail == q->maxsize)
213  q->tail = 0;
214 
215  if (need_broadcast) {
216  (void) pthread_cond_broadcast(&q->condvar);
217  if (q->signal_when_queue_non_empty)
218  kill (q->consumer_pid, q->signal_when_queue_non_empty);
219  }
220  pthread_mutex_unlock(&q->mutex);
221 
222  return 0;
223 }
224 
225 /*
226  * unix_shared_memory_queue_sub
227  */
229  u8 *elem, int nowait)
230 {
231  i8 *headp;
232  int need_broadcast=0;
233 
234  if (nowait) {
235  /* zero on success */
236  if (pthread_mutex_trylock (&q->mutex)) {
237  return (-1);
238  }
239  } else
240  pthread_mutex_lock(&q->mutex);
241 
242  if (PREDICT_FALSE(q->cursize == 0)) {
243  if (nowait) {
244  pthread_mutex_unlock(&q->mutex);
245  return (-2);
246  }
247  while (q->cursize == 0) {
248  (void) pthread_cond_wait(&q->condvar, &q->mutex);
249  }
250  }
251 
252  headp = (i8 *)(&q->data[0] + q->elsize*q->head);
253  clib_memcpy(elem, headp, q->elsize);
254 
255  q->head++;
256  if (q->cursize == q->maxsize)
257  need_broadcast = 1;
258 
259  q->cursize--;
260 
261  if(q->head == q->maxsize)
262  q->head = 0;
263 
264  if (need_broadcast)
265  (void) pthread_cond_broadcast(&q->condvar);
266 
267  pthread_mutex_unlock(&q->mutex);
268 
269  return 0;
270 }
271 
273  u8 *elem)
274 {
275  i8 *headp;
276 
277  if (PREDICT_FALSE(q->cursize == 0)) {
278  while (q->cursize == 0)
279  ;
280  }
281 
282  headp = (i8 *)(&q->data[0] + q->elsize*q->head);
283  clib_memcpy(elem, headp, q->elsize);
284 
285  q->head++;
286  q->cursize--;
287 
288  if(q->head == q->maxsize)
289  q->head = 0;
290  return 0;
291 }
int unix_shared_memory_queue_is_full(unix_shared_memory_queue_t *q)
static void(BVT(clib_bihash)*h, BVT(clib_bihash_value)*v)
always_inline void clib_mem_free(void *p)
Definition: mem.h:149
void unix_shared_memory_queue_free(unix_shared_memory_queue_t *q)
char i8
Definition: types.h:45
int unix_shared_memory_queue_add_nolock(unix_shared_memory_queue_t *q, u8 *elem)
always_inline void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:113
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define PREDICT_FALSE(x)
Definition: clib.h:97
int unix_shared_memory_queue_sub_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_lock(unix_shared_memory_queue_t *q)
int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define clib_memcpy(a, b, c)
Definition: string.h:63
#define clib_unix_warning(format, args...)
Definition: error.h:68
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
int unix_shared_memory_queue_add_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_unlock(unix_shared_memory_queue_t *q)
unsigned char u8
Definition: types.h:56
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
struct _unix_shared_memory_queue unix_shared_memory_queue_t