FD.io VPP  v21.01.1
Vector Packet Processing
client.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <assert.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stddef.h>
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <sys/mman.h>
22 #include <sys/stat.h>
23 #include <netinet/in.h>
24 #include <netdb.h>
25 #include <signal.h>
26 #include <stdbool.h>
27 #include <vnet/vnet.h>
28 #include <vlib/vlib.h>
29 #include <vlib/unix/unix.h>
30 #include <vlibapi/api.h>
31 #include <vlibmemory/api.h>
32 
33 #include <vpp/api/vpe_msg_enum.h>
34 
35 #include "vppapiclient.h"
36 
40 
41 /*
42  * Asynchronous mode:
43  * Client registers a callback. All messages are sent to the callback.
44  * Synchronous mode:
45  * Client calls blocking read().
46  * Clients are expected to collate events on a queue.
47  * vac_write() -> suspends RX thread
48  * vac_read() -> resumes RX thread
49  */
50 
51 #define vl_typedefs /* define message structures */
52 #include <vpp/api/vpe_all_api_h.h>
53 #undef vl_typedefs
54 
55 #define vl_endianfun /* define message structures */
56 #include <vpp/api/vpe_all_api_h.h>
57 #undef vl_endianfun
58 
61 
62 typedef struct {
64  pthread_t rx_thread_handle;
66  pthread_mutex_t queue_lock;
67  pthread_cond_t suspend_cv;
68  pthread_cond_t resume_cv;
69  pthread_mutex_t timeout_lock;
71  pthread_cond_t timeout_cv;
72  pthread_cond_t timeout_cancel_cv;
73  pthread_cond_t terminate_cv;
74 } vac_main_t;
75 
79 bool rx_is_running = false;
81 
82 /* Only ever allocate one heap */
83 bool mem_initialized = false;
84 
85 static void
86 init (void)
87 {
88  vac_main_t *pm = &vac_main;
89  clib_memset(pm, 0, sizeof(*pm));
90  pthread_mutex_init(&pm->queue_lock, NULL);
91  pthread_cond_init(&pm->suspend_cv, NULL);
92  pthread_cond_init(&pm->resume_cv, NULL);
93  pthread_mutex_init(&pm->timeout_lock, NULL);
94  pm->timeout_loop = 1;
95  pthread_cond_init(&pm->timeout_cv, NULL);
96  pthread_cond_init(&pm->timeout_cancel_cv, NULL);
97  pthread_cond_init(&pm->terminate_cv, NULL);
98 }
99 
100 static void
101 cleanup (void)
102 {
103  vac_main_t *pm = &vac_main;
104  pthread_mutex_destroy(&pm->queue_lock);
105  pthread_cond_destroy(&pm->suspend_cv);
106  pthread_cond_destroy(&pm->resume_cv);
107  pthread_mutex_destroy(&pm->timeout_lock);
108  pthread_cond_destroy(&pm->timeout_cv);
109  pthread_cond_destroy(&pm->timeout_cancel_cv);
110  pthread_cond_destroy(&pm->terminate_cv);
111  clib_memset(pm, 0, sizeof(*pm));
112 }
113 
114 /*
115  * Satisfy external references when -lvlib is not available.
116  */
117 void vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
118 {
119  clib_warning ("vlib_cli_output called...");
120 }
121 
122 void
123 vac_free (void * msg)
124 {
125  vl_msg_api_free (msg);
126 }
127 
128 static void
129 vac_api_handler (void *msg)
130 {
131  u16 id = ntohs(*((u16 *)msg));
132  msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
133  int l = ntohl(msgbuf->data_len);
134  if (l == 0)
135  clib_warning("Message ID %d has wrong length: %d\n", id, l);
136 
137  /* Call Python callback */
138  ASSERT(vac_callback);
139  (vac_callback)(msg, l);
140  vac_free(msg);
141 }
142 
143 static void *
144 vac_rx_thread_fn (void *arg)
145 {
146  svm_queue_t *q;
148  vl_api_memclnt_keepalive_reply_t *rmp;
149  vac_main_t *pm = &vac_main;
151  vl_shmem_hdr_t *shmem_hdr;
152  uword msg;
153 
154  q = am->vl_input_queue;
155 
156  while (1)
157  while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
158  {
159  VL_MSG_API_UNPOISON((void *)msg);
160  u16 id = ntohs(*((u16 *)msg));
161  switch (id) {
162  case VL_API_RX_THREAD_EXIT:
163  vl_msg_api_free((void *) msg);
164  /* signal waiting threads that this thread is about to terminate */
165  pthread_mutex_lock(&pm->queue_lock);
166  rx_thread_done = true;
167  pthread_cond_signal(&pm->terminate_cv);
168  pthread_mutex_unlock(&pm->queue_lock);
169  pthread_exit(0);
170  return 0;
171  break;
172 
173  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
174  vl_msg_api_free((void * )msg);
175  /* Suspend thread and signal reader */
176  pthread_mutex_lock(&pm->queue_lock);
177  pthread_cond_signal(&pm->suspend_cv);
178  /* Wait for the resume signal */
179  pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
180  pthread_mutex_unlock(&pm->queue_lock);
181  break;
182 
183  case VL_API_MEMCLNT_READ_TIMEOUT:
184  clib_warning("Received read timeout in async thread\n");
185  vl_msg_api_free((void *) msg);
186  break;
187 
188  case VL_API_MEMCLNT_KEEPALIVE:
189  mp = (void *)msg;
190  rmp = vl_msg_api_alloc (sizeof (*rmp));
191  clib_memset (rmp, 0, sizeof (*rmp));
192  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
193  rmp->context = mp->context;
194  shmem_hdr = am->shmem_hdr;
195  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
196  vl_msg_api_free((void *) msg);
197  break;
198 
199  default:
200  vac_api_handler((void *)msg);
201  }
202  }
203 }
204 
205 static void *
207 {
209  vac_main_t *pm = &vac_main;
211  struct timespec ts;
212  struct timeval tv;
213  int rv;
214 
215  while (pm->timeout_loop)
216  {
217  /* Wait for poke */
218  pthread_mutex_lock(&pm->timeout_lock);
219  while (!timeout_in_progress)
220  pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
221 
222  /* Starting timer */
223  gettimeofday(&tv, NULL);
224  ts.tv_sec = tv.tv_sec + read_timeout;
225  ts.tv_nsec = 0;
226 
227  if (!timeout_cancelled) {
228  rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
229  &pm->timeout_lock, &ts);
230  if (rv == ETIMEDOUT && !timeout_thread_cancelled) {
231  ep = vl_msg_api_alloc (sizeof (*ep));
232  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
234  }
235  }
236 
237  pthread_mutex_unlock(&pm->timeout_lock);
238  }
239  pthread_exit(0);
240 }
241 
242 void
244 {
246  vac_main_t *pm = &vac_main;
248 
249  if (!pm->rx_thread_handle) return;
250  pthread_mutex_lock(&pm->queue_lock);
251  if (rx_is_running)
252  {
253  ep = vl_msg_api_alloc (sizeof (*ep));
254  ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
256  /* Wait for RX thread to tell us it has suspended */
257  pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
258  rx_is_running = false;
259  }
260  pthread_mutex_unlock(&pm->queue_lock);
261 }
262 
263 void
265 {
266  vac_main_t *pm = &vac_main;
267  if (!pm->rx_thread_handle) return;
268  pthread_mutex_lock(&pm->queue_lock);
269  if (rx_is_running) goto unlock;
270  pthread_cond_signal(&pm->resume_cv);
271  rx_is_running = true;
272  unlock:
273  pthread_mutex_unlock(&pm->queue_lock);
274 }
275 
276 static uword *
278 {
280  return (am->msg_index_by_name_and_crc);
281 }
282 
283 int
285 {
288 }
289 
290 int
291 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
292  int rx_qlen)
293 {
294  rx_thread_done = false;
295  int rv = 0;
296  vac_main_t *pm = &vac_main;
297 
299  init();
300  if (chroot_prefix != NULL)
301  vl_set_memory_root_path (chroot_prefix);
302 
303  if ((rv = vl_client_api_map("/vpe-api"))) {
304  clib_warning ("vl_client_api_map returned %d", rv);
305  return rv;
306  }
307 
308  if (vl_client_connect(name, 0, rx_qlen) < 0) {
310  return (-1);
311  }
312 
313  if (cb) {
314  /* Start the rx queue thread */
315  rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
316  if (rv) {
317  clib_warning("pthread_create returned %d", rv);
319  return (-1);
320  }
321  vac_callback = cb;
322  rx_is_running = true;
323  }
324 
325  /* Start read timeout thread */
326  rv = pthread_create(&pm->timeout_thread_handle, NULL,
328  if (rv) {
329  clib_warning("pthread_create returned %d", rv);
331  return (-1);
332  }
333 
334  pm->connected_to_vlib = 1;
335 
336  return (0);
337 }
338 static void
339 set_timeout (unsigned short timeout)
340 {
341  vac_main_t *pm = &vac_main;
342  pthread_mutex_lock(&pm->timeout_lock);
343  read_timeout = timeout;
344  timeout_in_progress = true;
345  timeout_cancelled = false;
346  pthread_cond_signal(&pm->timeout_cv);
347  pthread_mutex_unlock(&pm->timeout_lock);
348 }
349 
350 static void
352 {
353  vac_main_t *pm = &vac_main;
354  pthread_mutex_lock(&pm->timeout_lock);
355  timeout_in_progress = false;
356  timeout_cancelled = true;
357  pthread_cond_signal(&pm->timeout_cancel_cv);
358  pthread_mutex_unlock(&pm->timeout_lock);
359 }
360 
361 int
363 {
365  vac_main_t *pm = &vac_main;
366  uword junk;
367  int rv = 0;
368 
369  if (!pm->connected_to_vlib) return 0;
370 
371  if (pm->rx_thread_handle) {
373  ep = vl_msg_api_alloc (sizeof (*ep));
374  ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
376 
377  /* wait (with timeout) until RX thread has finished */
378  struct timespec ts;
379  struct timeval tv;
380  gettimeofday(&tv, NULL);
381  ts.tv_sec = tv.tv_sec + 5;
382  ts.tv_nsec = 0;
383 
384  pthread_mutex_lock(&pm->queue_lock);
385  if (rx_thread_done == false)
386  rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
387  pthread_mutex_unlock(&pm->queue_lock);
388 
389  /* now join so we wait until thread has -really- finished */
390  if (rv == ETIMEDOUT)
391  pthread_cancel(pm->rx_thread_handle);
392  else
393  pthread_join(pm->rx_thread_handle, (void **) &junk);
394  }
395  if (pm->timeout_thread_handle) {
396  /* cancel, wake then join the timeout thread */
397  pm->timeout_loop = 0;
398  timeout_thread_cancelled = true;
399  set_timeout(0);
400  pthread_join(pm->timeout_thread_handle, (void **) &junk);
401  }
402 
405  //vac_callback = 0;
406 
407  cleanup();
408 
409  return (0);
410 }
411 
412 int
413 vac_read (char **p, int *l, u16 timeout)
414 {
415  svm_queue_t *q;
417  vac_main_t *pm = &vac_main;
419  vl_api_memclnt_keepalive_reply_t *rmp;
420  uword msg;
421  msgbuf_t *msgbuf;
422  int rv;
423  vl_shmem_hdr_t *shmem_hdr;
424 
425  /* svm_queue_sub(below) returns {-1, -2} */
426  if (!pm->connected_to_vlib) return -3;
427 
428  *l = 0;
429 
430  /* svm_queue_sub(below) returns {-1, -2} */
431  if (am->our_pid == 0) return (-4);
432 
433  /* Poke timeout thread */
434  if (timeout)
435  set_timeout(timeout);
436 
437  q = am->vl_input_queue;
438 
439  again:
440  rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
441 
442  if (rv == 0) {
443  VL_MSG_API_UNPOISON((void *)msg);
444  u16 msg_id = ntohs(*((u16 *)msg));
445  switch (msg_id) {
446  case VL_API_RX_THREAD_EXIT:
447  vl_msg_api_free((void *) msg);
448  goto error;
449  case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
450  goto error;
451  case VL_API_MEMCLNT_READ_TIMEOUT:
452  goto error;
453  case VL_API_MEMCLNT_KEEPALIVE:
454  /* Handle an alive-check ping from vpp. */
455  mp = (void *)msg;
456  rmp = vl_msg_api_alloc (sizeof (*rmp));
457  clib_memset (rmp, 0, sizeof (*rmp));
458  rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
459  rmp->context = mp->context;
460  shmem_hdr = am->shmem_hdr;
461  vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
462  vl_msg_api_free((void *) msg);
463  /*
464  * Python code is blissfully unaware of these pings, so
465  * act as if it never happened...
466  */
467  goto again;
468 
469  default:
470  msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
471  *l = ntohl(msgbuf->data_len);
472  if (*l == 0) {
473  fprintf(stderr, "Unregistered API message: %d\n", msg_id);
474  goto error;
475  }
476  }
477  *p = (char *)msg;
478 
479 
480  } else {
481  fprintf(stderr, "Read failed with %d\n", rv);
482  }
483  /* Let timeout notification thread know we're done */
484  if (timeout)
485  unset_timeout();
486 
487  return (rv);
488 
489  error:
490  if (timeout)
491  unset_timeout();
492  vl_msg_api_free((void *) msg);
493  /* Client might forget to resume RX thread on failure */
494  vac_rx_resume ();
495  return -1;
496 }
497 
498 /*
499  * XXX: Makes the assumption that client_index is the first member
500  */
501 typedef VL_API_PACKED(struct _vl_api_header {
502  u16 _vl_msg_id;
503  u32 client_index;
504 }) vl_api_header_t;
505 
506 static u32
507 vac_client_index (void)
508 {
509  return (vlibapi_get_main()->my_client_index);
510 }
511 
512 int
513 vac_write (char *p, int l)
514 {
515  int rv = -1;
517  vl_api_header_t *mp = vl_msg_api_alloc(l);
518  svm_queue_t *q;
519  vac_main_t *pm = &vac_main;
520 
521  if (!pm->connected_to_vlib) return -1;
522  if (!mp) return (-1);
523 
524  memcpy(mp, p, l);
525  mp->client_index = vac_client_index();
526  q = am->shmem_hdr->vl_input_queue;
527  rv = svm_queue_add(q, (u8 *)&mp, 0);
528  if (rv != 0) {
529  fprintf(stderr, "vpe_api_write fails: %d\n", rv);
530  /* Clear message */
531  vac_free(mp);
532  }
533  return (rv);
534 }
535 
536 int
538 {
539  return vl_msg_api_get_msg_index ((u8 *)name);
540 }
541 
542 int
544 {
545  int max = 0;
546  hash_pair_t *hp;
548  hash_foreach_pair (hp, h,
549  ({
550  if (hp->value[0] > max)
551  max = hp->value[0];
552  }));
553 
554  return max;
555 }
556 
557 void
559 {
561  if (cb) clib_error_register_handler (cb, 0);
562 }
563 
564 /*
565  * Required if application doesn't use a VPP heap.
566  */
567 void
569 {
570  if (mem_initialized)
571  return;
572  if (size == 0)
573  clib_mem_init (0, 1 << 30); // default
574  else
575  clib_mem_init (0, size);
576  mem_initialized = true;
577 }
bool timeout_thread_cancelled
Definition: client.c:80
void vac_rx_resume(void)
Definition: client.c:264
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:260
bool mem_initialized
Definition: client.c:83
void vac_mem_init(size_t size)
Definition: client.c:568
#define ntohs(x)
Definition: af_xdp.bpf.c:29
int vac_get_msg_index(char *name)
Definition: client.c:537
void * clib_mem_init(void *base, uword size)
Definition: mem_dlmalloc.c:266
pthread_t timeout_thread_handle
Definition: client.c:65
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
static void set_timeout(unsigned short timeout)
Definition: client.c:339
pthread_t rx_thread_handle
Definition: client.c:64
int vac_msg_table_max_index(void)
Definition: client.c:543
vlib_main_t * vm
Definition: in2out_ed.c:1580
void vlib_cli_output(struct vlib_main_t *vm, char *fmt,...)
Definition: client.c:117
int vac_write(char *p, int l)
Definition: client.c:513
svm_queue_t * vl_input_queue
Peer input queue pointer.
Definition: api_common.h:332
void * vl_msg_api_alloc(int nbytes)
bool rx_is_running
Definition: client.c:79
unsigned char u8
Definition: types.h:56
u8 data[128]
Definition: ipsec_types.api:90
uword value[0]
Definition: hash.h:165
void vl_client_api_unmap(void)
#define assert(x)
Definition: dlmalloc.c:31
int our_pid
Current process PID.
Definition: api_common.h:280
u8 connected_to_vlib
Definition: client.c:63
pthread_cond_t timeout_cancel_cv
Definition: client.c:72
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:369
pthread_mutex_t queue_lock
Definition: client.c:66
vlib_main_t ** vlib_mains
Definition: client.c:60
bool rx_thread_done
Definition: client.c:39
__clib_export void clib_error_register_handler(clib_error_handler_func_t func, void *arg)
Definition: error.c:75
bool timeout_cancelled
Definition: client.c:37
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
Definition: api_common.h:293
unsigned int u32
Definition: types.h:88
void vac_rx_suspend(void)
Definition: client.c:243
void vl_msg_api_send_shmem(svm_queue_t *q, u8 *elem)
bool timeout_in_progress
Definition: client.c:38
vac_callback_t vac_callback
Definition: client.c:77
static void unset_timeout(void)
Definition: client.c:351
vlib_main_t vlib_global_main
Definition: client.c:59
int vac_connect(char *name, char *chroot_prefix, vac_callback_t cb, int rx_qlen)
Definition: client.c:291
void vl_set_memory_root_path(const char *name)
int vac_disconnect(void)
Definition: client.c:362
Definition: cJSON.c:84
void(* vac_error_callback_t)(void *, unsigned char *, int)
Definition: vppapiclient.h:22
u8 timeout_loop
Definition: client.c:70
int vl_client_connect(const char *name, int ctx_quota, int input_queue_size)
unsigned short u16
Definition: types.h:57
u32 size
Definition: vhost_user.h:106
vec_header_t h
Definition: buffer.c:322
static void cleanup(void)
Definition: client.c:101
vac_main_t vac_main
Definition: client.c:76
static void * vac_timeout_thread_fn(void *arg)
Definition: client.c:206
int cJSON_bool fmt
Definition: cJSON.h:160
u16 read_timeout
Definition: client.c:78
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:227
#define clib_warning(format, args...)
Definition: error.h:59
u32 vl_msg_api_get_msg_index(u8 *name_and_crc)
Definition: api_shared.c:1105
int vl_client_api_map(const char *region_name)
pthread_cond_t suspend_cv
Definition: client.c:67
static uword * vac_msg_table_get_hash(void)
Definition: client.c:277
blocking call - best used in combination with condvars, for eventfds we don&#39;t yield the cpu ...
Definition: queue.h:42
svm_queue_t * vl_input_queue
Definition: memory_shared.h:84
string name[64]
Definition: ip.api:44
static clib_mem_heap_t * clib_mem_get_heap(void)
Definition: mem.h:359
static uword hash_elts(void *v)
Definition: hash.h:118
#define ASSERT(truth)
u32 data_len
message length not including header
Definition: api_common.h:142
Message header structure.
Definition: api_common.h:139
int vac_read(char **p, int *l, u16 timeout)
Definition: client.c:413
static void init(void)
Definition: client.c:86
pthread_cond_t terminate_cv
Definition: client.c:73
void vl_msg_api_free(void *)
pthread_cond_t timeout_cv
Definition: client.c:71
pthread_cond_t resume_cv
Definition: client.c:68
#define hash_foreach_pair(p, v, body)
Iterate over hash pairs.
Definition: hash.h:373
int vl_client_disconnect(void)
u64 uword
Definition: types.h:112
void vac_set_error_handler(vac_error_callback_t cb)
Definition: client.c:558
int vac_msg_table_size(void)
Definition: client.c:284
struct _svm_queue svm_queue_t
void(* vac_callback_t)(unsigned char *data, int len)
Definition: vppapiclient.h:21
pthread_mutex_t timeout_lock
Definition: client.c:69
static api_main_t * vlibapi_get_main(void)
Definition: api_common.h:389
void vac_free(void *msg)
Definition: client.c:123
static void vac_api_handler(void *msg)
Definition: client.c:129
uword * msg_index_by_name_and_crc
client message index hash table
Definition: api_common.h:350
typedef VL_API_PACKED(struct _vl_api_header { u16 _vl_msg_id;u32 client_index;})
Definition: client.c:501
static CLIB_NOSANITIZE_ADDR void VL_MSG_API_UNPOISON(const void *a)
Definition: api_common.h:148
static void * vac_rx_thread_fn(void *arg)
Definition: client.c:144