FD.io VPP  v17.10-9-gd594711
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27 
30 
31 int
33  u8 alloc_fifos, stream_session_t ** ret_s)
34 {
36  svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
37  u32 fifo_segment_index;
38  u32 pool_index;
40  u64 value;
41  u32 thread_index = tc->thread_index;
42  int rv;
43 
44  ASSERT (thread_index == vlib_get_thread_index ());
45 
46  /* Create the session */
47  pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
48  memset (s, 0, sizeof (*s));
49  pool_index = s - smm->sessions[thread_index];
50 
51  /* Allocate fifos */
52  if (alloc_fifos)
53  {
54  if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
55  &server_tx_fifo,
56  &fifo_segment_index)))
57  {
58  pool_put (smm->sessions[thread_index], s);
59  return rv;
60  }
61  /* Initialize backpointers */
62  server_rx_fifo->master_session_index = pool_index;
63  server_rx_fifo->master_thread_index = thread_index;
64 
65  server_tx_fifo->master_session_index = pool_index;
66  server_tx_fifo->master_thread_index = thread_index;
67 
68  s->server_rx_fifo = server_rx_fifo;
69  s->server_tx_fifo = server_tx_fifo;
70  s->svm_segment_index = fifo_segment_index;
71  }
72 
73  /* Initialize state machine, such as it is... */
74  s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
75  tc->is_ip4);
76  s->session_state = SESSION_STATE_CONNECTING;
77  s->thread_index = thread_index;
78  s->session_index = pool_index;
79 
80  /* Attach transport to session */
81  s->connection_index = tc->c_index;
82 
83  /* Attach session to transport */
84  tc->s_index = s->session_index;
85 
86  /* Add to the main lookup table */
87  value = stream_session_handle (s);
89 
90  *ret_s = s;
91 
92  return 0;
93 }
94 
95 /**
96  * Discards bytes from buffer chain
97  *
98  * It discards n_bytes_to_drop starting at first buffer after chain_b
99  */
100 always_inline void
102  vlib_buffer_t ** chain_b,
103  u32 n_bytes_to_drop)
104 {
105  vlib_buffer_t *next = *chain_b;
106  u32 to_drop = n_bytes_to_drop;
108  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
109  {
110  next = vlib_get_buffer (vm, next->next_buffer);
111  if (next->current_length > to_drop)
112  {
113  vlib_buffer_advance (next, to_drop);
114  to_drop = 0;
115  }
116  else
117  {
118  to_drop -= next->current_length;
119  next->current_length = 0;
120  }
121  }
122  *chain_b = next;
123 
124  if (to_drop == 0)
125  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
126 }
127 
128 /**
129  * Enqueue buffer chain tail
130  */
131 always_inline int
133  u32 offset, u8 is_in_order)
134 {
135  vlib_buffer_t *chain_b;
136  u32 chain_bi, len, diff;
138  u8 *data;
139  u32 written = 0;
140  int rv = 0;
141 
142  if (is_in_order && offset)
143  {
144  diff = offset - b->current_length;
146  return 0;
147  chain_b = b;
148  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
149  chain_bi = vlib_get_buffer_index (vm, chain_b);
150  }
151  else
152  chain_bi = b->next_buffer;
153 
154  do
155  {
156  chain_b = vlib_get_buffer (vm, chain_bi);
157  data = vlib_buffer_get_current (chain_b);
158  len = chain_b->current_length;
159  if (!len)
160  continue;
161  if (is_in_order)
162  {
163  rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
164  if (rv == len)
165  {
166  written += rv;
167  }
168  else if (rv < len)
169  {
170  return (rv > 0) ? (written + rv) : written;
171  }
172  else if (rv > len)
173  {
174  written += rv;
175 
176  /* written more than what was left in chain */
178  return written;
179 
180  /* drop the bytes that have already been delivered */
181  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
182  }
183  }
184  else
185  {
186  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
187  data);
188  if (rv)
189  {
190  clib_warning ("failed to enqueue multi-buffer seg");
191  return -1;
192  }
193  offset += len;
194  }
195  }
196  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
197  ? chain_b->next_buffer : 0));
198 
199  if (is_in_order)
200  return written;
201 
202  return 0;
203 }
204 
205 /*
206  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
207  * event but on request can queue notification events for later delivery by
208  * calling stream_server_flush_enqueue_events().
209  *
210  * @param tc Transport connection which is to be enqueued data
211  * @param b Buffer to be enqueued
212  * @param offset Offset at which to start enqueueing if out-of-order
213  * @param queue_event Flag to indicate if peer is to be notified or if event
214  * is to be queued. The former is useful when more data is
215  * enqueued and only one event is to be generated.
216  * @param is_in_order Flag to indicate if data is in order
217  * @return Number of bytes enqueued or a negative value if enqueueing failed.
218  */
219 int
221  u32 offset, u8 queue_event, u8 is_in_order)
222 {
223  stream_session_t *s;
224  int enqueued = 0, rv, in_order_off;
225 
226  s = stream_session_get (tc->s_index, tc->thread_index);
227 
228  if (is_in_order)
229  {
230  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
231  b->current_length,
234  && enqueued >= 0))
235  {
236  in_order_off = enqueued > b->current_length ? enqueued : 0;
237  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
238  if (rv > 0)
239  enqueued += rv;
240  }
241  }
242  else
243  {
244  rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
245  b->current_length,
247  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
248  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
249  /* if something was enqueued, report even this as success for ooo
250  * segment handling */
251  return rv;
252  }
253 
254  if (queue_event)
255  {
256  /* Queue RX event on this fifo. Eventually these will need to be flushed
257  * by calling stream_server_flush_enqueue_events () */
259  u32 thread_index = s->thread_index;
260  u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
261 
262  if (s->enqueue_epoch != my_enqueue_epoch)
263  {
264  s->enqueue_epoch = my_enqueue_epoch;
265  vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
266  s - smm->sessions[thread_index]);
267  }
268  }
269 
270  return enqueued;
271 }
272 
273 /** Check if we have space in rx fifo to push more bytes */
274 u8
276  u16 data_len)
277 {
278  stream_session_t *s = stream_session_get (tc->s_index, thread_index);
279 
280  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
281  return 1;
282 
283  if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
284  return 1;
285 
286  return 0;
287 }
288 
289 u32
291 {
292  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
293  if (!s->server_tx_fifo)
294  return 0;
295  return svm_fifo_max_dequeue (s->server_tx_fifo);
296 }
297 
298 int
300  u32 offset, u32 max_bytes)
301 {
302  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
303  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
304 }
305 
306 u32
308 {
309  stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
310  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
311 }
312 
313 /**
314  * Notify session peer that new data has been enqueued.
315  *
316  * @param s Stream session for which the event is to be generated.
317  * @param block Flag to indicate if call should block if event queue is full.
318  *
319  * @return 0 on succes or negative number if failed to send notification.
320  */
321 static int
323 {
324  application_t *app;
325  session_fifo_event_t evt;
327  static u32 serial_number;
328 
329  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
330  {
331  /* Session is closed so app will never clean up. Flush rx fifo */
332  u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
333  if (to_dequeue)
334  svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
335  return 0;
336  }
337 
338  /* Get session's server */
339  app = application_get_if_valid (s->app_index);
340 
341  if (PREDICT_FALSE (app == 0))
342  {
343  clib_warning ("invalid s->app_index = %d", s->app_index);
344  return 0;
345  }
346 
347  /* Built-in server? Hand event to the callback... */
348  if (app->cb_fns.builtin_server_rx_callback)
349  return app->cb_fns.builtin_server_rx_callback (s);
350 
351  /* If no event, send one */
352  if (svm_fifo_set_event (s->server_rx_fifo))
353  {
354  /* Fabricate event */
355  evt.fifo = s->server_rx_fifo;
356  evt.event_type = FIFO_EVENT_APP_RX;
357  evt.event_id = serial_number++;
358 
359  /* Add event to server's event queue */
360  q = app->event_queue;
361 
362  /* Based on request block (or not) for lack of space */
363  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
364  unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
365  0 /* do wait for mutex */ );
366  else
367  {
368  clib_warning ("fifo full");
369  return -1;
370  }
371  }
372 
373  /* *INDENT-OFF* */
374  SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
375  ed->data[0] = evt.event_id;
376  ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
377  }));
378  /* *INDENT-ON* */
379 
380  return 0;
381 }
382 
383 /**
384  * Flushes queue of sessions that are to be notified of new data
385  * enqueued events.
386  *
387  * @param thread_index Thread index for which the flush is to be performed.
388  * @return 0 on success or a positive number indicating the number of
389  * failures due to API queue being full.
390  */
391 int
393 {
395  u32 *session_indices_to_enqueue;
396  int i, errors = 0;
397 
398  session_indices_to_enqueue =
399  smm->session_indices_to_enqueue_by_thread[thread_index];
400 
401  for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
402  {
403  stream_session_t *s0;
404 
405  /* Get session */
406  s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
407  thread_index);
408  if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
409  {
410  errors++;
411  }
412  }
413 
414  vec_reset_length (session_indices_to_enqueue);
415 
416  smm->session_indices_to_enqueue_by_thread[thread_index] =
417  session_indices_to_enqueue;
418 
419  /* Increment enqueue epoch for next round */
420  smm->current_enqueue_epoch[thread_index]++;
421 
422  return errors;
423 }
424 
425 /**
426  * Init fifo tail and head pointers
427  *
428  * Useful if transport uses absolute offsets for tracking ooo segments.
429  */
430 void
432  u32 rx_pointer, u32 tx_pointer)
433 {
434  stream_session_t *s;
435  s = stream_session_get (tc->s_index, tc->thread_index);
436  svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
437  svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
438 }
439 
440 int
442 {
443  application_t *app;
444  stream_session_t *new_s = 0;
445  u64 handle;
446  u32 opaque = 0;
447  int error = 0;
448  u8 st;
449 
450  st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
451  handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
452  tc->lcl_port, tc->rmt_port,
453  st);
454  if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
455  {
456  TCP_DBG ("half-open was removed!");
457  return -1;
458  }
459 
460  /* Cleanup half-open table */
462 
463  /* Get the app's index from the handle we stored when opening connection
464  * and the opaque (api_context for external apps) from transport session
465  * index */
466  app = application_get_if_valid (handle >> 32);
467  if (!app)
468  return -1;
469 
470  opaque = tc->s_index;
471 
472  if (!is_fail)
473  {
474  segment_manager_t *sm;
475  u8 alloc_fifos;
477  alloc_fifos = application_is_proxy (app);
478  /* Create new session (svm segments are allocated if needed) */
479  if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
480  {
481  is_fail = 1;
482  error = -1;
483  }
484  else
485  new_s->app_index = app->index;
486  }
487 
488  /* Notify client application */
489  if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
490  is_fail))
491  {
492  clib_warning ("failed to notify app");
493  if (!is_fail)
495  }
496  else
497  {
498  if (!is_fail)
499  new_s->session_state = SESSION_STATE_READY;
500  }
501 
502  return error;
503 }
504 
505 void
507 {
508  application_t *server;
509  stream_session_t *s;
510 
511  s = stream_session_get (tc->s_index, tc->thread_index);
512  server = application_get (s->app_index);
513  server->cb_fns.session_accept_callback (s);
514 }
515 
516 /**
517  * Notification from transport that connection is being closed.
518  *
519  * A disconnect is sent to application but state is not removed. Once
520  * disconnect is acknowledged by application, session disconnect is called.
521  * Ultimately this leads to close being called on transport (passive close).
522  */
523 void
525 {
526  application_t *server;
527  stream_session_t *s;
528 
529  s = stream_session_get (tc->s_index, tc->thread_index);
530  server = application_get (s->app_index);
531  server->cb_fns.session_disconnect_callback (s);
532 }
533 
534 /**
535  * Cleans up session and lookup table.
536  */
537 void
539 {
541  int rv;
542 
543  /* Delete from the main lookup table. */
544  if ((rv = stream_session_table_del (s)))
545  clib_warning ("hash delete error, rv %d", rv);
546 
547  /* Cleanup fifo segments */
548  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
549  s->server_tx_fifo);
550 
551  pool_put (smm->sessions[s->thread_index], s);
552  if (CLIB_DEBUG)
553  memset (s, 0xFA, sizeof (*s));
554 }
555 
556 /**
557  * Notification from transport that connection is being deleted
558  *
559  * This removes the session if it is still valid. It should be called only on
560  * previously fully established sessions. For instance failed connects should
561  * call stream_session_connect_notify and indicate that the connect has
562  * failed.
563  */
564 void
566 {
567  stream_session_t *s;
568 
569  /* App might've been removed already */
570  s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
571  if (!s)
572  return;
574 }
575 
576 /**
577  * Notify application that connection has been reset.
578  */
579 void
581 {
582  stream_session_t *s;
583  application_t *app;
584  s = stream_session_get (tc->s_index, tc->thread_index);
585 
586  app = application_get (s->app_index);
587  app->cb_fns.session_reset_callback (s);
588 }
589 
590 /**
591  * Accept a stream session. Optionally ping the server by callback.
592  */
593 int
595  u8 sst, u8 notify)
596 {
597  application_t *server;
598  stream_session_t *s, *listener;
599  segment_manager_t *sm;
600 
601  int rv;
602 
603  /* Find the server */
604  listener = listen_session_get (sst, listener_index);
605  server = application_get (listener->app_index);
606 
607  sm = application_get_listen_segment_manager (server, listener);
608  if ((rv = stream_session_create_i (sm, tc, 1, &s)))
609  return rv;
610 
611  s->app_index = server->index;
612  s->listener_index = listener_index;
613  s->session_state = SESSION_STATE_ACCEPTING;
614 
615  /* Shoulder-tap the server */
616  if (notify)
617  {
618  server->cb_fns.session_accept_callback (s);
619  }
620 
621  return 0;
622 }
623 
624 /**
625  * Ask transport to open connection to remote transport endpoint.
626  *
627  * Stores handle for matching request with reply since the call can be
628  * asynchronous. For instance, for TCP the 3-way handshake must complete
629  * before reply comes. Session is only created once connection is established.
630  *
631  * @param app_index Index of the application requesting the connect
632  * @param st Session type requested.
633  * @param tep Remote transport endpoint
634  * @param res Resulting transport connection .
635  */
636 int
638  transport_endpoint_t * rmt,
639  transport_connection_t ** res)
640 {
642  int rv;
643  u64 handle;
644 
645  rv = tp_vfts[st].open (rmt);
646  if (rv < 0)
647  {
648  clib_warning ("Transport failed to open connection.");
649  return VNET_API_ERROR_SESSION_CONNECT_FAIL;
650  }
651 
652  tc = tp_vfts[st].get_half_open ((u32) rv);
653 
654  /* Save app and tc index. The latter is needed to help establish the
655  * connection while the former is needed when the connect notify comes
656  * and we have to notify the external app */
657  handle = (((u64) app_index) << 32) | (u64) tc->c_index;
658 
659  /* Add to the half-open lookup table */
661 
662  *res = tc;
663 
664  return 0;
665 }
666 
667 /**
668  * Ask transport to listen on local transport endpoint.
669  *
670  * @param s Session for which listen will be called. Note that unlike
671  * established sessions, listen sessions are not associated to a
672  * thread.
673  * @param tep Local endpoint to be listened on.
674  */
675 int
677 {
679  u32 tci;
680 
681  /* Transport bind/listen */
682  tci = tp_vfts[s->session_type].bind (s->session_index, tep);
683 
684  if (tci == (u32) ~ 0)
685  return -1;
686 
687  /* Attach transport to session */
688  s->connection_index = tci;
689  tc = tp_vfts[s->session_type].get_listener (tci);
690 
691  /* Weird but handle it ... */
692  if (tc == 0)
693  return -1;
694 
695  /* Add to the main lookup table */
696  stream_session_table_add_for_tc (tc, s->session_index);
697 
698  return 0;
699 }
700 
701 /**
702  * Ask transport to stop listening on local transport endpoint.
703  *
704  * @param s Session to stop listening on. It must be in state LISTENING.
705  */
706 int
708 {
710 
711  if (s->session_state != SESSION_STATE_LISTENING)
712  {
713  clib_warning ("not a listening session");
714  return -1;
715  }
716 
717  tc = tp_vfts[s->session_type].get_listener (s->connection_index);
718  if (!tc)
719  {
720  clib_warning ("no transport");
721  return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
722  }
723 
725  tp_vfts[s->session_type].unbind (s->connection_index);
726  return 0;
727 }
728 
729 void
731  fifo_event_type_t evt_type,
732  u32 thread_index)
733 {
734  static u16 serial_number = 0;
735  u32 tries = 0;
736  session_fifo_event_t evt;
738 
739  /* Fabricate event */
740  evt.session_handle = session_handle;
741  evt.event_type = evt_type;
742  evt.event_id = serial_number++;
743 
744  q = session_manager_get_vpp_event_queue (thread_index);
745  while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
746  {
747  if (tries++ == 3)
748  {
749  TCP_DBG ("failed to enqueue evt");
750  break;
751  }
752  }
753 }
754 
755 /**
756  * Disconnect session and propagate to transport. This should eventually
757  * result in a delete notification that allows us to cleanup session state.
758  * Called for both active/passive disconnects.
759  *
760  * Should be called from the session's thread.
761  */
762 void
764 {
765  s->session_state = SESSION_STATE_CLOSED;
766  tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
767 }
768 
769 /**
770  * Cleanup transport and session state.
771  *
772  * Notify transport of the cleanup, wait for a delete notify to actually
773  * remove the session state.
774  */
775 void
777 {
778  int rv;
779 
780  s->session_state = SESSION_STATE_CLOSED;
781 
782  /* Delete from the main lookup table to avoid more enqueues */
783  rv = stream_session_table_del (s);
784  if (rv)
785  clib_warning ("hash delete error, rv %d", rv);
786 
787  tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
788 }
789 
790 /**
791  * Allocate vpp event queue (once) per worker thread
792  */
793 void
795  u32 thread_index)
796 {
797  api_main_t *am = &api_main;
798  void *oldheap;
799  u32 event_queue_length = 2048;
800 
801  if (smm->vpp_event_queues[thread_index] == 0)
802  {
803  /* Allocate event fifo in the /vpe-api shared-memory segment */
804  oldheap = svm_push_data_heap (am->vlib_rp);
805 
806  if (smm->configured_event_queue_length)
807  event_queue_length = smm->configured_event_queue_length;
808 
809  smm->vpp_event_queues[thread_index] =
811  (event_queue_length,
812  sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
813  0 /* (do not) send signal when queue non-empty */ );
814 
815  svm_pop_heap (oldheap);
816  }
817 }
818 
821 {
822  if (proto == TRANSPORT_PROTO_TCP)
823  {
824  if (is_ip4)
825  return SESSION_TYPE_IP4_TCP;
826  else
827  return SESSION_TYPE_IP6_TCP;
828  }
829  else
830  {
831  if (is_ip4)
832  return SESSION_TYPE_IP4_UDP;
833  else
834  return SESSION_TYPE_IP6_UDP;
835  }
836 
837  return SESSION_N_TYPES;
838 }
839 
840 static clib_error_t *
842 {
845  u32 num_threads;
846  u32 preallocated_sessions_per_worker;
847  int i;
848 
849  num_threads = 1 /* main thread */ + vtm->n_threads;
850 
851  if (num_threads < 1)
852  return clib_error_return (0, "n_thread_stacks not set");
853 
854  /* $$$ config parameters */
855  svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
856  20 /* timeout in seconds */ );
857 
858  /* configure per-thread ** vectors */
859  vec_validate (smm->sessions, num_threads - 1);
860  vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
861  vec_validate (smm->tx_buffers, num_threads - 1);
862  vec_validate (smm->pending_event_vector, num_threads - 1);
863  vec_validate (smm->free_event_vector, num_threads - 1);
864  vec_validate (smm->current_enqueue_epoch, num_threads - 1);
865  vec_validate (smm->vpp_event_queues, num_threads - 1);
866 
867  for (i = 0; i < num_threads; i++)
868  {
869  vec_validate (smm->free_event_vector[i], 0);
870  _vec_len (smm->free_event_vector[i]) = 0;
871  vec_validate (smm->pending_event_vector[i], 0);
872  _vec_len (smm->pending_event_vector[i]) = 0;
873  }
874 
875 #if SESSION_DBG
876  vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
877 #endif
878 
879  /* Allocate vpp event queues */
880  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
882 
883  /* Preallocate sessions */
884  if (smm->preallocated_sessions)
885  {
886  if (num_threads == 1)
887  {
888  pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
889  }
890  else
891  {
892  int j;
893  preallocated_sessions_per_worker =
894  (1.1 * (f64) smm->preallocated_sessions /
895  (f64) (num_threads - 1));
896 
897  for (j = 1; j < num_threads; j++)
898  {
899  pool_init_fixed (smm->sessions[j],
900  preallocated_sessions_per_worker);
901  }
902  }
903  }
904 
906 
907  smm->is_enabled = 1;
908 
909  /* Enable TCP transport */
910  vnet_tcp_enable_disable (vm, 1);
911 
912  return 0;
913 }
914 
915 void
917 {
918  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
919  /* *INDENT-OFF* */
921  vlib_node_set_state (this_vlib_main, session_queue_node.index,
922  state);
923  }));
924  /* *INDENT-ON* */
925 }
926 
927 clib_error_t *
929 {
930  if (is_en)
931  {
932  if (session_manager_main.is_enabled)
933  return 0;
934 
936 
937  return session_manager_main_enable (vm);
938  }
939  else
940  {
941  session_manager_main.is_enabled = 0;
943  }
944 
945  return 0;
946 }
947 
948 clib_error_t *
950 {
952  smm->is_enabled = 0;
953  return 0;
954 }
955 
957 
958 static clib_error_t *
960 {
962  u32 nitems;
963  uword tmp;
964 
966  {
967  if (unformat (input, "event-queue-length %d", &nitems))
968  {
969  if (nitems >= 2048)
970  smm->configured_event_queue_length = nitems;
971  else
972  clib_warning ("event queue length %d too small, ignored", nitems);
973  }
974  else if (unformat (input, "preallocated-sessions %d",
975  &smm->preallocated_sessions))
976  ;
977  else if (unformat (input, "v4-session-table-buckets %d",
978  &smm->configured_v4_session_table_buckets))
979  ;
980  else if (unformat (input, "v4-halfopen-table-buckets %d",
981  &smm->configured_v4_halfopen_table_buckets))
982  ;
983  else if (unformat (input, "v6-session-table-buckets %d",
984  &smm->configured_v6_session_table_buckets))
985  ;
986  else if (unformat (input, "v6-halfopen-table-buckets %d",
987  &smm->configured_v6_halfopen_table_buckets))
988  ;
989  else if (unformat (input, "v4-session-table-memory %U",
990  unformat_memory_size, &tmp))
991  {
992  if (tmp >= 0x100000000)
993  return clib_error_return (0, "memory size %llx (%lld) too large",
994  tmp, tmp);
995  smm->configured_v4_session_table_memory = tmp;
996  }
997  else if (unformat (input, "v4-halfopen-table-memory %U",
998  unformat_memory_size, &tmp))
999  {
1000  if (tmp >= 0x100000000)
1001  return clib_error_return (0, "memory size %llx (%lld) too large",
1002  tmp, tmp);
1003  smm->configured_v4_halfopen_table_memory = tmp;
1004  }
1005  else if (unformat (input, "v6-session-table-memory %U",
1006  unformat_memory_size, &tmp))
1007  {
1008  if (tmp >= 0x100000000)
1009  return clib_error_return (0, "memory size %llx (%lld) too large",
1010  tmp, tmp);
1011  smm->configured_v6_session_table_memory = tmp;
1012  }
1013  else if (unformat (input, "v6-halfopen-table-memory %U",
1014  unformat_memory_size, &tmp))
1015  {
1016  if (tmp >= 0x100000000)
1017  return clib_error_return (0, "memory size %llx (%lld) too large",
1018  tmp, tmp);
1019  smm->configured_v6_halfopen_table_memory = tmp;
1020  }
1021  else
1022  return clib_error_return (0, "unknown input `%U'",
1023  format_unformat_error, input);
1024  }
1025  return 0;
1026 }
1027 
1029 
1030 /*
1031  * fd.io coding-style-patch-verification: ON
1032  *
1033  * Local Variables:
1034  * eval: (c-set-style "gnu")
1035  * End:
1036  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:432
int session_manager_flush_enqueue_events(u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:392
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:337
static void svm_pop_heap(void *oldheap)
Definition: svm.h:94
int segment_manager_alloc_session_fifos(segment_manager_t *sm, svm_fifo_t **server_rx_fifo, svm_fifo_t **server_tx_fifo, u32 *fifo_segment_index)
void svm_fifo_init_pointers(svm_fifo_t *f, u32 pointer)
Set fifo pointers to requested offset.
Definition: svm_fifo.c:827
int stream_session_accept(transport_connection_t *tc, u32 listener_index, u8 sst, u8 notify)
Accept a stream session.
Definition: session.c:594
transport_proto_vft_t * tp_vfts
Per-type vector of transport protocol virtual function tables.
struct _transport_connection transport_connection_t
#define PREDICT_TRUE(x)
Definition: clib.h:98
int stream_session_enqueue_data(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:220
session_manager_main_t session_manager_main
Definition: session.c:28
struct _transport_proto_vft transport_proto_vft_t
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:518
int stream_session_table_del(stream_session_t *s)
int stream_session_table_del_for_tc(transport_connection_t *tc)
static u32 svm_fifo_max_enqueue(svm_fifo_t *f)
Definition: svm_fifo.h:106
void session_node_enable_disable(u8 is_en)
Definition: session.c:916
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: session_node.c:46
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
void stream_session_half_open_table_del(transport_connection_t *tc)
struct _svm_fifo svm_fifo_t
fifo_event_type_t
Definition: session.h:31
segment_manager_t * application_get_listen_segment_manager(application_t *app, stream_session_t *s)
Definition: application.c:378
int application_is_proxy(application_t *app)
Definition: application.c:403
#define VLIB_BUFFER_NEXT_PRESENT
Definition: buffer.h:95
static clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
Definition: session.c:959
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:111
static void * svm_push_data_heap(svm_region_t *rp)
Definition: svm.h:86
void stream_session_accept_notify(transport_connection_t *tc)
Definition: session.c:506
#define always_inline
Definition: clib.h:84
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:100
#define clib_error_return(e, args...)
Definition: error.h:99
svm_region_t * vlib_rp
Binary api segment descriptor.
Definition: api_common.h:235
unsigned long u64
Definition: types.h:89
void stream_session_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:565
int stream_session_open(u32 app_index, session_type_t st, transport_endpoint_t *rmt, transport_connection_t **res)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:637
void stream_session_cleanup(stream_session_t *s)
Cleanup transport and session state.
Definition: session.c:776
void stream_session_delete(stream_session_t *s)
Cleans up session and lookup table.
Definition: session.c:538
struct _stream_session_t stream_session_t
void session_send_session_evt_to_thread(u64 session_handle, fifo_event_type_t evt_type, u32 thread_index)
Definition: session.c:730
static u32 vlib_get_buffer_index(vlib_main_t *vm, void *p)
Translate buffer pointer into buffer index.
Definition: buffer_funcs.h:74
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:841
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
u32 stream_session_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:307
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:197
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:72
struct _unformat_input_t unformat_input_t
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:193
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:270
session_type_t
void svm_fifo_segment_init(u64 baseva, u32 timeout_in_seconds)
#define PREDICT_FALSE(x)
Definition: clib.h:97
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:119
clib_error_t * vnet_tcp_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: tcp.c:1392
int stream_session_create_i(segment_manager_t *sm, transport_connection_t *tc, u8 alloc_fifos, stream_session_t **ret_s)
Definition: session.c:32
clib_error_t * session_manager_main_init(vlib_main_t *vm)
Definition: session.c:949
struct _session_manager_main session_manager_main_t
Definition: session.h:113
#define foreach_vlib_main(body)
Definition: threads.h:244
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:928
#define TCP_DBG(_fmt, _args...)
Definition: tcp_debug.h:85
API main structure, used by both vpp and binary API clients.
Definition: api_common.h:182
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P (general version).
Definition: pool.h:188
static u8 svm_fifo_set_event(svm_fifo_t *f)
Sets fifo event flag.
Definition: svm_fifo.h:123
#define SESSION_EVT_DBG(_evt, _args...)
static unix_shared_memory_queue_t * session_manager_get_vpp_event_queue(u32 thread_index)
Definition: session.h:345
api_main_t api_main
Definition: api_shared.c:35
#define UNFORMAT_END_OF_INPUT
Definition: format.h:143
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:221
vlib_main_t * vm
Definition: buffer.c:283
int stream_session_connect_notify(transport_connection_t *tc, u8 is_fail)
Definition: session.c:441
u32 stream_session_tx_fifo_max_dequeue(transport_connection_t *tc)
Definition: session.c:290
segment_manager_t * application_get_connect_segment_manager(application_t *app)
Definition: application.c:371
#define clib_warning(format, args...)
Definition: error.h:59
void stream_session_init_fifos_pointers(transport_connection_t *tc, u32 rx_pointer, u32 tx_pointer)
Init fifo tail and head pointers.
Definition: session.c:431
void stream_session_half_open_table_add(transport_connection_t *tc, u64 value)
struct _application application_t
int svm_fifo_enqueue_nowait(svm_fifo_t *f, u32 max_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:533
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:224
#define pool_init_fixed(pool, max_elts)
initialize a fixed-size, preallocated pool
Definition: pool.h:86
void stream_session_disconnect_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:524
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
vhost_vring_state_t state
Definition: vhost-user.h:82
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:109
void stream_session_table_add_for_tc(transport_connection_t *tc, u64 value)
void stream_session_disconnect(stream_session_t *s)
Disconnect session and propagate to transport.
Definition: session.c:763
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:206
void stream_session_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:580
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:147
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 required_bytes, u8 *copy_from_here)
Definition: svm_fifo.c:608
int stream_session_stop_listen(stream_session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:707
session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
Definition: session.c:820
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
u64 uword
Definition: types.h:112
#define HALF_OPEN_LOOKUP_INVALID_VALUE
Definition: session.h:25
static void session_enqueue_discard_chain_bytes(vlib_main_t *vm, vlib_buffer_t *b, vlib_buffer_t **chain_b, u32 n_bytes_to_drop)
Discards bytes from buffer chain.
Definition: session.c:101
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:141
static u64 stream_session_handle(stream_session_t *s)
Definition: session.h:237
template key/value backing page structure
Definition: bihash_doc.h:44
static int stream_session_enqueue_notify(stream_session_t *s, u8 block)
Notify session peer that new data has been enqueued.
Definition: session.c:322
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 max_bytes)
Definition: svm_fifo.c:772
unsigned short u16
Definition: types.h:57
void segment_manager_dealloc_fifos(u32 svm_segment_index, svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
void session_vpp_event_queue_allocate(session_manager_main_t *smm, u32 thread_index)
Allocate vpp event queue (once) per worker thread.
Definition: session.c:794
enum _transport_proto transport_proto_t
void session_lookup_init(void)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
double f64
Definition: types.h:142
static stream_session_t * listen_session_get(session_type_t type, u32 index)
Definition: session.h:392
unsigned char u8
Definition: types.h:56
application_t * application_get(u32 index)
Definition: application.c:224
struct _transport_endpoint transport_endpoint_t
struct _segment_manager segment_manager_t
u64 stream_session_half_open_lookup_handle(ip46_address_t *lcl, ip46_address_t *rmt, u16 lcl_port, u16 rmt_port, u8 proto)
unformat_function_t unformat_memory_size
Definition: format.h:294
int stream_session_listen(stream_session_t *s, transport_endpoint_t *tep)
Ask transport to listen on local transport endpoint.
Definition: session.c:676
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
static int session_enqueue_chain_tail(stream_session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:132
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:75
int svm_fifo_peek(svm_fifo_t *f, u32 relative_offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:756
int stream_session_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:299
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:57
application_t * application_get_if_valid(u32 index)
Definition: application.c:230
u8 stream_session_no_space(transport_connection_t *tc, u32 thread_index, u16 data_len)
Check if we have space in rx fifo to push more bytes.
Definition: session.c:275
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:972
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:169
struct _unix_shared_memory_queue unix_shared_memory_queue_t
static stream_session_t * stream_session_get(u32 si, u32 thread_index)
Definition: session.h:217