FD.io VPP  v18.04-17-g3a0d853
Vector Packet Processing
session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 
27 session_manager_main_t session_manager_main;
28 extern transport_proto_vft_t *tp_vfts;
29 
30 static void
31 session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
32  u32 thread_index, void *fp, void *rpc_args)
33 {
34  u32 tries = 0;
35  session_fifo_event_t evt = { {0}, };
36  svm_queue_t *q;
37 
38  evt.event_type = evt_type;
39  if (evt_type == FIFO_EVENT_RPC)
40  {
41  evt.rpc_args.fp = fp;
42  evt.rpc_args.arg = rpc_args;
43  }
44  else
45  evt.session_handle = session_handle;
46 
47  q = session_manager_get_vpp_event_queue (thread_index);
48  while (svm_queue_add (q, (u8 *) & evt, 1))
49  {
50  if (tries++ == 3)
51  {
52  SESSION_DBG ("failed to enqueue evt");
53  break;
54  }
55  }
56 }
57 
58 void
59 session_send_session_evt_to_thread (u64 session_handle,
60  fifo_event_type_t evt_type,
61  u32 thread_index)
62 {
63  session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
64 }
65 
66 void
67 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
68 {
69  if (thread_index != vlib_get_thread_index ())
70  session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
71  rpc_args);
72  else
73  {
74  void (*fnp) (void *) = fp;
75  fnp (rpc_args);
76  }
77 }
78 
79 stream_session_t *
80 session_alloc (u32 thread_index)
81 {
82  session_manager_main_t *smm = &session_manager_main;
83  stream_session_t *s;
84  u8 will_expand = 0;
85  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
86  CLIB_CACHE_LINE_BYTES);
87  /* If we have peekers, let them finish */
88  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
89  {
90  clib_rwlock_writer_lock (&smm->peekers_rw_locks[thread_index]);
91  pool_get_aligned (session_manager_main.sessions[thread_index], s,
92  CLIB_CACHE_LINE_BYTES);
93  clib_rwlock_writer_unlock (&smm->peekers_rw_locks[thread_index]);
94  }
95  else
96  {
97  pool_get_aligned (session_manager_main.sessions[thread_index], s,
98  CLIB_CACHE_LINE_BYTES);
99  }
100  memset (s, 0, sizeof (*s));
101  s->session_index = s - session_manager_main.sessions[thread_index];
102  s->thread_index = thread_index;
103  return s;
104 }
105 
106 void
107 session_free (stream_session_t * s)
108 {
109  pool_put (session_manager_main.sessions[s->thread_index], s);
110  if (CLIB_DEBUG)
111  memset (s, 0xFA, sizeof (*s));
112 }
113 
114 int
115 session_alloc_fifos (segment_manager_t * sm, stream_session_t * s)
116 {
117  svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
118  u32 fifo_segment_index;
119  int rv;
120 
121  if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
122  &server_tx_fifo,
123  &fifo_segment_index)))
124  return rv;
125  /* Initialize backpointers */
126  server_rx_fifo->master_session_index = s->session_index;
127  server_rx_fifo->master_thread_index = s->thread_index;
128 
129  server_tx_fifo->master_session_index = s->session_index;
130  server_tx_fifo->master_thread_index = s->thread_index;
131 
132  s->server_rx_fifo = server_rx_fifo;
133  s->server_tx_fifo = server_tx_fifo;
134  s->svm_segment_index = fifo_segment_index;
135  return 0;
136 }
137 
138 static stream_session_t *
139 session_alloc_for_connection (transport_connection_t * tc)
140 {
141  stream_session_t *s;
142  u32 thread_index = tc->thread_index;
143 
144  ASSERT (thread_index == vlib_get_thread_index ());
145 
146  s = session_alloc (thread_index);
147  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
148  s->session_state = SESSION_STATE_CONNECTING;
149  s->enqueue_epoch = ~0;
150 
151  /* Attach transport to session and vice versa */
152  s->connection_index = tc->c_index;
153  tc->s_index = s->session_index;
154  return s;
155 }
156 
157 static int
158 session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc,
159  u8 alloc_fifos, stream_session_t ** ret_s)
160 {
161  stream_session_t *s;
162  int rv;
163 
164  s = session_alloc_for_connection (tc);
165  if (alloc_fifos && (rv = session_alloc_fifos (sm, s)))
166  {
167  session_free (s);
168  *ret_s = 0;
169  return rv;
170  }
171 
172  /* Add to the main lookup table */
173  session_lookup_add_connection (tc, session_handle (s));
174 
175  *ret_s = s;
176  return 0;
177 }
178 
179 /**
180  * Discards bytes from buffer chain
181  *
182  * It discards n_bytes_to_drop starting at first buffer after chain_b
183  */
184 always_inline void
185 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
186  vlib_buffer_t ** chain_b,
187  u32 n_bytes_to_drop)
188 {
189  vlib_buffer_t *next = *chain_b;
190  u32 to_drop = n_bytes_to_drop;
191  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
192  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
193  {
194  next = vlib_get_buffer (vm, next->next_buffer);
195  if (next->current_length > to_drop)
196  {
197  vlib_buffer_advance (next, to_drop);
198  to_drop = 0;
199  }
200  else
201  {
202  to_drop -= next->current_length;
203  next->current_length = 0;
204  }
205  }
206  *chain_b = next;
207 
208  if (to_drop == 0)
209  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
210 }
211 
212 /**
213  * Enqueue buffer chain tail
214  */
215 always_inline int
216 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
217  u32 offset, u8 is_in_order)
218 {
219  vlib_buffer_t *chain_b;
220  u32 chain_bi, len, diff;
221  vlib_main_t *vm = vlib_get_main ();
222  u8 *data;
223  u32 written = 0;
224  int rv = 0;
225 
226  if (is_in_order && offset)
227  {
228  diff = offset - b->current_length;
229  if (diff > b->total_length_not_including_first_buffer)
230  return 0;
231  chain_b = b;
232  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
233  chain_bi = vlib_get_buffer_index (vm, chain_b);
234  }
235  else
236  chain_bi = b->next_buffer;
237 
238  do
239  {
240  chain_b = vlib_get_buffer (vm, chain_bi);
241  data = vlib_buffer_get_current (chain_b);
242  len = chain_b->current_length;
243  if (!len)
244  continue;
245  if (is_in_order)
246  {
247  rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
248  if (rv == len)
249  {
250  written += rv;
251  }
252  else if (rv < len)
253  {
254  return (rv > 0) ? (written + rv) : written;
255  }
256  else if (rv > len)
257  {
258  written += rv;
259 
260  /* written more than what was left in chain */
261  if (written > vlib_buffer_length_in_chain (vm, b))
262  return written;
263 
264  /* drop the bytes that have already been delivered */
265  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
266  }
267  }
268  else
269  {
270  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
271  data);
272  if (rv)
273  {
274  clib_warning ("failed to enqueue multi-buffer seg");
275  return -1;
276  }
277  offset += len;
278  }
279  }
280  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
281  ? chain_b->next_buffer : 0));
282 
283  if (is_in_order)
284  return written;
285 
286  return 0;
287 }
288 
289 /*
290  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
291  * event but on request can queue notification events for later delivery by
292  * calling session_manager_flush_enqueue_events ().
293  *
294  * @param tc Transport connection for which the data is to be enqueued
295  * @param b Buffer to be enqueued
296  * @param offset Offset at which to start enqueueing if out-of-order
297  * @param queue_event Flag to indicate if peer is to be notified or if event
298  * is to be queued. The former is useful when more data is
299  * enqueued and only one event is to be generated.
300  * @param is_in_order Flag to indicate if data is in order
301  * @return Number of bytes enqueued or a negative value if enqueueing failed.
302  */
303 int
304 session_enqueue_stream_connection (transport_connection_t * tc,
305  vlib_buffer_t * b, u32 offset,
306  u8 queue_event, u8 is_in_order)
307 {
308  stream_session_t *s;
309  int enqueued = 0, rv, in_order_off;
310 
311  s = session_get (tc->s_index, tc->thread_index);
312 
313  if (is_in_order)
314  {
315  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
316  b->current_length,
317  vlib_buffer_get_current (b));
318  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
319  && enqueued >= 0))
320  {
321  in_order_off = enqueued > b->current_length ? enqueued : 0;
322  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
323  if (rv > 0)
324  enqueued += rv;
325  }
326  }
327  else
328  {
329  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
330  b->current_length,
331  vlib_buffer_get_current (b));
332  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
333  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
334  /* if something was enqueued, report even this as success for ooo
335  * segment handling */
336  return rv;
337  }
338 
339  if (queue_event)
340  {
341  /* Queue RX event on this fifo. Eventually these will need to be flushed
342  * by calling session_manager_flush_enqueue_events () */
343  session_manager_main_t *smm = vnet_get_session_manager_main ();
344  u32 thread_index = s->thread_index;
345  u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
346 
347  if (s->enqueue_epoch != enqueue_epoch)
348  {
349  s->enqueue_epoch = enqueue_epoch;
350  vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
351  s - smm->sessions[thread_index]);
352  }
353  }
354 
355  return enqueued;
356 }
357 
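/* Illustrative sketch, not part of the original file: how a transport's RX
 * path might use the deferred-notification variant documented above. The
 * example_ name is hypothetical; offset and flag values depend on the
 * transport. */
static inline int
example_rx_enqueue (transport_connection_t * tc, vlib_buffer_t * b)
{
  /* In-order data; queue the RX event instead of notifying the app now. The
   * event is generated later by session_manager_flush_enqueue_events (). */
  return session_enqueue_stream_connection (tc, b, 0 /* offset */ ,
					    1 /* queue_event */ ,
					    1 /* is_in_order */ );
}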
358 int
359 session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
360  u8 proto, u8 queue_event)
361 {
362  int enqueued = 0, rv, in_order_off;
363 
364  if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
365  return -1;
366  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
367  vlib_buffer_get_current (b));
368  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
369  {
370  in_order_off = enqueued > b->current_length ? enqueued : 0;
371  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
372  if (rv > 0)
373  enqueued += rv;
374  }
375  if (queue_event)
376  {
377  /* Queue RX event on this fifo. Eventually these will need to be flushed
378  * by calling session_manager_flush_enqueue_events () */
379  session_manager_main_t *smm = vnet_get_session_manager_main ();
380  u32 thread_index = s->thread_index;
381  u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
382 
383  if (s->enqueue_epoch != enqueue_epoch)
384  {
385  s->enqueue_epoch = enqueue_epoch;
386  vec_add1 (smm->session_to_enqueue[proto][thread_index],
387  s - smm->sessions[thread_index]);
388  }
389  }
390  return enqueued;
391 }
392 
393 /** Check if we have space in rx fifo to push more bytes */
394 u8
395 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
396  u16 data_len)
397 {
398  stream_session_t *s = session_get (tc->s_index, thread_index);
399 
400  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
401  return 1;
402 
403  if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
404  return 1;
405 
406  return 0;
407 }
408 
409 u32
410 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
411 {
412  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
413  if (!s->server_tx_fifo)
414  return 0;
415  return svm_fifo_max_dequeue (s->server_tx_fifo);
416 }
417 
418 int
419 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
420  u32 offset, u32 max_bytes)
421 {
422  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
423  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
424 }
425 
426 u32
427 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
428 {
429  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
430  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
431 }
432 
433 /**
434  * Notify session peer that new data has been enqueued.
435  *
436  * @param s Stream session for which the event is to be generated.
437  * @param block Flag to indicate if call should block if event queue is full.
438  *
439  * @return 0 on success or negative number if failed to send notification.
440  */
441 static int
442 session_enqueue_notify (stream_session_t * s, u8 block)
443 {
444  application_t *app;
445  session_fifo_event_t evt;
446  svm_queue_t *q;
447 
448  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
449  {
450  /* Session is closed so app will never clean up. Flush rx fifo */
451  u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
452  if (to_dequeue)
453  svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
454  return 0;
455  }
456 
457  /* Get session's server */
458  app = application_get_if_valid (s->app_index);
459 
460  if (PREDICT_FALSE (app == 0))
461  {
462  clib_warning ("invalid s->app_index = %d", s->app_index);
463  return 0;
464  }
465 
466  /* Built-in app? Hand event to the callback... */
467  if (app->cb_fns.builtin_app_rx_callback)
468  return app->cb_fns.builtin_app_rx_callback (s);
469 
470  /* If no event, send one */
471  if (svm_fifo_set_event (s->server_rx_fifo))
472  {
473  /* Fabricate event */
474  evt.fifo = s->server_rx_fifo;
475  evt.event_type = FIFO_EVENT_APP_RX;
476 
477  /* Add event to server's event queue */
478  q = app->event_queue;
479 
480  /* Based on request block (or not) for lack of space */
481  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
482  svm_queue_add (app->event_queue, (u8 *) & evt,
483  0 /* do wait for mutex */ );
484  else
485  {
486  clib_warning ("fifo full");
487  return -1;
488  }
489  }
490 
491  /* *INDENT-OFF* */
492  SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
493  ed->data[0] = evt.event_type;
494  ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
495  }));
496  /* *INDENT-ON* */
497 
498  return 0;
499 }
500 
501 /**
502  * Flushes queue of sessions that are to be notified of new data
503  * enqueued events.
504  *
505  * @param thread_index Thread index for which the flush is to be performed.
506  * @return 0 on success or a positive number indicating the number of
507  * failures due to API queue being full.
508  */
509 int
510 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
511 {
512  session_manager_main_t *smm = &session_manager_main;
513  u32 *indices;
514  stream_session_t *s;
515  int i, errors = 0;
516 
517  indices = smm->session_to_enqueue[transport_proto][thread_index];
518 
519  for (i = 0; i < vec_len (indices); i++)
520  {
521  s = session_get_if_valid (indices[i], thread_index);
522  if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
523  errors++;
524  }
525 
526  vec_reset_length (indices);
527  smm->session_to_enqueue[transport_proto][thread_index] = indices;
528  smm->current_enqueue_epoch[transport_proto][thread_index]++;
529 
530  return errors;
531 }
532 
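/* Illustrative sketch, not part of the original file: a transport input node
 * would typically flush once per dispatched frame, after enqueueing data with
 * queue_event set for any number of sessions. The example_ names are
 * hypothetical and TRANSPORT_PROTO_TCP is just a sample protocol. */
static inline void
example_flush_rx_notifications (u32 thread_index)
{
  int example_errors;
  example_errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP,
							  thread_index);
  if (example_errors)
    clib_warning ("%d notification failures", example_errors);
}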
533 /**
534  * Init fifo tail and head pointers
535  *
536  * Useful if transport uses absolute offsets for tracking ooo segments.
537  */
538 void
539 stream_session_init_fifos_pointers (transport_connection_t * tc,
540  u32 rx_pointer, u32 tx_pointer)
541 {
542  stream_session_t *s;
543  s = session_get (tc->s_index, tc->thread_index);
544  svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
545  svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
546 }
547 
548 int
549 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
550 {
551  u32 opaque = 0, new_ti, new_si;
552  stream_session_t *new_s = 0;
553  segment_manager_t *sm;
554  application_t *app;
555  u8 alloc_fifos;
556  int error = 0;
557  u64 handle;
558 
559  /*
560  * Find connection handle and cleanup half-open table
561  */
562  handle = session_lookup_half_open_handle (tc);
563  if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
564  {
565  SESSION_DBG ("half-open was removed!");
566  return -1;
567  }
568  session_lookup_del_half_open (tc);
569 
570  /* Get the app's index from the handle we stored when opening connection
571  * and the opaque (api_context for external apps) from transport session
572  * index */
573  app = application_get_if_valid (handle >> 32);
574  if (!app)
575  return -1;
576  opaque = tc->s_index;
577 
578  /*
579  * Allocate new session with fifos (svm segments are allocated if needed)
580  */
581  if (!is_fail)
582  {
583  sm = application_get_connect_segment_manager (app);
584  alloc_fifos = !application_is_builtin_proxy (app);
585  if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
586  {
587  is_fail = 1;
588  error = -1;
589  }
590  else
591  {
592  new_s->app_index = app->index;
593  new_si = new_s->session_index;
594  new_ti = new_s->thread_index;
595  }
596  }
597 
598  /*
599  * Notify client application
600  */
601  if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
602  is_fail))
603  {
604  SESSION_DBG ("failed to notify app");
605  if (!is_fail)
606  {
607  new_s = session_get (new_si, new_ti);
608  stream_session_disconnect_transport (new_s);
609  }
610  }
611  else
612  {
613  if (!is_fail)
614  {
615  new_s = session_get (new_si, new_ti);
616  new_s->session_state = SESSION_STATE_READY;
617  }
618  }
619 
620  return error;
621 }
622 
623 typedef struct _session_switch_pool_args
624 {
625  u32 session_index;
626  u32 thread_index;
627  u32 new_thread_index;
628  u32 new_session_index;
629 } session_switch_pool_args_t;
630 
631 static void
632 session_switch_pool (void *cb_args)
633 {
634  session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
635  transport_proto_t tp;
636  stream_session_t *s;
637  ASSERT (args->thread_index == vlib_get_thread_index ());
638  s = session_get (args->session_index, args->thread_index);
639  s->server_tx_fifo->master_session_index = args->new_session_index;
640  s->server_tx_fifo->master_thread_index = args->new_thread_index;
641  tp = session_get_transport_proto (s);
642  tp_vfts[tp].cleanup (s->connection_index, s->thread_index);
643  session_free (s);
644  clib_mem_free (cb_args);
645 }
646 
647 /**
648  * Move dgram session to the right thread
649  */
650 int
651 session_dgram_connect_notify (transport_connection_t * tc,
652  u32 old_thread_index,
653  stream_session_t ** new_session)
654 {
655  stream_session_t *new_s;
656  session_switch_pool_args_t *rpc_args;
657 
658  /*
659  * Clone half-open session to the right thread.
660  */
661  new_s = session_clone_safe (tc->s_index, old_thread_index);
662  new_s->connection_index = tc->c_index;
663  new_s->server_rx_fifo->master_session_index = new_s->session_index;
664  new_s->server_rx_fifo->master_thread_index = new_s->thread_index;
665  new_s->session_state = SESSION_STATE_READY;
666  session_lookup_add_connection (tc, session_handle (new_s));
667 
668  /*
669  * Ask thread owning the old session to clean it up and make us the tx
670  * fifo owner
671  */
672  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
673  rpc_args->new_session_index = new_s->session_index;
674  rpc_args->new_thread_index = new_s->thread_index;
675  rpc_args->session_index = tc->s_index;
676  rpc_args->thread_index = old_thread_index;
677  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
678  rpc_args);
679 
680  tc->s_index = new_s->session_index;
681  new_s->connection_index = tc->c_index;
682  *new_session = new_s;
683  return 0;
684 }
685 
686 void
687 stream_session_accept_notify (transport_connection_t * tc)
688 {
689  application_t *server;
690  stream_session_t *s;
691 
692  s = session_get (tc->s_index, tc->thread_index);
693  server = application_get (s->app_index);
694  server->cb_fns.session_accept_callback (s);
695 }
696 
697 /**
698  * Notification from transport that connection is being closed.
699  *
700  * A disconnect is sent to application but state is not removed. Once
701  * disconnect is acknowledged by application, session disconnect is called.
702  * Ultimately this leads to close being called on transport (passive close).
703  */
704 void
705 stream_session_disconnect_notify (transport_connection_t * tc)
706 {
707  application_t *server;
708  stream_session_t *s;
709 
710  s = session_get (tc->s_index, tc->thread_index);
711  server = application_get (s->app_index);
712  server->cb_fns.session_disconnect_callback (s);
713 }
714 
715 /**
716  * Cleans up session and lookup table.
717  */
718 void
719 stream_session_delete (stream_session_t * s)
720 {
721  int rv;
722 
723  /* Delete from the main lookup table. */
724  if ((rv = session_lookup_del_session (s)))
725  clib_warning ("hash delete error, rv %d", rv);
726 
727  /* Cleanup fifo segments */
728  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
729  s->server_tx_fifo);
730  session_free (s);
731 }
732 
733 /**
734  * Notification from transport that connection is being deleted
735  *
736  * This removes the session if it is still valid. It should be called only on
737  * previously fully established sessions. For instance failed connects should
738  * call stream_session_connect_notify and indicate that the connect has
739  * failed.
740  */
741 void
742 stream_session_delete_notify (transport_connection_t * tc)
743 {
744  stream_session_t *s;
745 
746  /* App might've been removed already */
747  s = session_get_if_valid (tc->s_index, tc->thread_index);
748  if (!s)
749  return;
750  stream_session_delete (s);
751 }
752 
753 /**
754  * Notify application that connection has been reset.
755  */
756 void
757 stream_session_reset_notify (transport_connection_t * tc)
758 {
759  stream_session_t *s;
760  application_t *app;
761  s = session_get (tc->s_index, tc->thread_index);
762 
763  app = application_get (s->app_index);
764  app->cb_fns.session_reset_callback (s);
765 }
766 
767 /**
768  * Accept a stream session. Optionally ping the server by callback.
769  */
770 int
771 stream_session_accept (transport_connection_t * tc, u32 listener_index,
772  u8 notify)
773 {
774  application_t *server;
775  stream_session_t *s, *listener;
776  segment_manager_t *sm;
777  session_type_t sst;
778  int rv;
779 
780  sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
781 
782  /* Find the server */
783  listener = listen_session_get (sst, listener_index);
784  server = application_get (listener->app_index);
785 
786  sm = application_get_listen_segment_manager (server, listener);
787  if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
788  return rv;
789 
790  s->app_index = server->index;
791  s->listener_index = listener_index;
792  s->session_state = SESSION_STATE_ACCEPTING;
793 
794  /* Shoulder-tap the server */
795  if (notify)
796  {
797  server->cb_fns.session_accept_callback (s);
798  }
799 
800  return 0;
801 }
802 
803 int
804 session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
805 {
806  transport_connection_t *tc;
807  transport_endpoint_t *tep;
808  segment_manager_t *sm;
809  stream_session_t *s;
810  application_t *app;
811  int rv;
812 
813  tep = session_endpoint_to_transport (rmt);
814  rv = tp_vfts[rmt->transport_proto].open (tep);
815  if (rv < 0)
816  {
817  SESSION_DBG ("Transport failed to open connection.");
818  return VNET_API_ERROR_SESSION_CONNECT;
819  }
820 
821  tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
822 
823  /* For dgram type of service, allocate session and fifos now.
824  */
825  app = application_get (app_index);
826  sm = application_get_connect_segment_manager (app);
827 
828  if (session_alloc_and_init (sm, tc, 1, &s))
829  return -1;
830  s->app_index = app->index;
831  s->session_state = SESSION_STATE_CONNECTING_READY;
832 
833  /* Tell the app about the new event fifo for this session */
834  app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
835 
836  return 0;
837 }
838 
839 int
840 session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
841 {
842  transport_connection_t *tc;
843  transport_endpoint_t *tep;
844  u64 handle;
845  int rv;
846 
847  /* TODO until udp is fixed */
848  if (rmt->transport_proto == TRANSPORT_PROTO_UDP)
849  return session_open_cl (app_index, rmt, opaque);
850 
851  tep = session_endpoint_to_transport (rmt);
852  rv = tp_vfts[rmt->transport_proto].open (tep);
853  if (rv < 0)
854  {
855  SESSION_DBG ("Transport failed to open connection.");
856  return VNET_API_ERROR_SESSION_CONNECT;
857  }
858 
859  tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
860 
861  /* If transport offers a stream service, only allocate session once the
862  * connection has been established.
863  * Add connection to half-open table and save app and tc index. The
864  * latter is needed to help establish the connection while the former
865  * is needed when the connect notify comes and we have to notify the
866  * external app
867  */
868  handle = (((u64) app_index) << 32) | (u64) tc->c_index;
869  session_lookup_add_half_open (tc, handle);
870 
871  /* Store api_context (opaque) for when the reply comes. Not the nicest
872  * thing but better than allocating a separate half-open pool.
873  */
874  tc->s_index = opaque;
875  return 0;
876 }
877 
878 int
879 session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque)
880 {
881  session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt;
882  sep->app_index = app_index;
883  sep->opaque = opaque;
884 
885  return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep);
886 }
887 
888 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
889 
890 /* *INDENT-OFF* */
891 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
892  session_open_vc,
893  session_open_cl,
894  session_open_app,
895 };
896 /* *INDENT-ON* */
897 
898 /**
899  * Ask transport to open connection to remote transport endpoint.
900  *
901  * Stores handle for matching request with reply since the call can be
902  * asynchronous. For instance, for TCP the 3-way handshake must complete
903  * before reply comes. Session is only created once connection is established.
904  *
905  * @param app_index Index of the application requesting the connect
906  * @param st Session type requested.
907  * @param tep Remote transport endpoint
908  * @param opaque Opaque data (typically, api_context) the application expects
909  * on open completion.
910  */
911 int
912 session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
913 {
914  transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type;
915  return session_open_srv_fns[tst] (app_index, rmt, opaque);
916 }
917 
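/* Illustrative sketch, not part of the original file: a built-in application
 * requesting an active open. Only endpoint fields referenced elsewhere in this
 * file are shown; the remote address setup is elided and the example_ names,
 * port and protocol choice are hypothetical. The result is reported
 * asynchronously through the app's session_connected_callback with the same
 * opaque value. */
static inline int
example_connect (u32 example_app_index, u32 example_api_context)
{
  session_endpoint_t rmt;
  memset (&rmt, 0, sizeof (rmt));
  rmt.is_ip4 = 1;
  rmt.transport_proto = TRANSPORT_PROTO_TCP;
  rmt.port = clib_host_to_net_u16 (80);
  return session_open (example_app_index, &rmt, example_api_context);
}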
918 /**
919  * Ask transport to listen on local transport endpoint.
920  *
921  * @param s Session for which listen will be called. Note that unlike
922  * established sessions, listen sessions are not associated to a
923  * thread.
924  * @param tep Local endpoint to be listened on.
925  */
926 int
927 session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
928 {
929  transport_connection_t *tc;
930  u32 tci;
931 
932  /* Transport bind/listen */
933  tci = tp_vfts[sep->transport_proto].bind (s->session_index,
934  session_endpoint_to_transport
935  (sep));
936 
937  if (tci == (u32) ~ 0)
938  return -1;
939 
940  /* Attach transport to session */
941  s->connection_index = tci;
942  tc = tp_vfts[sep->transport_proto].get_listener (tci);
943 
944  /* Weird but handle it ... */
945  if (tc == 0)
946  return -1;
947 
948  /* Add to the main lookup table */
949  session_lookup_add_connection (tc, s->session_index);
950  return 0;
951 }
952 
953 int
954 session_listen_app (stream_session_t * s, session_endpoint_t * sep)
955 {
956  session_endpoint_extended_t esep;
957  clib_memcpy (&esep, sep, sizeof (*sep));
958  esep.app_index = s->app_index;
959 
960  return tp_vfts[sep->transport_proto].bind (s->session_index,
961  (transport_endpoint_t *) & esep);
962 }
963 
963 
964 typedef int (*session_listen_service_fn) (stream_session_t *,
965  session_endpoint_t *);
966 
967 /* *INDENT-OFF* */
968 static session_listen_service_fn
969 session_listen_srv_fns[TRANSPORT_N_SERVICES] = {
970  session_listen_vc,
971  session_listen_vc,
972  session_listen_app,
973 };
974 /* *INDENT-ON* */
975 
976 int
977 stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
978 {
979  transport_service_type_t tst = tp_vfts[sep->transport_proto].service_type;
980  return session_listen_srv_fns[tst] (s, sep);
981 }
982 
983 /**
984  * Ask transport to stop listening on local transport endpoint.
985  *
986  * @param s Session to stop listening on. It must be in state LISTENING.
987  */
988 int
989 stream_session_stop_listen (stream_session_t * s)
990 {
991  transport_proto_t tp = session_get_transport_proto (s);
992  transport_connection_t *tc;
993  if (s->session_state != SESSION_STATE_LISTENING)
994  {
995  clib_warning ("not a listening session");
996  return -1;
997  }
998 
999  tc = tp_vfts[tp].get_listener (s->connection_index);
1000  if (!tc)
1001  {
1002  clib_warning ("no transport");
1003  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1004  }
1005 
1006  session_lookup_del_connection (tc);
1007  tp_vfts[tp].unbind (s->connection_index);
1008  return 0;
1009 }
1010 
1011 /**
1012  * Initialize session disconnect.
1013  *
1014  * Request is always sent to session node to ensure that all outstanding
1015  * requests are served before transport is notified.
1016  */
1017 void
1018 stream_session_disconnect (stream_session_t * s)
1019 {
1020  if (!s || s->session_state == SESSION_STATE_CLOSED)
1021  return;
1022  s->session_state = SESSION_STATE_CLOSED;
1023  session_send_session_evt_to_thread (session_handle (s),
1024  FIFO_EVENT_DISCONNECT, s->thread_index);
1025 }
1026 
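/* Illustrative sketch, not part of the original file: the acknowledgement path
 * described in stream_session_disconnect_notify above eventually lands here. A
 * minimal built-in app might acknowledge a passive close directly from its
 * disconnect callback; the example_ name is hypothetical. */
static void
example_session_disconnect_callback (stream_session_t * s)
{
  /* App is done with the session; hand it back to the session layer, which
   * queues FIFO_EVENT_DISCONNECT for the owning thread. */
  stream_session_disconnect (s);
}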
1027 /**
1028  * Notify transport the session can be disconnected. This should eventually
1029  * result in a delete notification that allows us to cleanup session state.
1030  * Called for both active/passive disconnects.
1031  *
1032  * Must be called from the session's thread.
1033  */
1034 void
1035 stream_session_disconnect_transport (stream_session_t * s)
1036 {
1037  s->session_state = SESSION_STATE_CLOSED;
1038  tp_vfts[session_get_transport_proto (s)].close (s->connection_index,
1039  s->thread_index);
1040 }
1041 
1042 /**
1043  * Cleanup transport and session state.
1044  *
1045  * Notify transport of the cleanup, wait for a delete notify to actually
1046  * remove the session state.
1047  */
1048 void
1049 stream_session_cleanup (stream_session_t * s)
1050 {
1051  int rv;
1052 
1053  s->session_state = SESSION_STATE_CLOSED;
1054 
1055  /* Delete from the main lookup table to avoid more enqueues */
1056  rv = session_lookup_del_session (s);
1057  if (rv)
1058  clib_warning ("hash delete error, rv %d", rv);
1059 
1060  tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
1061  s->thread_index);
1062 }
1063 
1064 /**
1065  * Allocate event queues in the shared-memory segment
1066  *
1067  * That can either be a newly created memfd segment, that will need to be
1068  * mapped by all stack users, or the binary api's svm region. The latter is
1069  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1070  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1071  * vpp uses api svm region for event queues.
1072  */
1073 void
1074 session_vpp_event_queues_allocate (session_manager_main_t * smm)
1075 {
1076  u32 evt_q_length = 2048, evt_size = sizeof (session_fifo_event_t);
1077  ssvm_private_t *eqs = &smm->evt_qs_segment;
1078  api_main_t *am = &api_main;
1079  u64 eqs_size = 64 << 20;
1080  pid_t vpp_pid = getpid ();
1081  void *oldheap;
1082  int i;
1083 
1084  if (smm->configured_event_queue_length)
1085  evt_q_length = smm->configured_event_queue_length;
1086 
1087  if (smm->evt_qs_use_memfd_seg)
1088  {
1089  if (smm->evt_qs_segment_size)
1090  eqs_size = smm->evt_qs_segment_size;
1091 
1092  eqs->ssvm_size = eqs_size;
1093  eqs->i_am_master = 1;
1094  eqs->my_pid = vpp_pid;
1095  eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1096  eqs->requested_va = smm->session_baseva;
1097 
1098  ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD);
1099  }
1100 
1101  if (smm->evt_qs_use_memfd_seg)
1102  oldheap = ssvm_push_heap (eqs->sh);
1103  else
1104  oldheap = svm_push_data_heap (am->vlib_rp);
1105 
1106  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
1107  {
1108  smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
1109  vpp_pid, 0);
1110  }
1111 
1112  if (smm->evt_qs_use_memfd_seg)
1113  ssvm_pop_heap (oldheap);
1114  else
1115  svm_pop_heap (oldheap);
1116 }
1117 
1118 ssvm_private_t *
1119 session_manager_get_evt_q_segment (void)
1120 {
1121  session_manager_main_t *smm = &session_manager_main;
1122  if (smm->evt_qs_use_memfd_seg)
1123  return &smm->evt_qs_segment;
1124  return 0;
1125 }
1126 
1127 /* *INDENT-OFF* */
1128 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1129  session_tx_fifo_peek_and_snd,
1130  session_tx_fifo_dequeue_and_snd,
1131  session_tx_fifo_dequeue_internal
1132 };
1133 /* *INDENT-ON* */
1134 
1135 /**
1136  * Initialize session layer for given transport proto and ip version
1137  *
1138  * Allocates per session type (transport proto + ip version) data structures
1139  * and adds arc from session queue node to session type output node.
1140  */
1141 void
1142 session_register_transport (transport_proto_t transport_proto,
1143  const transport_proto_vft_t * vft, u8 is_ip4,
1144  u32 output_node)
1145 {
1146  session_manager_main_t *smm = &session_manager_main;
1147  session_type_t session_type;
1148  u32 next_index = ~0;
1149 
1150  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1151 
1152  vec_validate (smm->session_type_to_next, session_type);
1153  vec_validate (smm->listen_sessions, session_type);
1154  vec_validate (smm->session_tx_fns, session_type);
1155 
1156  /* *INDENT-OFF* */
1157  if (output_node != ~0)
1158  {
1159  foreach_vlib_main (({
1160  next_index = vlib_node_add_next (this_vlib_main,
1161  session_queue_node.index,
1162  output_node);
1163  }));
1164  }
1165  /* *INDENT-ON* */
1166 
1167  smm->session_type_to_next[session_type] = next_index;
1168  smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1169 }
1170 
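/* Illustrative sketch, not part of the original file: a transport protocol
 * registering itself for both IP versions from its own init code. The
 * example_ names are hypothetical and TRANSPORT_PROTO_UDP merely stands in for
 * the transport's own enum value; passing ~0 as the output node skips the arc
 * setup, as handled above. */
static void
example_register_example_transport (const transport_proto_vft_t * example_vft,
				    u32 example_ip4_output_node_index)
{
  session_register_transport (TRANSPORT_PROTO_UDP, example_vft, 1 /* is_ip4 */ ,
			      example_ip4_output_node_index);
  session_register_transport (TRANSPORT_PROTO_UDP, example_vft, 0 /* is_ip4 */ ,
			      ~0 /* no output node */ );
}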
1171 transport_connection_t *
1172 session_get_transport (stream_session_t * s)
1173 {
1174  transport_proto_t tp;
1175  if (s->session_state != SESSION_STATE_LISTENING)
1176  {
1177  tp = session_get_transport_proto (s);
1178  return tp_vfts[tp].get_connection (s->connection_index,
1179  s->thread_index);
1180  }
1181  return 0;
1182 }
1183 
1184 transport_connection_t *
1185 listen_session_get_transport (stream_session_t * s)
1186 {
1187  transport_proto_t tp = session_get_transport_proto (s);
1188  return tp_vfts[tp].get_listener (s->connection_index);
1189 }
1190 
1191 int
1192 listen_session_get_local_session_endpoint (stream_session_t * listener,
1193  session_endpoint_t * sep)
1194 {
1195  transport_proto_t tp = session_get_transport_proto (listener);
1196  transport_connection_t *tc;
1197  tc = tp_vfts[tp].get_listener (listener->connection_index);
1198  if (!tc)
1199  {
1200  clib_warning ("no transport");
1201  return -1;
1202  }
1203 
1204  /* N.B. The ip should not be copied because this is the local endpoint */
1205  sep->port = tc->lcl_port;
1206  sep->transport_proto = tc->proto;
1207  sep->is_ip4 = tc->is_ip4;
1208  return 0;
1209 }
1210 
1211 static clib_error_t *
1212 session_manager_main_enable (vlib_main_t * vm)
1213 {
1214  segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1215  session_manager_main_t *smm = &session_manager_main;
1216  vlib_thread_main_t *vtm = vlib_get_thread_main ();
1217  u32 num_threads, preallocated_sessions_per_worker;
1218  int i, j;
1219 
1220  num_threads = 1 /* main thread */ + vtm->n_threads;
1221 
1222  if (num_threads < 1)
1223  return clib_error_return (0, "n_thread_stacks not set");
1224 
1225  /* configure per-thread ** vectors */
1226  vec_validate (smm->sessions, num_threads - 1);
1227  vec_validate (smm->tx_buffers, num_threads - 1);
1228  vec_validate (smm->pending_event_vector, num_threads - 1);
1229  vec_validate (smm->pending_disconnects, num_threads - 1);
1230  vec_validate (smm->free_event_vector, num_threads - 1);
1231  vec_validate (smm->vpp_event_queues, num_threads - 1);
1232  vec_validate (smm->peekers_rw_locks, num_threads - 1);
1233 
1234  for (i = 0; i < TRANSPORT_N_PROTO; i++)
1235  for (j = 0; j < num_threads; j++)
1236  {
1237  vec_validate (smm->session_to_enqueue[i], num_threads - 1);
1238  vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
1239  }
1240 
1241  for (i = 0; i < num_threads; i++)
1242  {
1243  vec_validate (smm->free_event_vector[i], 0);
1244  _vec_len (smm->free_event_vector[i]) = 0;
1245  vec_validate (smm->pending_event_vector[i], 0);
1246  _vec_len (smm->pending_event_vector[i]) = 0;
1247  vec_validate (smm->pending_disconnects[i], 0);
1248  _vec_len (smm->pending_disconnects[i]) = 0;
1249  if (num_threads > 1)
1250  clib_rwlock_init (&smm->peekers_rw_locks[i]);
1251  }
1252 
1253 #if SESSION_DBG
1254  vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1255 #endif
1256 
1257  /* Allocate vpp event queues segment and queue */
1258  session_vpp_event_queues_allocate (smm);
1259 
1260  /* Initialize fifo segment main baseva and timeout */
1261  sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1262  sm_args->size = smm->session_va_space_size;
1263  segment_manager_main_init (sm_args);
1264 
1265  /* Preallocate sessions */
1266  if (smm->preallocated_sessions)
1267  {
1268  if (num_threads == 1)
1269  {
1270  pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
1271  }
1272  else
1273  {
1274  int j;
1275  preallocated_sessions_per_worker =
1276  (1.1 * (f64) smm->preallocated_sessions /
1277  (f64) (num_threads - 1));
1278 
1279  for (j = 1; j < num_threads; j++)
1280  {
1281  pool_init_fixed (smm->sessions[j],
1282  preallocated_sessions_per_worker);
1283  }
1284  }
1285  }
1286 
1287  session_lookup_init ();
1288  app_namespaces_init ();
1289  transport_init ();
1290 
1291  smm->is_enabled = 1;
1292 
1293  /* Enable transports */
1294  transport_enable_disable (vm, 1);
1295 
1296  return 0;
1297 }
1298 
1299 void
1300 session_node_enable_disable (u8 is_en)
1301 {
1302  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1303  /* *INDENT-OFF* */
1304  foreach_vlib_main (({
1305  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1306  state);
1307  }));
1308  /* *INDENT-ON* */
1309 }
1310 
1311 clib_error_t *
1312 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1313 {
1314  clib_error_t *error = 0;
1315  if (is_en)
1316  {
1317  if (session_manager_main.is_enabled)
1318  return 0;
1319 
1321  error = session_manager_main_enable (vm);
1322  }
1323  else
1324  {
1325  session_manager_main.is_enabled = 0;
1326  session_node_enable_disable (is_en);
1327  }
1328 
1329  return error;
1330 }
1331 
1332 clib_error_t *
1333 session_manager_main_init (vlib_main_t * vm)
1334 {
1335  session_manager_main_t *smm = &session_manager_main;
1336  smm->session_baseva = 0x200000000ULL;
1337  smm->session_va_space_size = (u64) 128 << 30;
1338  smm->evt_qs_segment_size = 64 << 20;
1339  smm->is_enabled = 0;
1340  return 0;
1341 }
1342 
1343 VLIB_INIT_FUNCTION (session_manager_main_init);
1344 
1345 static clib_error_t *
1346 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1347 {
1348  session_manager_main_t *smm = &session_manager_main;
1349  u32 nitems;
1350  uword tmp;
1351 
1352  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1353  {
1354  if (unformat (input, "event-queue-length %d", &nitems))
1355  {
1356  if (nitems >= 2048)
1357  smm->configured_event_queue_length = nitems;
1358  else
1359  clib_warning ("event queue length %d too small, ignored", nitems);
1360  }
1361  else if (unformat (input, "preallocated-sessions %d",
1362  &smm->preallocated_sessions))
1363  ;
1364  else if (unformat (input, "v4-session-table-buckets %d",
1365  &smm->configured_v4_session_table_buckets))
1366  ;
1367  else if (unformat (input, "v4-halfopen-table-buckets %d",
1368  &smm->configured_v4_halfopen_table_buckets))
1369  ;
1370  else if (unformat (input, "v6-session-table-buckets %d",
1371  &smm->configured_v6_session_table_buckets))
1372  ;
1373  else if (unformat (input, "v6-halfopen-table-buckets %d",
1374  &smm->configured_v6_halfopen_table_buckets))
1375  ;
1376  else if (unformat (input, "v4-session-table-memory %U",
1377  unformat_memory_size, &tmp))
1378  {
1379  if (tmp >= 0x100000000)
1380  return clib_error_return (0, "memory size %llx (%lld) too large",
1381  tmp, tmp);
1382  smm->configured_v4_session_table_memory = tmp;
1383  }
1384  else if (unformat (input, "v4-halfopen-table-memory %U",
1385  unformat_memory_size, &tmp))
1386  {
1387  if (tmp >= 0x100000000)
1388  return clib_error_return (0, "memory size %llx (%lld) too large",
1389  tmp, tmp);
1390  smm->configured_v4_halfopen_table_memory = tmp;
1391  }
1392  else if (unformat (input, "v6-session-table-memory %U",
1393  unformat_memory_size, &tmp))
1394  {
1395  if (tmp >= 0x100000000)
1396  return clib_error_return (0, "memory size %llx (%lld) too large",
1397  tmp, tmp);
1398  smm->configured_v6_session_table_memory = tmp;
1399  }
1400  else if (unformat (input, "v6-halfopen-table-memory %U",
1401  unformat_memory_size, &tmp))
1402  {
1403  if (tmp >= 0x100000000)
1404  return clib_error_return (0, "memory size %llx (%lld) too large",
1405  tmp, tmp);
1406  smm->configured_v6_halfopen_table_memory = tmp;
1407  }
1408  else if (unformat (input, "local-endpoints-table-memory %U",
1409  unformat_memory_size, &tmp))
1410  {
1411  if (tmp >= 0x100000000)
1412  return clib_error_return (0, "memory size %llx (%lld) too large",
1413  tmp, tmp);
1414  smm->local_endpoints_table_memory = tmp;
1415  }
1416  else if (unformat (input, "local-endpoints-table-buckets %d",
1417  &smm->local_endpoints_table_buckets))
1418  ;
1419  else if (unformat (input, "evt_qs_memfd_seg"))
1420  smm->evt_qs_use_memfd_seg = 1;
1421  else
1422  return clib_error_return (0, "unknown input `%U'",
1423  format_unformat_error, input);
1424  }
1425  return 0;
1426 }
1427 
1428 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1429 
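/* Example (illustrative values only) of a startup.conf stanza exercising the
 * options parsed above; memory sizes go through unformat_memory_size, so the
 * usual k/m/g suffixes work, and event-queue-length values below 2048 are
 * ignored with a warning:
 *
 *   session {
 *     event-queue-length 4096
 *     preallocated-sessions 100000
 *     v4-session-table-buckets 100000
 *     v4-session-table-memory 64m
 *     evt_qs_memfd_seg
 *   }
 */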
1430 /*
1431  * fd.io coding-style-patch-verification: ON
1432  *
1433  * Local Variables:
1434  * eval: (c-set-style "gnu")
1435  * End:
1436  */