FD.io VPP  v17.04.2-2-ga8f93f8
Vector Packet Processing
pneum.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31 
32 #include <vpp/api/vpe_msg_enum.h>
33 
34 #include "pneum.h"
35 
36 /*
37  * Asynchronous mode:
38  * Client registers a callback. All messages are sent to the callback.
39  * Synchronous mode:
40  * Client calls blocking read().
41  * Clients are expected to collate events on a queue.
42  * pneum_write() -> suspends RX thread
43  * pneum_read() -> resumes RX thread
44  */
45 
46 #define vl_typedefs /* define message structures */
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_typedefs
49 
50 #define vl_endianfun /* define message structures */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_endianfun
53 
56 
57 typedef struct {
59  pthread_t rx_thread_handle;
61  pthread_mutex_t queue_lock;
62  pthread_cond_t suspend_cv;
63  pthread_cond_t resume_cv;
64  pthread_mutex_t timeout_lock;
65  pthread_cond_t timeout_cv;
66  pthread_cond_t timeout_cancel_cv;
67  pthread_cond_t terminate_cv;
68 } pneum_main_t;
69 
73 bool rx_is_running = false;
74 
75 static void
76 init (void)
77 {
78  pneum_main_t *pm = &pneum_main;
79  memset(pm, 0, sizeof(*pm));
80  pthread_mutex_init(&pm->queue_lock, NULL);
81  pthread_cond_init(&pm->suspend_cv, NULL);
82  pthread_cond_init(&pm->resume_cv, NULL);
83  pthread_mutex_init(&pm->timeout_lock, NULL);
84  pthread_cond_init(&pm->timeout_cv, NULL);
85  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
86  pthread_cond_init(&pm->terminate_cv, NULL);
87 }
88 
89 static void
90 cleanup (void)
91 {
92  pneum_main_t *pm = &pneum_main;
93  pthread_cond_destroy(&pm->suspend_cv);
94  pthread_cond_destroy(&pm->resume_cv);
95  pthread_cond_destroy(&pm->timeout_cv);
96  pthread_cond_destroy(&pm->timeout_cancel_cv);
97  pthread_cond_destroy(&pm->terminate_cv);
98  pthread_mutex_destroy(&pm->queue_lock);
99  pthread_mutex_destroy(&pm->timeout_lock);
100  memset (pm, 0, sizeof (*pm));
101 }
102 
103 /*
104  * Satisfy external references when -lvlib is not available.
105  */
106 void vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
107 {
108  clib_warning ("vlib_cli_output called...");
109 }
110 
111 void
112 pneum_free (void * msg)
113 {
114  vl_msg_api_free (msg);
115 }
116 
117 static void
118 pneum_api_handler (void *msg)
119 {
120  u16 id = ntohs(*((u16 *)msg));
121  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
122  int l = ntohl(msgbuf->data_len);
123  if (l == 0)
124  clib_warning("Message ID %d has wrong length: %d\n", id, l);
125 
126  /* Call Python callback */
127  ASSERT(pneum_callback);
128  (pneum_callback)(msg, l);
129  pneum_free(msg);
130 }
131 
132 static void *
134 {
136  pneum_main_t *pm = &pneum_main;
137  api_main_t *am = &api_main;
138  uword msg;
139 
140  q = am->vl_input_queue;
141 
142  while (1)
143  while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0))
144  {
145  u16 id = ntohs(*((u16 *)msg));
146  switch (id) {
147  case VL_API_RX_THREAD_EXIT:
148  vl_msg_api_free((void *) msg);
149  /* signal waiting threads that this thread is about to terminate */
150  pthread_mutex_lock(&pm->queue_lock);
151  pthread_cond_signal(&pm->terminate_cv);
152  pthread_mutex_unlock(&pm->queue_lock);
153  pthread_exit(0);
154  return 0;
155  break;
156 
157  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
158  vl_msg_api_free((void * )msg);
159  /* Suspend thread and signal reader */
160  pthread_mutex_lock(&pm->queue_lock);
161  pthread_cond_signal(&pm->suspend_cv);
162  /* Wait for the resume signal */
163  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
164  pthread_mutex_unlock(&pm->queue_lock);
165  break;
166 
167  case VL_API_MEMCLNT_READ_TIMEOUT:
168  clib_warning("Received read timeout in async thread\n");
169  vl_msg_api_free((void *) msg);
170  break;
171 
172  default:
173  pneum_api_handler((void *)msg);
174  }
175  }
176 }
177 
178 static void *
180 {
182  pneum_main_t *pm = &pneum_main;
183  api_main_t *am = &api_main;
184  struct timespec ts;
185  struct timeval tv;
186  u16 timeout;
187  int rv;
188 
189  while (1)
190  {
191  /* Wait for poke */
192  pthread_mutex_lock(&pm->timeout_lock);
193  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
194  timeout = read_timeout;
195  gettimeofday(&tv, NULL);
196  ts.tv_sec = tv.tv_sec + timeout;
197  ts.tv_nsec = 0;
198  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
199  &pm->timeout_lock, &ts);
200  pthread_mutex_unlock(&pm->timeout_lock);
201  if (rv == ETIMEDOUT)
202  {
203  ep = vl_msg_api_alloc (sizeof (*ep));
204  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
206  }
207  }
208  pthread_exit(0);
209 }
210 
211 void
213 {
214  api_main_t *am = &api_main;
215  pneum_main_t *pm = &pneum_main;
217 
218  if (!pm->rx_thread_handle) return;
219  pthread_mutex_lock(&pm->queue_lock);
220  if (rx_is_running)
221  {
222  ep = vl_msg_api_alloc (sizeof (*ep));
223  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
225  /* Wait for RX thread to tell us it has suspendend */
226  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
227  rx_is_running = false;
228  }
229  pthread_mutex_unlock(&pm->queue_lock);
230 }
231 
232 void
234 {
235  pneum_main_t *pm = &pneum_main;
236  if (!pm->rx_thread_handle) return;
237  pthread_mutex_lock(&pm->queue_lock);
238  if (rx_is_running) goto unlock;
239  pthread_cond_signal(&pm->resume_cv);
240  rx_is_running = true;
241  unlock:
242  pthread_mutex_unlock(&pm->queue_lock);
243 }
244 
245 static uword *
247 {
248  api_main_t *am = &api_main;
249  return (am->msg_index_by_name_and_crc);
250 }
251 
252 int
254 {
255  api_main_t *am = &api_main;
257 }
258 
259 int
260 pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb,
261  int rx_qlen)
262 {
263  int rv = 0;
264  pneum_main_t *pm = &pneum_main;
265 
266  init();
267  if (chroot_prefix != NULL)
268  vl_set_memory_root_path (chroot_prefix);
269 
270  if ((rv = vl_client_api_map("/vpe-api"))) {
271  clib_warning ("vl_client_api map rv %d", rv);
272  return rv;
273  }
274 
275  if (vl_client_connect(name, 0, rx_qlen) < 0) {
277  return (-1);
278  }
279 
280  if (cb) {
281  /* Start the rx queue thread */
282  rv = pthread_create(&pm->rx_thread_handle, NULL, pneum_rx_thread_fn, 0);
283  if (rv) {
284  clib_warning("pthread_create returned %d", rv);
286  return (-1);
287  }
288  pneum_callback = cb;
289  rx_is_running = true;
290  }
291 
292  /* Start read timeout thread */
293  rv = pthread_create(&pm->timeout_thread_handle, NULL,
295  if (rv) {
296  clib_warning("pthread_create returned %d", rv);
298  return (-1);
299  }
300 
301  pm->connected_to_vlib = 1;
302 
303  return (0);
304 }
305 
306 int
308 {
309  api_main_t *am = &api_main;
310  pneum_main_t *pm = &pneum_main;
311 
312  if (!pm->connected_to_vlib) return 0;
313 
314  if (pm->rx_thread_handle) {
316  uword junk;
317  ep = vl_msg_api_alloc (sizeof (*ep));
318  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
320 
321  /* wait (with timeout) until RX thread has finished */
322  struct timespec ts;
323  struct timeval tv;
324  gettimeofday(&tv, NULL);
325  ts.tv_sec = tv.tv_sec + 5;
326  ts.tv_nsec = 0;
327  pthread_mutex_lock(&pm->queue_lock);
328  int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
329  pthread_mutex_unlock(&pm->queue_lock);
330  /* now join so we wait until thread has -really- finished */
331  if (rv == ETIMEDOUT)
332  pthread_cancel(pm->rx_thread_handle);
333  else
334  pthread_join(pm->rx_thread_handle, (void **) &junk);
335  }
336  if (pm->timeout_thread_handle)
337  pthread_cancel(pm->timeout_thread_handle);
338 
341  pneum_callback = 0;
342 
343  cleanup();
344 
345  return (0);
346 }
347 
348 static void
349 set_timeout (unsigned short timeout)
350 {
351  pneum_main_t *pm = &pneum_main;
352  pthread_mutex_lock(&pm->timeout_lock);
353  read_timeout = timeout;
354  pthread_cond_signal(&pm->timeout_cv);
355  pthread_mutex_unlock(&pm->timeout_lock);
356 }
357 
358 static void
360 {
361  pneum_main_t *pm = &pneum_main;
362  pthread_mutex_lock(&pm->timeout_lock);
363  pthread_cond_signal(&pm->timeout_cancel_cv);
364  pthread_mutex_unlock(&pm->timeout_lock);
365 }
366 
367 int
368 pneum_read (char **p, int *l, u16 timeout)
369 {
371  api_main_t *am = &api_main;
372  pneum_main_t *pm = &pneum_main;
373  uword msg;
374  msgbuf_t *msgbuf;
375 
376  if (!pm->connected_to_vlib) return -1;
377 
378  *l = 0;
379 
380  if (am->our_pid == 0) return (-1);
381 
382  /* Poke timeout thread */
383  if (timeout)
384  set_timeout(timeout);
385 
386  q = am->vl_input_queue;
387  int rv = unix_shared_memory_queue_sub(q, (u8 *)&msg, 0);
388  if (rv == 0) {
389  u16 msg_id = ntohs(*((u16 *)msg));
390  switch (msg_id) {
391  case VL_API_RX_THREAD_EXIT:
392  printf("Received thread exit\n");
393  return -1;
394  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
395  printf("Received thread suspend\n");
396  goto error;
397  case VL_API_MEMCLNT_READ_TIMEOUT:
398  printf("Received read timeout %ds\n", timeout);
399  goto error;
400 
401  default:
402  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
403  *l = ntohl(msgbuf->data_len);
404  if (*l == 0) {
405  printf("Unregistered API message: %d\n", msg_id);
406  goto error;
407  }
408  }
409  *p = (char *)msg;
410 
411  /* Let timeout notification thread know we're done */
412  unset_timeout();
413 
414  } else {
415  printf("Read failed with %d\n", rv);
416  }
417  return (rv);
418 
419  error:
420  vl_msg_api_free((void *) msg);
421  /* Client might forget to resume RX thread on failure */
422  pneum_rx_resume ();
423  return -1;
424 }
425 
426 /*
427  * XXX: Makes the assumption that client_index is the first member
428  */
429 typedef VL_API_PACKED(struct _vl_api_header {
430  u16 _vl_msg_id;
431  u32 client_index;
432 }) vl_api_header_t;
433 
434 static unsigned int
435 pneum_client_index (void)
436 {
437  return (api_main.my_client_index);
438 }
439 
440 int
441 pneum_write (char *p, int l)
442 {
443  int rv = -1;
444  api_main_t *am = &api_main;
445  vl_api_header_t *mp = vl_msg_api_alloc(l);
447  pneum_main_t *pm = &pneum_main;
448 
449  if (!pm->connected_to_vlib) return -1;
450  if (!mp) return (-1);
451 
452  memcpy(mp, p, l);
453  mp->client_index = pneum_client_index();
454  q = am->shmem_hdr->vl_input_queue;
455  rv = unix_shared_memory_queue_add(q, (u8 *)&mp, 0);
456  if (rv != 0) {
457  clib_warning("vpe_api_write fails: %d\n", rv);
458  /* Clear message */
459  pneum_free(mp);
460  }
461  return (rv);
462 }
463 
464 int
465 pneum_get_msg_index (unsigned char * name)
466 {
467  return vl_api_get_msg_index (name);
468 }
469 
470 int
472 {
473  int max = 0;
474  hash_pair_t *hp;
476  hash_foreach_pair (hp, h,
477  ({
478  if (hp->value[0] > max)
479  max = hp->value[0];
480  }));
481 
482  return max;
483 }
484 
485 void
487 {
488  if (cb) clib_error_register_handler (cb, 0);
489 }
pthread_t rx_thread_handle
Definition: pneum.c:59
pthread_cond_t timeout_cancel_cv
Definition: pneum.c:66
pthread_cond_t suspend_cv
Definition: pneum.c:62
void vl_msg_api_send_shmem(unix_shared_memory_queue_t *q, u8 *elem)
u8 connected_to_vlib
Definition: pneum.c:58
unix_shared_memory_queue_t * vl_input_queue
Definition: api.h:73
int my_client_index
Definition: api.h:185
#define NULL
Definition: clib.h:55
void pneum_rx_suspend(void)
Definition: pneum.c:212
int pneum_get_msg_index(unsigned char *name)
Definition: pneum.c:465
void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
vlib_main_t ** vlib_mains
Definition: pneum.c:55
static void cleanup(void)
Definition: pneum.c:90
unix_shared_memory_queue_t * vl_input_queue
Definition: api.h:179
uword value[0]
Definition: hash.h:164
api_main_t api_main
Definition: api_shared.c:35
int our_pid
Definition: api.h:141
static void * pneum_rx_thread_fn(void *arg)
Definition: pneum.c:133
pneum_main_t pneum_main
Definition: pneum.c:70
void vl_msg_api_free(void *)
int pneum_connect(char *name, char *chroot_prefix, pneum_callback_t cb, int rx_qlen)
Definition: pneum.c:260
pthread_mutex_t queue_lock
Definition: pneum.c:61
struct vl_shmem_hdr_ * shmem_hdr
Definition: api.h:144
void pneum_rx_resume(void)
Definition: pneum.c:233
vlib_main_t vlib_global_main
Definition: pneum.c:54
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
int vl_client_api_map(char *region_name)
void vl_set_memory_root_path(char *root_path)
void(* pneum_callback_t)(unsigned char *data, int len)
Definition: pneum.h:21
int pneum_read(char **p, int *l, u16 timeout)
Definition: pneum.c:368
static void * pneum_timeout_thread_fn(void *arg)
Definition: pneum.c:179
int pneum_disconnect(void)
Definition: pneum.c:307
pthread_t timeout_thread_handle
Definition: pneum.c:60
int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
void * vl_msg_api_alloc(int nbytes)
static uword * pneum_msg_table_get_hash(void)
Definition: pneum.c:246
vlib_main_t * vm
Definition: buffer.c:276
vec_header_t h
Definition: buffer.c:275
void(* pneum_error_callback_t)(void *, unsigned char *, int)
Definition: pneum.h:22
static void pneum_api_handler(void *msg)
Definition: pneum.c:118
#define clib_warning(format, args...)
Definition: error.h:59
bool rx_is_running
Definition: pneum.c:73
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: pneum.c:106
u32 vl_api_get_msg_index(u8 *name_and_crc)
static void init(void)
Definition: pneum.c:76
int vl_client_connect(char *name, int ctx_quota, int input_queue_size)
static uword hash_elts(void *v)
Definition: hash.h:117
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
u16 read_timeout
Definition: pneum.c:72
u32 data_len
Definition: api.h:230
Definition: api.h:227
void pneum_set_error_handler(pneum_error_callback_t cb)
Definition: pneum.c:486
void pneum_free(void *msg)
Definition: pneum.c:112
static void set_timeout(unsigned short timeout)
Definition: pneum.c:349
u64 uword
Definition: types.h:112
void vl_client_disconnect(void)
void vl_client_api_unmap(void)
typedef VL_API_PACKED(struct _vl_api_header{u16 _vl_msg_id;u32 client_index;})
Definition: pneum.c:429
unsigned short u16
Definition: types.h:57
pneum_callback_t pneum_callback
Definition: pneum.c:71
unsigned char u8
Definition: types.h:56
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:349
int pneum_msg_table_max_index(void)
Definition: pneum.c:471
pthread_cond_t terminate_cv
Definition: pneum.c:67
pthread_mutex_t timeout_lock
Definition: pneum.c:64
int pneum_msg_table_size(void)
Definition: pneum.c:253
uword * msg_index_by_name_and_crc
Definition: api.h:196
pthread_cond_t timeout_cv
Definition: pneum.c:65
int pneum_write(char *p, int l)
Definition: pneum.c:441
static void unset_timeout(void)
Definition: pneum.c:359
struct _unix_shared_memory_queue unix_shared_memory_queue_t
pthread_cond_t resume_cv
Definition: pneum.c:63