FD.io VPP  v17.07.01-10-g3be13f0
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
21 #include <vlibmemory/api.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
25 #include <vnet/tcp/tcp.h>
27 
28 /**
29  * Per-type vector of transport protocol virtual function tables
30  */
32 
34 
35 /*
36  * Session lookup key; (src-ip, dst-ip, src-port, dst-port, session-type)
37  * Value: (owner thread index << 32 | session_index);
38  */
39 void
41 {
43  session_kv4_t kv4;
44  session_kv6_t kv6;
45 
46  switch (tc->proto)
47  {
48  case SESSION_TYPE_IP4_UDP:
49  case SESSION_TYPE_IP4_TCP:
50  make_v4_ss_kv_from_tc (&kv4, tc);
51  kv4.value = value;
52  clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4, 1 /* is_add */ );
53  break;
54  case SESSION_TYPE_IP6_UDP:
55  case SESSION_TYPE_IP6_TCP:
56  make_v6_ss_kv_from_tc (&kv6, tc);
57  kv6.value = value;
58  clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6, 1 /* is_add */ );
59  break;
60  default:
61  clib_warning ("Session type not supported");
62  ASSERT (0);
63  }
64 }
65 
66 void
68  u64 value)
69 {
71 
72  tc = tp_vfts[s->session_type].get_connection (s->connection_index,
73  s->thread_index);
75 }
76 
77 static void
79  transport_connection_t * tc, u64 value)
80 {
82  session_kv4_t kv4;
83  session_kv6_t kv6;
84 
85  switch (sst)
86  {
87  case SESSION_TYPE_IP4_UDP:
88  case SESSION_TYPE_IP4_TCP:
89  make_v4_ss_kv_from_tc (&kv4, tc);
90  kv4.value = value;
91  clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
92  1 /* is_add */ );
93  break;
94  case SESSION_TYPE_IP6_UDP:
95  case SESSION_TYPE_IP6_TCP:
96  make_v6_ss_kv_from_tc (&kv6, tc);
97  kv6.value = value;
98  clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
99  1 /* is_add */ );
100  break;
101  default:
102  clib_warning ("Session type not supported");
103  ASSERT (0);
104  }
105 }
106 
107 int
109 {
111  session_kv4_t kv4;
112  session_kv6_t kv6;
113  switch (tc->proto)
114  {
115  case SESSION_TYPE_IP4_UDP:
116  case SESSION_TYPE_IP4_TCP:
117  make_v4_ss_kv_from_tc (&kv4, tc);
118  return clib_bihash_add_del_16_8 (&smm->v4_session_hash, &kv4,
119  0 /* is_add */ );
120  break;
121  case SESSION_TYPE_IP6_UDP:
122  case SESSION_TYPE_IP6_TCP:
123  make_v6_ss_kv_from_tc (&kv6, tc);
124  return clib_bihash_add_del_48_8 (&smm->v6_session_hash, &kv6,
125  0 /* is_add */ );
126  break;
127  default:
128  clib_warning ("Session type not supported");
129  ASSERT (0);
130  }
131 
132  return 0;
133 }
134 
135 static int
137 {
139 
140  ts = tp_vfts[s->session_type].get_connection (s->connection_index,
141  s->thread_index);
143 }
144 
145 static void
148 {
149  session_kv4_t kv4;
150  session_kv6_t kv6;
151 
152  switch (sst)
153  {
154  case SESSION_TYPE_IP4_UDP:
155  case SESSION_TYPE_IP4_TCP:
156  make_v4_ss_kv_from_tc (&kv4, tc);
157  clib_bihash_add_del_16_8 (&smm->v4_half_open_hash, &kv4,
158  0 /* is_add */ );
159  break;
160  case SESSION_TYPE_IP6_UDP:
161  case SESSION_TYPE_IP6_TCP:
162  make_v6_ss_kv_from_tc (&kv6, tc);
163  clib_bihash_add_del_48_8 (&smm->v6_half_open_hash, &kv6,
164  0 /* is_add */ );
165  break;
166  default:
167  clib_warning ("Session type not supported");
168  ASSERT (0);
169  }
170 }
171 
174 {
176  session_kv4_t kv4;
177  int rv;
178 
179  make_v4_listener_kv (&kv4, lcl, lcl_port, proto);
180  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
181  if (rv == 0)
182  return pool_elt_at_index (smm->listen_sessions[proto], (u32) kv4.value);
183 
184  /* Zero out the lcl ip */
185  kv4.key[0] = 0;
186  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
187  if (rv == 0)
188  return pool_elt_at_index (smm->listen_sessions[proto], kv4.value);
189 
190  return 0;
191 }
192 
193 /** Looks up a session based on the 5-tuple passed as argument.
194  *
195  * First it tries to find an established session, if this fails, it tries
196  * finding a listener session if this fails, it tries a lookup with a
197  * wildcarded local source (listener bound to all interfaces)
198  */
201  u16 lcl_port, u16 rmt_port, u8 proto,
202  u32 my_thread_index)
203 {
205  session_kv4_t kv4;
206  int rv;
207 
208  /* Lookup session amongst established ones */
209  make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
210  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
211  if (rv == 0)
212  return stream_session_get_tsi (kv4.value, my_thread_index);
213 
214  /* If nothing is found, check if any listener is available */
215  return stream_session_lookup_listener4 (lcl, lcl_port, proto);
216 }
217 
220 {
222  session_kv6_t kv6;
223  int rv;
224 
225  make_v6_listener_kv (&kv6, lcl, lcl_port, proto);
226  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
227  if (rv == 0)
228  return pool_elt_at_index (smm->listen_sessions[proto], kv6.value);
229 
230  /* Zero out the lcl ip */
231  kv6.key[0] = kv6.key[1] = 0;
232  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
233  if (rv == 0)
234  return pool_elt_at_index (smm->listen_sessions[proto], kv6.value);
235 
236  return 0;
237 }
238 
239 /* Looks up a session based on the 5-tuple passed as argument.
240  * First it tries to find an established session, if this fails, it tries
241  * finding a listener session if this fails, it tries a lookup with a
242  * wildcarded local source (listener bound to all interfaces) */
245  u16 lcl_port, u16 rmt_port, u8 proto,
246  u32 my_thread_index)
247 {
249  session_kv6_t kv6;
250  int rv;
251 
252  make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
253  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
254  if (rv == 0)
255  return stream_session_get_tsi (kv6.value, my_thread_index);
256 
257  /* If nothing is found, check if any listener is available */
258  return stream_session_lookup_listener6 (lcl, lcl_port, proto);
259 }
260 
262 stream_session_lookup_listener (ip46_address_t * lcl, u16 lcl_port, u8 proto)
263 {
264  switch (proto)
265  {
266  case SESSION_TYPE_IP4_UDP:
267  case SESSION_TYPE_IP4_TCP:
268  return stream_session_lookup_listener4 (&lcl->ip4, lcl_port, proto);
269  break;
270  case SESSION_TYPE_IP6_UDP:
271  case SESSION_TYPE_IP6_TCP:
272  return stream_session_lookup_listener6 (&lcl->ip6, lcl_port, proto);
273  break;
274  }
275  return 0;
276 }
277 
278 static u64
280  ip46_address_t * lcl, ip46_address_t * rmt,
281  u16 lcl_port, u16 rmt_port, u8 proto)
282 {
283  session_kv4_t kv4;
284  session_kv6_t kv6;
285  int rv;
286 
287  switch (proto)
288  {
289  case SESSION_TYPE_IP4_UDP:
290  case SESSION_TYPE_IP4_TCP:
291  make_v4_ss_kv (&kv4, &lcl->ip4, &rmt->ip4, lcl_port, rmt_port, proto);
292  rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
293 
294  if (rv == 0)
295  return kv4.value;
296 
297  return (u64) ~ 0;
298  break;
299  case SESSION_TYPE_IP6_UDP:
300  case SESSION_TYPE_IP6_TCP:
301  make_v6_ss_kv (&kv6, &lcl->ip6, &rmt->ip6, lcl_port, rmt_port, proto);
302  rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
303 
304  if (rv == 0)
305  return kv6.value;
306 
307  return (u64) ~ 0;
308  break;
309  }
310  return 0;
311 }
312 
315  u16 lcl_port, u16 rmt_port, u8 proto,
316  u32 my_thread_index)
317 {
319  session_kv4_t kv4;
320  stream_session_t *s;
321  int rv;
322 
323  /* Lookup session amongst established ones */
324  make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
325  rv = clib_bihash_search_inline_16_8 (&smm->v4_session_hash, &kv4);
326  if (rv == 0)
327  {
328  s = stream_session_get_tsi (kv4.value, my_thread_index);
329 
330  return tp_vfts[s->session_type].get_connection (s->connection_index,
331  my_thread_index);
332  }
333 
334  /* If nothing is found, check if any listener is available */
335  s = stream_session_lookup_listener4 (lcl, lcl_port, proto);
336  if (s)
337  return tp_vfts[s->session_type].get_listener (s->connection_index);
338 
339  /* Finally, try half-open connections */
340  rv = clib_bihash_search_inline_16_8 (&smm->v4_half_open_hash, &kv4);
341  if (rv == 0)
342  return tp_vfts[proto].get_half_open (kv4.value & 0xFFFFFFFF);
343 
344  return 0;
345 }
346 
349  u16 lcl_port, u16 rmt_port, u8 proto,
350  u32 my_thread_index)
351 {
353  stream_session_t *s;
354  session_kv6_t kv6;
355  int rv;
356 
357  make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
358  rv = clib_bihash_search_inline_48_8 (&smm->v6_session_hash, &kv6);
359  if (rv == 0)
360  {
361  s = stream_session_get_tsi (kv6.value, my_thread_index);
362 
363  return tp_vfts[s->session_type].get_connection (s->connection_index,
364  my_thread_index);
365  }
366 
367  /* If nothing is found, check if any listener is available */
368  s = stream_session_lookup_listener6 (lcl, lcl_port, proto);
369  if (s)
370  return tp_vfts[s->session_type].get_listener (s->connection_index);
371 
372  /* Finally, try half-open connections */
373  rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
374  if (rv == 0)
375  return tp_vfts[proto].get_half_open (kv6.value & 0xFFFFFFFF);
376 
377  return 0;
378 }
379 
380 int
382  stream_session_t ** ret_s)
383 {
385  svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
386  u32 fifo_segment_index;
387  u32 pool_index;
388  stream_session_t *s;
389  u64 value;
390  u32 thread_index = tc->thread_index;
391  int rv;
392 
393  if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
394  &server_tx_fifo,
395  &fifo_segment_index)))
396  return rv;
397 
398  /* Create the session */
399  pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
400  memset (s, 0, sizeof (*s));
401 
402  /* Initialize backpointers */
403  pool_index = s - smm->sessions[thread_index];
404  server_rx_fifo->master_session_index = pool_index;
405  server_rx_fifo->master_thread_index = thread_index;
406 
407  server_tx_fifo->master_session_index = pool_index;
408  server_tx_fifo->master_thread_index = thread_index;
409 
410  s->server_rx_fifo = server_rx_fifo;
411  s->server_tx_fifo = server_tx_fifo;
412 
413  /* Initialize state machine, such as it is... */
414  s->session_type = tc->proto;
415  s->session_state = SESSION_STATE_CONNECTING;
416  s->svm_segment_index = fifo_segment_index;
417  s->thread_index = thread_index;
418  s->session_index = pool_index;
419 
420  /* Attach transport to session */
421  s->connection_index = tc->c_index;
422 
423  /* Attach session to transport */
424  tc->s_index = s->session_index;
425 
426  /* Add to the main lookup table */
427  value = (((u64) thread_index) << 32) | (u64) s->session_index;
429 
430  *ret_s = s;
431 
432  return 0;
433 }
434 
435 /** Enqueue buffer chain tail */
436 always_inline int
438  u32 offset, u8 is_in_order)
439 {
440  vlib_buffer_t *chain_b;
441  u32 chain_bi = b->next_buffer;
442  vlib_main_t *vm = vlib_get_main ();
443  u8 *data, len;
444  u16 written = 0;
445  int rv = 0;
446 
447  do
448  {
449  chain_b = vlib_get_buffer (vm, chain_bi);
450  data = vlib_buffer_get_current (chain_b);
451  len = chain_b->current_length;
452  if (is_in_order)
453  {
454  rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
455  if (rv < len)
456  {
457  return (rv > 0) ? (written + rv) : written;
458  }
459  written += rv;
460  }
461  else
462  {
463  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
464  data);
465  if (rv)
466  return -1;
467  offset += len;
468  }
469  }
470  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
471  ? chain_b->next_buffer : 0));
472 
473  if (is_in_order)
474  return written;
475 
476  return 0;
477 }
478 
479 /*
480  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
481  * event but on request can queue notification events for later delivery by
482  * calling stream_server_flush_enqueue_events().
483  *
484  * @param tc Transport connection which is to be enqueued data
485  * @param b Buffer to be enqueued
486  * @param offset Offset at which to start enqueueing if out-of-order
487  * @param queue_event Flag to indicate if peer is to be notified or if event
488  * is to be queued. The former is useful when more data is
489  * enqueued and only one event is to be generated.
490  * @param is_in_order Flag to indicate if data is in order
491  * @return Number of bytes enqueued or a negative value if enqueueing failed.
492  */
493 int
495  u32 offset, u8 queue_event, u8 is_in_order)
496 {
497  stream_session_t *s;
498  int enqueued = 0, rv;
499 
500  s = stream_session_get (tc->s_index, tc->thread_index);
501 
502  if (is_in_order)
503  {
504  enqueued =
505  svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
507  if (PREDICT_FALSE
508  ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
509  {
510  rv = session_enqueue_chain_tail (s, b, 0, 1);
511  if (rv <= 0)
512  return enqueued;
513  enqueued += rv;
514  }
515  }
516  else
517  {
518  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
519  b->current_length,
521  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
522  rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
523  if (rv)
524  return -1;
525  }
526 
527  if (queue_event)
528  {
529  /* Queue RX event on this fifo. Eventually these will need to be flushed
530  * by calling stream_server_flush_enqueue_events () */
532  u32 thread_index = s->thread_index;
533  u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
534 
535  if (s->enqueue_epoch != my_enqueue_epoch)
536  {
537  s->enqueue_epoch = my_enqueue_epoch;
538  vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
539  s - smm->sessions[thread_index]);
540  }
541  }
542 
543  if (is_in_order)
544  return enqueued;
545 
546  return 0;
547 }
548 
549 /** Check if we have space in rx fifo to push more bytes */
550 u8
552  u16 data_len)
553 {
554  stream_session_t *s = stream_session_get (tc->s_index, thread_index);
555 
556  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
557  return 1;
558 
559  if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
560  return 1;
561 
562  return 0;
563 }
564 
565 u32
567 {
568  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
569  if (s->session_state != SESSION_STATE_READY)
570  return 0;
571  return svm_fifo_max_dequeue (s->server_tx_fifo);
572 }
573 
574 int
576  u32 offset, u32 max_bytes)
577 {
578  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
579  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
580 }
581 
582 u32
584 {
585  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
586  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
587 }
588 
589 /**
590  * Notify session peer that new data has been enqueued.
591  *
592  * @param s Stream session for which the event is to be generated.
593  * @param block Flag to indicate if call should block if event queue is full.
594  *
595  * @return 0 on succes or negative number if failed to send notification.
596  */
597 static int
599 {
600  application_t *app;
601  session_fifo_event_t evt;
603  static u32 serial_number;
604 
605  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
606  return 0;
607 
608  /* Get session's server */
609  app = application_get (s->app_index);
610 
611  /* Built-in server? Hand event to the callback... */
612  if (app->cb_fns.builtin_server_rx_callback)
613  return app->cb_fns.builtin_server_rx_callback (s);
614 
615  /* If no event, send one */
616  if (svm_fifo_set_event (s->server_rx_fifo))
617  {
618  /* Fabricate event */
619  evt.fifo = s->server_rx_fifo;
620  evt.event_type = FIFO_EVENT_APP_RX;
621  evt.event_id = serial_number++;
622 
623  /* Add event to server's event queue */
624  q = app->event_queue;
625 
626  /* Based on request block (or not) for lack of space */
627  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
628  unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
629  0 /* do wait for mutex */ );
630  else
631  {
632  clib_warning ("fifo full");
633  return -1;
634  }
635  }
636 
637  /* *INDENT-OFF* */
638  SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
639  ed->data[0] = evt.event_id;
640  ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
641  }));
642  /* *INDENT-ON* */
643 
644  return 0;
645 }
646 
647 /**
648  * Flushes queue of sessions that are to be notified of new data
649  * enqueued events.
650  *
651  * @param thread_index Thread index for which the flush is to be performed.
652  * @return 0 on success or a positive number indicating the number of
653  * failures due to API queue being full.
654  */
655 int
657 {
659  u32 *session_indices_to_enqueue;
660  int i, errors = 0;
661 
662  session_indices_to_enqueue =
663  smm->session_indices_to_enqueue_by_thread[thread_index];
664 
665  for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
666  {
667  stream_session_t *s0;
668 
669  /* Get session */
670  s0 = stream_session_get (session_indices_to_enqueue[i], thread_index);
671  if (stream_session_enqueue_notify (s0, 0 /* don't block */ ))
672  {
673  errors++;
674  }
675  }
676 
677  vec_reset_length (session_indices_to_enqueue);
678 
679  smm->session_indices_to_enqueue_by_thread[thread_index] =
680  session_indices_to_enqueue;
681 
682  /* Increment enqueue epoch for next round */
683  smm->current_enqueue_epoch[thread_index]++;
684 
685  return errors;
686 }
687 
688 /**
689  * Init fifo tail and head pointers
690  *
691  * Useful if transport uses absolute offsets for tracking ooo segments.
692  */
693 void
695  u32 rx_pointer, u32 tx_pointer)
696 {
697  stream_session_t *s;
698  s = stream_session_get (tc->s_index, tc->thread_index);
699  svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
700  svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
701 }
702 
703 int
705  u8 is_fail)
706 {
708  application_t *app;
709  stream_session_t *new_s = 0;
710  u64 handle;
711  u32 api_context = 0;
712  int error = 0;
713 
714  handle = stream_session_half_open_lookup (smm, &tc->lcl_ip, &tc->rmt_ip,
715  tc->lcl_port, tc->rmt_port,
716  tc->proto);
717  if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
718  {
719  clib_warning ("This can't be good!");
720  return -1;
721  }
722 
723  /* Get the app's index from the handle we stored when opening connection */
724  app = application_get (handle >> 32);
725  api_context = tc->s_index;
726 
727  if (!is_fail)
728  {
729  segment_manager_t *sm;
731 
732  /* Create new session (svm segments are allocated if needed) */
733  if (stream_session_create_i (sm, tc, &new_s))
734  {
735  is_fail = 1;
736  error = -1;
737  }
738  else
739  new_s->app_index = app->index;
740  }
741 
742  /* Notify client */
743  app->cb_fns.session_connected_callback (app->index, api_context, new_s,
744  is_fail);
745 
746  /* Cleanup session lookup */
747  stream_session_half_open_table_del (smm, sst, tc);
748 
749  return error;
750 }
751 
752 void
754 {
755  application_t *server;
756  stream_session_t *s;
757 
758  s = stream_session_get (tc->s_index, tc->thread_index);
759  server = application_get (s->app_index);
760  server->cb_fns.session_accept_callback (s);
761 }
762 
763 /**
764  * Notification from transport that connection is being closed.
765  *
766  * A disconnect is sent to application but state is not removed. Once
767  * disconnect is acknowledged by application, session disconnect is called.
768  * Ultimately this leads to close being called on transport (passive close).
769  */
770 void
772 {
773  application_t *server;
774  stream_session_t *s;
775 
776  s = stream_session_get (tc->s_index, tc->thread_index);
777  server = application_get (s->app_index);
778  server->cb_fns.session_disconnect_callback (s);
779 }
780 
781 /**
782  * Cleans up session and associated app if needed.
783  */
784 void
786 {
788 
789  /* Delete from the main lookup table. */
790  stream_session_table_del (smm, s);
791 
792  /* Cleanup fifo segments */
793  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
794  s->server_tx_fifo);
795 
796  pool_put (smm->sessions[s->thread_index], s);
797 }
798 
799 /**
800  * Notification from transport that connection is being deleted
801  *
802  * This should be called only on previously fully established sessions. For
803  * instance failed connects should call stream_session_connect_notify and
804  * indicate that the connect has failed.
805  */
806 void
808 {
809  stream_session_t *s;
810 
811  /* App might've been removed already */
812  s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
813  if (!s)
814  {
815  return;
816  }
818 }
819 
820 /**
821  * Notify application that connection has been reset.
822  */
823 void
825 {
826  stream_session_t *s;
827  application_t *app;
828  s = stream_session_get (tc->s_index, tc->thread_index);
829 
830  app = application_get (s->app_index);
831  app->cb_fns.session_reset_callback (s);
832 }
833 
834 /**
835  * Accept a stream session. Optionally ping the server by callback.
836  */
837 int
839  u8 sst, u8 notify)
840 {
841  application_t *server;
842  stream_session_t *s, *listener;
843  segment_manager_t *sm;
844 
845  int rv;
846 
847  /* Find the server */
848  listener = listen_session_get (sst, listener_index);
849  server = application_get (listener->app_index);
850 
851  sm = application_get_listen_segment_manager (server, listener);
852  if ((rv = stream_session_create_i (sm, tc, &s)))
853  return rv;
854 
855  s->app_index = server->index;
856  s->listener_index = listener_index;
857 
858  /* Shoulder-tap the server */
859  if (notify)
860  {
861  server->cb_fns.session_accept_callback (s);
862  }
863 
864  return 0;
865 }
866 
867 /**
868  * Ask transport to open connection to remote transport endpoint.
869  *
870  * Stores handle for matching request with reply since the call can be
871  * asynchronous. For instance, for TCP the 3-way handshake must complete
872  * before reply comes. Session is only created once connection is established.
873  *
874  * @param app_index Index of the application requesting the connect
875  * @param st Session type requested.
876  * @param tep Remote transport endpoint
877  * @param res Resulting transport connection .
878  */
879 int
881  transport_endpoint_t * tep,
882  transport_connection_t ** res)
883 {
885  int rv;
886  u64 handle;
887 
888  rv = tp_vfts[st].open (&tep->ip, tep->port);
889  if (rv < 0)
890  {
891  clib_warning ("Transport failed to open connection.");
892  return VNET_API_ERROR_SESSION_CONNECT_FAIL;
893  }
894 
895  tc = tp_vfts[st].get_half_open ((u32) rv);
896 
897  /* Save app and tc index. The latter is needed to help establish the
898  * connection while the former is needed when the connect notify comes
899  * and we have to notify the external app */
900  handle = (((u64) app_index) << 32) | (u64) tc->c_index;
901 
902  /* Add to the half-open lookup table */
903  stream_session_half_open_table_add (st, tc, handle);
904 
905  *res = tc;
906 
907  return 0;
908 }
909 
910 /**
911  * Ask transport to listen on local transport endpoint.
912  *
913  * @param s Session for which listen will be called. Note that unlike
914  * established sessions, listen sessions are not associated to a
915  * thread.
916  * @param tep Local endpoint to be listened on.
917  */
918 int
920 {
922  u32 tci;
923 
924  /* Transport bind/listen */
925  tci = tp_vfts[s->session_type].bind (s->session_index, &tep->ip, tep->port);
926 
927  if (tci == (u32) ~ 0)
928  return -1;
929 
930  /* Attach transport to session */
931  s->connection_index = tci;
932  tc = tp_vfts[s->session_type].get_listener (tci);
933 
934  /* Weird but handle it ... */
935  if (tc == 0)
936  return -1;
937 
938  /* Add to the main lookup table */
939  stream_session_table_add_for_tc (tc, s->session_index);
940 
941  return 0;
942 }
943 
944 /**
945  * Ask transport to stop listening on local transport endpoint.
946  *
947  * @param s Session to stop listening on. It must be in state LISTENING.
948  */
949 int
951 {
953 
954  if (s->session_state != SESSION_STATE_LISTENING)
955  {
956  clib_warning ("not a listening session");
957  return -1;
958  }
959 
960  tc = tp_vfts[s->session_type].get_listener (s->connection_index);
961  if (!tc)
962  {
963  clib_warning ("no transport");
964  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
965  }
966 
968  tp_vfts[s->session_type].unbind (s->connection_index);
969  return 0;
970 }
971 
972 void
974  fifo_event_type_t evt_type,
975  u32 thread_index)
976 {
977  static u16 serial_number = 0;
978  session_fifo_event_t evt;
980 
981  /* Fabricate event */
982  evt.session_handle = session_handle;
983  evt.event_type = evt_type;
984  evt.event_id = serial_number++;
985 
986  q = session_manager_get_vpp_event_queue (thread_index);
987 
988  /* Based on request block (or not) for lack of space */
989  if (PREDICT_TRUE (q->cursize < q->maxsize))
990  {
991  if (unix_shared_memory_queue_add (q, (u8 *) & evt,
992  1 /* do wait for mutex */ ))
993  {
994  clib_warning ("failed to enqueue evt");
995  }
996  }
997  else
998  {
999  clib_warning ("queue full");
1000  return;
1001  }
1002 }
1003 
1004 /**
1005  * Disconnect session and propagate to transport. This should eventually
1006  * result in a delete notification that allows us to cleanup session state.
1007  * Called for both active/passive disconnects.
1008  *
1009  * Should be called from the session's thread.
1010  */
1011 void
1013 {
1014  s->session_state = SESSION_STATE_CLOSED;
1015  tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
1016 }
1017 
1018 /**
1019  * Cleanup transport and session state.
1020  *
1021  * Notify transport of the cleanup, wait for a delete notify to actually
1022  * remove the session state.
1023  */
1024 void
1026 {
1028  int rv;
1029 
1030  s->session_state = SESSION_STATE_CLOSED;
1031 
1032  /* Delete from the main lookup table to avoid more enqueues */
1033  rv = stream_session_table_del (smm, s);
1034  if (rv)
1035  clib_warning ("hash delete error, rv %d", rv);
1036 
1037  tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
1038 }
1039 
1040 void
1042 {
1044 
1045  vec_validate (tp_vfts, type);
1046  tp_vfts[type] = *vft;
1047 
1048  /* If an offset function is provided, then peek instead of dequeue */
1049  smm->session_tx_fns[type] =
1050  (vft->tx_fifo_offset) ? session_tx_fifo_peek_and_snd :
1052 }
1053 
1056 {
1057  if (type >= vec_len (tp_vfts))
1058  return 0;
1059  return &tp_vfts[type];
1060 }
1061 
1062 /**
1063  * Allocate vpp event queue (once) per worker thread
1064  */
1065 void
1067  u32 thread_index)
1068 {
1069  api_main_t *am = &api_main;
1070  void *oldheap;
1071  u32 event_queue_length = 2048;
1072 
1073  if (smm->vpp_event_queues[thread_index] == 0)
1074  {
1075  /* Allocate event fifo in the /vpe-api shared-memory segment */
1076  oldheap = svm_push_data_heap (am->vlib_rp);
1077 
1078  if (smm->configured_event_queue_length)
1079  event_queue_length = smm->configured_event_queue_length;
1080 
1081  smm->vpp_event_queues[thread_index] =
1083  (event_queue_length,
1084  sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
1085  0 /* (do not) send signal when queue non-empty */ );
1086 
1087  svm_pop_heap (oldheap);
1088  }
1089 }
1090 
1091 static clib_error_t *
1093 {
1096  u32 num_threads;
1097  int i;
1098 
1099  num_threads = 1 /* main thread */ + vtm->n_threads;
1100 
1101  if (num_threads < 1)
1102  return clib_error_return (0, "n_thread_stacks not set");
1103 
1104  /* $$$ config parameters */
1105  svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
1106  20 /* timeout in seconds */ );
1107 
1108  /* configure per-thread ** vectors */
1109  vec_validate (smm->sessions, num_threads - 1);
1110  vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
1111  vec_validate (smm->tx_buffers, num_threads - 1);
1112  vec_validate (smm->pending_event_vector, num_threads - 1);
1113  vec_validate (smm->free_event_vector, num_threads - 1);
1114  vec_validate (smm->current_enqueue_epoch, num_threads - 1);
1115  vec_validate (smm->vpp_event_queues, num_threads - 1);
1116 
1117  for (i = 0; i < num_threads; i++)
1118  {
1119  vec_validate (smm->free_event_vector[i], 0);
1120  _vec_len (smm->free_event_vector[i]) = 0;
1121  vec_validate (smm->pending_event_vector[i], 0);
1122  _vec_len (smm->pending_event_vector[i]) = 0;
1123  }
1124 
1125 #if SESSION_DBG
1126  vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1127 #endif
1128 
1129  /* Allocate vpp event queues */
1130  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
1132 
1133  /* $$$$ preallocate hack config parameter */
1134  for (i = 0; i < 200000; i++)
1135  {
1136  stream_session_t *ss;
1137  pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
1138  memset (ss, 0, sizeof (*ss));
1139  }
1140 
1141  for (i = 0; i < 200000; i++)
1142  pool_put_index (smm->sessions[0], i);
1143 
1144  clib_bihash_init_16_8 (&smm->v4_session_hash, "v4 session table",
1145  200000 /* $$$$ config parameter nbuckets */ ,
1146  (64 << 20) /*$$$ config parameter table size */ );
1147  clib_bihash_init_48_8 (&smm->v6_session_hash, "v6 session table",
1148  200000 /* $$$$ config parameter nbuckets */ ,
1149  (64 << 20) /*$$$ config parameter table size */ );
1150 
1151  clib_bihash_init_16_8 (&smm->v4_half_open_hash, "v4 half-open table",
1152  200000 /* $$$$ config parameter nbuckets */ ,
1153  (64 << 20) /*$$$ config parameter table size */ );
1154  clib_bihash_init_48_8 (&smm->v6_half_open_hash, "v6 half-open table",
1155  200000 /* $$$$ config parameter nbuckets */ ,
1156  (64 << 20) /*$$$ config parameter table size */ );
1157 
1158  smm->is_enabled = 1;
1159 
1160  /* Enable TCP transport */
1161  vnet_tcp_enable_disable (vm, 1);
1162 
1163  return 0;
1164 }
1165 
1166 void
1168 {
1169  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1170  /* *INDENT-OFF* */
1171  foreach_vlib_main (({
1172  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1173  state);
1174  }));
1175  /* *INDENT-ON* */
1176 }
1177 
1178 clib_error_t *
1180 {
1181  if (is_en)
1182  {
1183  if (session_manager_main.is_enabled)
1184  return 0;
1185 
1187 
1188  return session_manager_main_enable (vm);
1189  }
1190  else
1191  {
1192  session_manager_main.is_enabled = 0;
1194  }
1195 
1196  return 0;
1197 }
1198 
1199 clib_error_t *
1201 {
1203 
1204  smm->vlib_main = vm;
1205  smm->vnet_main = vnet_get_main ();
1206  smm->is_enabled = 0;
1207 
1208  return 0;
1209 }
1210 
1213  unformat_input_t * input)
1214 {
1216  u32 nitems;
1217 
1218  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1219  {
1220  if (unformat (input, "event-queue-length %d", &nitems))
1221  {
1222  if (nitems >= 2048)
1223  smm->configured_event_queue_length = nitems;
1224  else
1225  clib_warning ("event queue length %d too small, ignored", nitems);
1226  }
1227  else
1228  return clib_error_return (0, "unknown input `%U'",
1229  format_unformat_error, input);
1230  }
1231  return 0;
1232 }
1233 
1235 
1236 /*
1237  * fd.io coding-style-patch-verification: ON
1238  *
1239  * Local Variables:
1240  * eval: (c-set-style "gnu")
1241  * End:
1242  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:436
int session_manager_flush_enqueue_events(u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:656
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:337
static void svm_pop_heap(void *oldheap)
Definition: svm.h:94
int segment_manager_alloc_session_fifos(segment_manager_t *sm, svm_fifo_t **server_rx_fifo, svm_fifo_t **server_tx_fifo, u32 *fifo_segment_index)
void svm_fifo_init_pointers(svm_fifo_t *f, u32 pointer)
Set fifo pointers to requested offset.
Definition: svm_fifo.c:726
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 sst, u8 notify)
Accept a stream session.
Definition: session.c:838
void stream_session_table_add(session_manager_main_t *smm, stream_session_t *s, u64 value)
Definition: session.c:67
static transport_proto_vft_t * tp_vfts
Per-type vector of transport protocol virtual function tables.
Definition: session.c:31
stream_session_t * stream_session_lookup_listener4(ip4_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:173
vnet_main_t * vnet_get_main(void)
Definition: misc.c:46
struct _transport_connection transport_connection_t
void stream_session_table_add_for_tc(transport_connection_t *tc, u64 value)
Definition: session.c:40
static void make_v4_ss_kv_from_tc(session_kv4_t *kv, transport_connection_t *t)
Definition: transport.h:179
#define PREDICT_TRUE(x)
Definition: clib.h:98
int stream_session_enqueue_data(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:494
session_manager_main_t session_manager_main
Definition: session.c:33
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:522
static void make_v6_ss_kv(session_kv6_t *kv, ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: transport.h:186
int stream_session_table_del_for_tc(transport_connection_t *tc)
Definition: session.c:108
static void make_v6_ss_kv_from_tc(session_kv6_t *kv, transport_connection_t *t)
Definition: transport.h:222
int stream_session_create_i(segment_manager_t *sm, transport_connection_t *tc, stream_session_t **ret_s)
Definition: session.c:381
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
Definition: svm_fifo.h:77
void session_node_enable_disable(u8 is_en)
Definition: session.c:1167
session_fifo_rx_fn session_tx_fifo_peek_and_snd
static void make_v6_listener_kv(session_kv6_t *kv, ip6_address_t *lcl, u16 lcl_port, u8 proto)
Definition: transport.h:204
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
struct _stream_session_t stream_session_t
struct _svm_fifo svm_fifo_t
fifo_event_type_t
Definition: session.h:29
segment_manager_t * application_get_listen_segment_manager(application_t *app, stream_session_t *s)
Definition: application.c:332
static void make_v4_listener_kv(session_kv4_t *kv, ip4_address_t *lcl, u16 lcl_port, u8 proto)
Definition: transport.h:164
#define VLIB_BUFFER_NEXT_PRESENT
Definition: buffer.h:87
static void stream_session_half_open_table_del(session_manager_main_t *smm, u8 sst, transport_connection_t *tc)
Definition: session.c:146
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:111
static void * svm_push_data_heap(svm_region_t *rp)
Definition: svm.h:86
void stream_session_accept_notify(transport_connection_t *tc)
Definition: session.c:753
#define always_inline
Definition: clib.h:84
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:71
static u64 stream_session_half_open_lookup(session_manager_main_t *smm, ip46_address_t *lcl, ip46_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: session.c:279
stream_session_t * stream_session_lookup_listener6(ip6_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:219
#define clib_error_return(e, args...)
Definition: error.h:99
svm_region_t * vlib_rp
Definition: api_common.h:187
stream_session_t * stream_session_lookup4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Looks up a session based on the 5-tuple passed as argument.
Definition: session.c:200
unsigned long u64
Definition: types.h:89
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:807
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
Definition: session.c:1025
void stream_session_delete(stream_session_t *s)
Cleans up session and associated app if needed.
Definition: session.c:785
int stream_session_connect_notify(transport_connection_t *tc, u8 sst, u8 is_fail)
Definition: session.c:704
void session_send_session_evt_to_thread(u64 session_handle, fifo_event_type_t evt_type, u32 thread_index)
Definition: session.c:973
transport_connection_t * stream_session_lookup_transport4(ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:314
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1092
struct _transport_proto_vft transport_proto_vft_t
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:583
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:237
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:397
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:71
struct _unformat_input_t unformat_input_t
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:188
session_type_t
Definition: session.h:75
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:241
void svm_fifo_segment_init(u64 baseva, u32 timeout_in_seconds)
#define PREDICT_FALSE(x)
Definition: clib.h:97
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:119
clib_error_t * vnet_tcp_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: tcp.c:1097
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:1200
struct _session_manager_main session_manager_main_t
Definition: session.h:157
#define foreach_vlib_main(body)
Definition: threads.h:208
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1179
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P (general version).
Definition: pool.h:169
static u8 svm_fifo_set_event(svm_fifo_t *f)
Sets fifo event flag.
Definition: svm_fifo.h:94
#define SESSION_EVT_DBG(_evt, _args...)
static unix_shared_memory_queue_t * session_manager_get_vpp_event_queue(u32 thread_index)
Definition: session.h:406
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
api_main_t api_main
Definition: api_shared.c:35
#define UNFORMAT_END_OF_INPUT
Definition: format.h:143
int stream_session_open(u32 app_index, session_type_t st, transport_endpoint_t *tep, transport_connection_t **res)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:880
u32 stream_session_tx_fifo_max_dequeue(transport_connection_t *tc)
Definition: session.c:566
segment_manager_t * application_get_connect_segment_manager(application_t *app)
Definition: application.c:325
#define clib_warning(format, args...)
Definition: error.h:59
void stream_session_init_fifos_pointers(transport_connection_t *tc, u32 rx_pointer, u32 tx_pointer)
Init fifo tail and head pointers.
Definition: session.c:694
struct _application application_t
int svm_fifo_enqueue_nowait(svm_fifo_t *f, u32 max_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:441
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:287
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:771
static stream_session_t * stream_session_get_tsi(u64 ti_and_si, u32 thread_index)
Definition: session.h:273
#define pool_put_index(p, i)
Free pool element with given index.
Definition: pool.h:255
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
vhost_vring_state_t state
Definition: vhost-user.h:81
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:109
static void make_v4_ss_kv(session_kv4_t *kv, ip4_address_t *lcl, ip4_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
Definition: transport.h:149
static void stream_session_half_open_table_add(session_type_t sst, transport_connection_t *tc, u64 value)
Definition: session.c:78
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
Definition: session.c:1012
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:824
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:146
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 required_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:514
int stream_session_stop_listen(stream_session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:950
stream_session_t * stream_session_lookup6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:244
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: node.c:25
void session_register_transport(u8 type, const transport_proto_vft_t *vft)
Definition: session.c:1041
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
#define HALF_OPEN_LOOKUP_INVALID_VALUE
Definition: session.h:23
transport_proto_vft_t * session_get_transport_vft(u8 type)
Definition: session.c:1055
template key/value backing page structure
Definition: bihash_doc.h:44
static int stream_session_table_del(session_manager_main_t *smm, stream_session_t *s)
Definition: session.c:136
static int stream_session_enqueue_notify(stream_session_t *s, u8 block)
Notify session peer that new data has been enqueued.
Definition: session.c:598
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 max_bytes)
Definition: svm_fifo.c:675
unsigned short u16
Definition: types.h:57
void segment_manager_dealloc_fifos(u32 svm_segment_index, svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
void session_vpp_event_queue_allocate(session_manager_main_t *smm, u32 thread_index)
Allocate vpp event queue (once) per worker thread.
Definition: session.c:1066
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
static stream_session_t * listen_session_get(session_type_t type, u32 index)
Definition: session.h:453
unsigned char u8
Definition: types.h:56
application_t * application_get(u32 index)
Definition: application.c:189
struct _transport_endpoint transport_endpoint_t
vlib_init_function_t *static _vlib_init_function_session_manager_main_init clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
(constructor) VLIB_INIT_FUNCTION (session_manager_main_init)
Definition: session.c:1212
struct _segment_manager segment_manager_t
int stream_session_listen(stream_session_t *s, transport_endpoint_t *tep)
Ask transport to listen on local transport endpoint.
Definition: session.c:919
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
static int session_enqueue_chain_tail(stream_session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:437
stream_session_t * stream_session_lookup_listener(ip46_address_t *lcl, u16 lcl_port, u8 proto)
Definition: session.c:262
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
transport_connection_t * stream_session_lookup_transport6(ip6_address_t *lcl, ip6_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto, u32 my_thread_index)
Definition: session.c:348
u32 flags
buffer flags: VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:74
int svm_fifo_peek(svm_fifo_t *f, u32 relative_offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:659
int stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:575
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:57
u8 stream_session_no_space(transport_connection_t *tc, u32 thread_index, u16 data_len)
Check if we have space in rx fifo to push more bytes.
Definition: session.c:551
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:972
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:169
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static stream_session_t * stream_session_get(u32 si, u32 thread_index)
Definition: session.h:281