FD.io VPP  v21.01.1
Vector Packet Processing
session.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19 
20 #include <vnet/session/session.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 
26 
/**
 * Post an event of type evt_type on the message queue that
 * session_main_get_vpp_event_queue () returns for thread_index.
 *
 * How data is interpreted depends on the switch branch taken:
 *  - first branch: data is stored in evt->rpc_args.fp and args in
 *    evt->rpc_args.arg
 *  - SESSION_IO_EVT_RX / SESSION_IO_EVT_TX branch: data is read as a
 *    pointer to a u32 session index
 *  - last branch: data is cast to session_t * and turned into a handle
 *
 * NOTE(review): this listing skips lines 38-39, 46-47, 54-56 and 60-63, so
 * some case labels and the statements that set up msg are not visible here.
 *
 * @return 0 once the message was added to the queue, -1 if svm_msg_q_lock ()
 *         returns non-zero or the event type is unhandled, -2 from the
 *         early-exit block that follows the lock (its condition is not
 *         visible in this listing).
 */
27 static inline int
28 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
29  session_evt_type_t evt_type)
30 {
31  session_event_t *evt;
32  svm_msg_q_msg_t msg;
33  svm_msg_q_t *mq;
34 
35  mq = session_main_get_vpp_event_queue (thread_index);
36  if (PREDICT_FALSE (svm_msg_q_lock (mq)))
37  return -1;
    /* NOTE(review): the condition guarding this block is not visible; the
     * block releases the queue lock before bailing out with -2 */
40  {
41  svm_msg_q_unlock (mq);
42  return -2;
43  }
44  switch (evt_type)
45  {
48  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
49  evt->rpc_args.fp = data;
50  evt->rpc_args.arg = args;
51  break;
52  case SESSION_IO_EVT_RX:
53  case SESSION_IO_EVT_TX:
57  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
58  evt->session_index = *(u32 *) data;
59  break;
64  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
65  evt->session_handle = session_handle ((session_t *) data);
66  break;
67  default:
    /* Unknown event type: drop the lock taken above before returning */
68  clib_warning ("evt unhandled!");
69  svm_msg_q_unlock (mq);
70  return -1;
71  }
72  evt->event_type = evt_type;
73 
    /* Enqueue the filled-in message and release the queue lock in one call */
74  svm_msg_q_add_and_unlock (mq, &msg);
75  return 0;
76 }
77 
78 int
80 {
81  return session_send_evt_to_thread (&f->master_session_index, 0,
82  f->master_thread_index, evt_type);
83 }
84 
85 int
87  session_evt_type_t evt_type)
88 {
89  return session_send_evt_to_thread (data, 0, thread_index, evt_type);
90 }
91 
92 int
94 {
95  /* only events supported are disconnect and reset */
96  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
97  || evt_type == SESSION_CTRL_EVT_RESET);
98  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
99 }
100 
101 void
103  void *rpc_args)
104 {
105  session_send_evt_to_thread (fp, rpc_args, thread_index,
107 }
108 
109 void
110 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
111 {
112  if (thread_index != vlib_get_thread_index ())
113  session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
114  else
115  {
116  void (*fnp) (void *) = fp;
117  fnp (rpc_args);
118  }
119 }
120 
121 void
123 {
124  session_t *s;
125 
126  s = session_get (tc->s_index, tc->thread_index);
128  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
129  if (!(s->flags & SESSION_F_CUSTOM_TX))
130  {
131  s->flags |= SESSION_F_CUSTOM_TX;
132  if (svm_fifo_set_event (s->tx_fifo)
134  {
135  session_worker_t *wrk;
136  session_evt_elt_t *elt;
137  wrk = session_main_get_worker (tc->thread_index);
138  if (has_prio)
139  elt = session_evt_alloc_new (wrk);
140  else
141  elt = session_evt_alloc_old (wrk);
142  elt->evt.session_index = tc->s_index;
143  elt->evt.event_type = SESSION_IO_EVT_TX;
144  tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
145  }
146  }
147 }
148 
149 void
151 {
152  session_worker_t *wrk = session_main_get_worker (tc->thread_index);
153  session_evt_elt_t *elt;
154 
155  ASSERT (tc->thread_index == vlib_get_thread_index ());
156 
157  elt = session_evt_alloc_new (wrk);
158  elt->evt.session_index = tc->s_index;
159  elt->evt.event_type = SESSION_IO_EVT_TX;
160 }
161 
162 static void
164 {
165  u32 thread_index = vlib_get_thread_index ();
166  session_evt_elt_t *elt;
167  session_worker_t *wrk;
168 
169  /* If we are in the handler thread, or being called with the worker barrier
170  * held, just append a new event to pending disconnects vector. */
171  if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
172  {
174  elt = session_evt_alloc_ctrl (wrk);
175  clib_memset (&elt->evt, 0, sizeof (session_event_t));
176  elt->evt.session_handle = session_handle (s);
177  elt->evt.event_type = evt;
178  }
179  else
181 }
182 
/**
 * Get a session from the session pool of worker thread_index.
 *
 * The returned session is zeroed with clib_memset and then has its
 * session_index (its offset within wrk->sessions) and thread_index set.
 *
 * NOTE(review): listing lines 190, 194-196, 200 and 205 are not visible, so
 * the statements that actually assign s in either branch are missing here.
 *
 * @param thread_index index into session_main.wrk selecting the worker
 * @return pointer to the initialized session
 */
183 session_t *
184 session_alloc (u32 thread_index)
185 {
186  session_worker_t *wrk = &session_main.wrk[thread_index];
187  session_t *s;
188  u8 will_expand = 0;
189  pool_get_aligned_will_expand (wrk->sessions, will_expand,
191  /* If we have peekers, let them finish */
192  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
193  {
197  }
198  else
199  {
201  }
    /* Start from a clean slate, then record where the session lives */
202  clib_memset (s, 0, sizeof (*s));
203  s->session_index = s - wrk->sessions;
204  s->thread_index = thread_index;
206  return s;
207 }
208 
209 void
211 {
212  if (CLIB_DEBUG)
213  {
214  u8 thread_index = s->thread_index;
215  clib_memset (s, 0xFA, sizeof (*s));
216  pool_put (session_main.wrk[thread_index].sessions, s);
217  return;
218  }
219  SESSION_EVT (SESSION_EVT_FREE, s);
220  pool_put (session_main.wrk[s->thread_index].sessions, s);
221 }
222 
/**
 * Consistency check for session index si in the pool of thread_index.
 *
 * NOTE(review): tc is used below but its declaration is not visible in this
 * listing (line 227 is skipped).
 *
 * @param si session index to check
 * @param thread_index index into session_main.wrk selecting the pool
 * @return 1 if the session looks consistent, 0 if its own thread/session
 *         index or the back-references of its transport connection do not
 *         match.
 */
223 u8
224 session_is_valid (u32 si, u8 thread_index)
225 {
226  session_t *s;
228 
229  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
230 
    /* NOTE(review): a NULL element is reported as valid (1) - confirm this
     * is intended */
231  if (!s)
232  return 1;
233 
    /* The session must agree with the indices it was looked up with */
234  if (s->thread_index != thread_index || s->session_index != si)
235  return 0;
236 
    /* No transport cross-check when the transport is already deleted or the
     * state is at or below LISTENING */
237  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
238  || s->session_state <= SESSION_STATE_LISTENING)
239  return 1;
240 
    /* Session and transport connection must point at each other */
241  tc = session_get_transport (s);
242  if (s->connection_index != tc->c_index
243  || s->thread_index != tc->thread_index || tc->s_index != si)
244  return 0;
245 
246  return 1;
247 }
248 
249 static void
251 {
252  app_worker_t *app_wrk;
253 
254  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
255  if (!app_wrk)
256  return;
257  app_worker_cleanup_notify (app_wrk, s, ntf);
258 }
259 
260 void
262 {
265  session_free (s);
266 }
267 
268 /**
269  * Cleans up session and lookup table.
270  *
271  * Transport connection must still be valid.
272  */
273 static void
275 {
276  int rv;
277 
278  /* Delete from the main lookup table. */
279  if ((rv = session_lookup_del_session (s)))
280  clib_warning ("session %u hash delete rv %d", s->session_index, rv);
281 
283 }
284 
285 void
287 {
289 }
290 
291 void
293  session_handle_t ho_handle)
294 {
295  app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle));
296  app_worker_del_half_open (app_wrk, tp, ho_handle);
297 }
298 
299 session_t *
301 {
302  session_t *s;
303  u32 thread_index = tc->thread_index;
304 
305  ASSERT (thread_index == vlib_get_thread_index ()
306  || transport_protocol_is_cl (tc->proto));
307 
308  s = session_alloc (thread_index);
309  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
310  s->session_state = SESSION_STATE_CLOSED;
311 
312  /* Attach transport to session and vice versa */
313  s->connection_index = tc->c_index;
314  tc->s_index = s->session_index;
315  return s;
316 }
317 
318 /**
319  * Discards bytes from buffer chain
320  *
321  * It discards n_bytes_to_drop starting at first buffer after chain_b
322  */
323 always_inline void
325  vlib_buffer_t ** chain_b,
326  u32 n_bytes_to_drop)
327 {
328  vlib_buffer_t *next = *chain_b;
329  u32 to_drop = n_bytes_to_drop;
330  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
331  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
332  {
333  next = vlib_get_buffer (vm, next->next_buffer);
334  if (next->current_length > to_drop)
335  {
336  vlib_buffer_advance (next, to_drop);
337  to_drop = 0;
338  }
339  else
340  {
341  to_drop -= next->current_length;
342  next->current_length = 0;
343  }
344  }
345  *chain_b = next;
346 
347  if (to_drop == 0)
348  b->total_length_not_including_first_buffer -= n_bytes_to_drop;
349 }
350 
351 /**
352  * Enqueue buffer chain tail
353  */
354 always_inline int
356  u32 offset, u8 is_in_order)
357 {
358  vlib_buffer_t *chain_b;
359  u32 chain_bi, len, diff;
361  u8 *data;
362  u32 written = 0;
363  int rv = 0;
364 
365  if (is_in_order && offset)
366  {
367  diff = offset - b->current_length;
369  return 0;
370  chain_b = b;
371  session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
372  chain_bi = vlib_get_buffer_index (vm, chain_b);
373  }
374  else
375  chain_bi = b->next_buffer;
376 
377  do
378  {
379  chain_b = vlib_get_buffer (vm, chain_bi);
380  data = vlib_buffer_get_current (chain_b);
381  len = chain_b->current_length;
382  if (!len)
383  continue;
384  if (is_in_order)
385  {
386  rv = svm_fifo_enqueue (s->rx_fifo, len, data);
387  if (rv == len)
388  {
389  written += rv;
390  }
391  else if (rv < len)
392  {
393  return (rv > 0) ? (written + rv) : written;
394  }
395  else if (rv > len)
396  {
397  written += rv;
398 
399  /* written more than what was left in chain */
401  return written;
402 
403  /* drop the bytes that have already been delivered */
404  session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
405  }
406  }
407  else
408  {
409  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
410  if (rv)
411  {
412  clib_warning ("failed to enqueue multi-buffer seg");
413  return -1;
414  }
415  offset += len;
416  }
417  }
418  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
419  ? chain_b->next_buffer : 0));
420 
421  if (is_in_order)
422  return written;
423 
424  return 0;
425 }
426 
427 void
430 {
431  if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
432  {
433  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
434  app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
435  if (CLIB_ASSERT_ENABLE)
436  {
437  segment_manager_t *sm;
438  sm = segment_manager_get (f->segment_manager);
439  ASSERT (f->size >= 4096);
440  ASSERT (f->size <= sm->max_fifo_size);
441  }
442  }
443 }
444 
445 /*
446  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
447  * event but on request can queue notification events for later delivery by
448  * calling stream_server_flush_enqueue_events().
449  *
450  * @param tc Transport connection whose session receives the data
451  * @param b Buffer to be enqueued
452  * @param offset Offset at which to start enqueueing if out-of-order
453  * @param queue_event Flag to indicate if an RX notification event is to be
454  * queued for later delivery. Useful when more data is
455  * enqueued and only one event is to be generated.
456  * @param is_in_order Flag to indicate if data is in order
457  * @return Number of bytes enqueued or a negative value if enqueueing failed.
458  */
459 int
462  u8 queue_event, u8 is_in_order)
463 {
464  session_t *s;
465  int enqueued = 0, rv, in_order_off;
466 
467  s = session_get (tc->s_index, tc->thread_index);
468 
469  if (is_in_order)
470  {
471  enqueued = svm_fifo_enqueue (s->rx_fifo,
472  b->current_length,
474  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
475  && enqueued >= 0))
476  {
477  in_order_off = enqueued > b->current_length ? enqueued : 0;
478  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
479  if (rv > 0)
480  enqueued += rv;
481  }
482  }
483  else
484  {
485  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
486  b->current_length,
488  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
489  session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
490  /* if something was enqueued, report even this as success for ooo
491  * segment handling */
492  return rv;
493  }
494 
495  if (queue_event)
496  {
497  /* Queue RX event on this fifo. Eventually these will need to be flushed
498  * by calling stream_server_flush_enqueue_events () */
499  session_worker_t *wrk;
500 
502  if (!(s->flags & SESSION_F_RX_EVT))
503  {
504  s->flags |= SESSION_F_RX_EVT;
505  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
506  }
507 
509  }
510 
511  return enqueued;
512 }
513 
514 int
516  session_dgram_hdr_t * hdr,
517  vlib_buffer_t * b, u8 proto, u8 queue_event)
518 {
519  int rv;
520 
522  >= b->current_length + sizeof (*hdr));
523 
524  if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
525  {
526  /* *INDENT-OFF* */
527  svm_fifo_seg_t segs[2] = {
528  { (u8 *) hdr, sizeof (*hdr) },
530  };
531  /* *INDENT-ON* */
532 
533  rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
534  0 /* allow_partial */ );
535  }
536  else
537  {
539  svm_fifo_seg_t *segs = 0, *seg;
540  vlib_buffer_t *it = b;
541  u32 n_segs = 1;
542 
543  vec_add2 (segs, seg, 1);
544  seg->data = (u8 *) hdr;
545  seg->len = sizeof (*hdr);
546  while (it)
547  {
548  vec_add2 (segs, seg, 1);
549  seg->data = vlib_buffer_get_current (it);
550  seg->len = it->current_length;
551  n_segs++;
552  if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
553  break;
554  it = vlib_get_buffer (vm, it->next_buffer);
555  }
556  rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
557  0 /* allow partial */ );
558  vec_free (segs);
559  }
560 
561  if (queue_event && rv > 0)
562  {
563  /* Queue RX event on this fifo. Eventually these will need to be flushed
564  * by calling stream_server_flush_enqueue_events () */
565  session_worker_t *wrk;
566 
568  if (!(s->flags & SESSION_F_RX_EVT))
569  {
570  s->flags |= SESSION_F_RX_EVT;
571  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
572  }
573 
575  }
576  return rv > 0 ? rv : 0;
577 }
578 
579 int
581  u32 offset, u32 max_bytes)
582 {
583  session_t *s = session_get (tc->s_index, tc->thread_index);
584  return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
585 }
586 
587 u32
589 {
590  session_t *s = session_get (tc->s_index, tc->thread_index);
591  u32 rv;
592 
593  rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
595 
596  if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
598 
599  return rv;
600 }
601 
602 static inline int
604  svm_fifo_t * f, session_evt_type_t evt_type)
605 {
606  app_worker_t *app_wrk;
607  application_t *app;
608  int i;
609 
610  app = application_get (app_index);
611  if (!app)
612  return -1;
613 
614  for (i = 0; i < f->n_subscribers; i++)
615  {
616  app_wrk = application_get_worker (app, f->subscribers[i]);
617  if (!app_wrk)
618  continue;
619  if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
620  return -1;
621  }
622 
623  return 0;
624 }
625 
626 /**
627  * Notify session peer that new data has been enqueued.
628  *
629  * @param s Stream session for which the event is to be generated.
630  * @param lock Flag to indicate if call should lock message queue.
631  *
632  * @return 0 on success or negative number if failed to send notification.
633  */
634 static inline int
636 {
637  app_worker_t *app_wrk;
638  u32 session_index;
639  u8 n_subscribers;
640 
641  session_index = s->session_index;
642  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
643 
644  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
645  if (PREDICT_FALSE (!app_wrk))
646  {
647  SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
648  return 0;
649  }
650 
651  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
652 
653  s->flags &= ~SESSION_F_RX_EVT;
654 
655  /* Application didn't confirm accept yet */
656  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
657  return 0;
658 
661  return -1;
662 
663  if (PREDICT_FALSE (n_subscribers))
664  {
665  s = session_get (session_index, vlib_get_thread_index ());
666  return session_notify_subscribers (app_wrk->app_index, s,
668  }
669 
670  return 0;
671 }
672 
673 int
675 {
677 }
678 
679 static void
681 {
682  u32 session_index = pointer_to_uword (arg);
683  session_t *s;
684 
685  s = session_get_if_valid (session_index, vlib_get_thread_index ());
686  if (!s)
687  return;
688 
690 }
691 
692 /**
693  * Like session_enqueue_notify, but can be called from a thread that does not
694  * own the session.
695  */
696 void
698 {
699  u32 thread_index = session_thread_from_handle (sh);
700  u32 session_index = session_index_from_handle (sh);
701 
702  /*
703  * Pass session index (u32) as opposed to handle (u64) in case pointers
704  * are not 64-bit.
705  */
706  session_send_rpc_evt_to_thread (thread_index,
708  uword_to_pointer (session_index, void *));
709 }
710 
711 int
713 {
714  app_worker_t *app_wrk;
715 
717 
718  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
719  if (PREDICT_FALSE (!app_wrk))
720  return -1;
721 
724  return -1;
725 
726  if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
727  return session_notify_subscribers (app_wrk->app_index, s,
729 
730  return 0;
731 }
732 
733 /**
734  * Flushes queue of sessions that are to be notified of new data
735  * enqueued events.
736  *
737  * @param thread_index Thread index for which the flush is to be performed.
738  * @return 0 on success or a positive number indicating the number of sessions
739  * that were no longer valid or could not be notified (e.g. API queue full).
740  */
741 int
743 {
744  session_worker_t *wrk = session_main_get_worker (thread_index);
745  session_t *s;
746  int i, errors = 0;
747  u32 *indices;
748 
749  indices = wrk->session_to_enqueue[transport_proto];
750 
751  for (i = 0; i < vec_len (indices); i++)
752  {
753  s = session_get_if_valid (indices[i], thread_index);
754  if (PREDICT_FALSE (!s))
755  {
756  errors++;
757  continue;
758  }
759 
761  0 /* TODO/not needed */ );
762 
764  errors++;
765  }
766 
767  vec_reset_length (indices);
768  wrk->session_to_enqueue[transport_proto] = indices;
769 
770  return errors;
771 }
772 
773 int
775 {
777  int i, errors = 0;
778  for (i = 0; i < 1 + vtm->n_threads; i++)
779  errors += session_main_flush_enqueue_events (transport_proto, i);
780  return errors;
781 }
782 
783 int
785  session_error_t err)
786 {
787  session_handle_t ho_handle, wrk_handle;
788  u32 opaque = 0, new_ti, new_si;
789  app_worker_t *app_wrk;
790  session_t *s = 0;
791 
792  /*
793  * Find connection handle and cleanup half-open table
794  */
795  ho_handle = session_lookup_half_open_handle (tc);
796  if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
797  {
798  SESSION_DBG ("half-open was removed!");
799  return -1;
800  }
802 
803  /* Get the app's index from the handle we stored when opening connection
804  * and the opaque (api_context for external apps) from transport session
805  * index */
806  app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle));
807  if (!app_wrk)
808  return -1;
809 
810  wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle);
811  if (wrk_handle == SESSION_INVALID_HANDLE)
812  return -1;
813 
814  /* Make sure this is the same half-open index */
815  if (session_handle_index (wrk_handle) != session_handle_index (ho_handle))
816  return -1;
817 
818  opaque = session_handle_data (wrk_handle);
819 
820  if (err)
821  return app_worker_connect_notify (app_wrk, s, err, opaque);
822 
824  s->session_state = SESSION_STATE_CONNECTING;
825  s->app_wrk_index = app_wrk->wrk_index;
826  new_si = s->session_index;
827  new_ti = s->thread_index;
828 
829  if ((err = app_worker_init_connected (app_wrk, s)))
830  {
831  session_free (s);
832  app_worker_connect_notify (app_wrk, 0, err, opaque);
833  return -1;
834  }
835 
836  s = session_get (new_si, new_ti);
837  s->session_state = SESSION_STATE_READY;
839 
840  if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
841  {
843  /* Avoid notifying app about rejected session cleanup */
844  s = session_get (new_si, new_ti);
846  session_free (s);
847  return -1;
848  }
849 
850  return 0;
851 }
852 
853 static void
855 {
856  u32 session_index = pointer_to_uword (arg);
857  segment_manager_t *sm;
858  app_worker_t *app_wrk;
859  session_t *s;
860 
861  s = session_get_if_valid (session_index, vlib_get_thread_index ());
862  if (!s)
863  return;
864 
865  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
866  if (!app_wrk)
867  return;
868 
869  /* Attach fifos to the right session and segment slice */
873 
874  /* Notify app that it has data on the new session */
876 }
877 
878 typedef struct _session_switch_pool_args
879 {
880  u32 session_index;
881  u32 thread_index;
882  u32 new_thread_index;
883  u32 new_session_index;
885 
886 /**
887  * Notify old thread of the session pool switch
888  */
889 static void
890 session_switch_pool (void *cb_args)
891 {
    /* NOTE(review): args is used throughout but its declaration (listing
     * line 892) is not visible here; lines 902, 912-914 and 923 are skipped
     * as well */
893  session_handle_t new_sh;
894  segment_manager_t *sm;
895  app_worker_t *app_wrk;
896  session_t *s;
897  void *rargs;
898 
    /* Must run on the thread recorded for the old session */
899  ASSERT (args->thread_index == vlib_get_thread_index ());
900  s = session_get (args->session_index, args->thread_index);
901 
903  s->thread_index);
904 
    /* Handle of the session on the new thread, reported to the app below */
905  new_sh = session_make_handle (args->new_session_index,
906  args->new_thread_index);
907 
908  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
909  if (app_wrk)
910  {
911  /* Cleanup fifo segment slice state for fifos */
915 
916  /* Notify app, using old session, about the migration event */
917  app_worker_migrate_notify (app_wrk, s, new_sh);
918  }
919 
920  /* Trigger app read and fifo updates on the new thread */
921  rargs = uword_to_pointer (args->new_session_index, void *);
922  session_send_rpc_evt_to_thread (args->new_thread_index,
924 
    /* The old session goes back to its pool and the argument block is
     * released here, so cb_args must not be used after this call */
925  session_free (s);
926  clib_mem_free (cb_args);
927 }
928 
929 /**
930  * Move dgram session to the right thread
931  */
932 int
934  u32 old_thread_index, session_t ** new_session)
935 {
936  session_t *new_s;
937  session_switch_pool_args_t *rpc_args;
938 
939  /*
940  * Clone half-open session to the right thread.
941  */
942  new_s = session_clone_safe (tc->s_index, old_thread_index);
943  new_s->connection_index = tc->c_index;
944  new_s->session_state = SESSION_STATE_READY;
945  new_s->flags |= SESSION_F_IS_MIGRATING;
946 
948 
949  /*
950  * Ask thread owning the old session to clean it up and make us the tx
951  * fifo owner
952  */
953  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
954  rpc_args->new_session_index = new_s->session_index;
955  rpc_args->new_thread_index = new_s->thread_index;
956  rpc_args->session_index = tc->s_index;
957  rpc_args->thread_index = old_thread_index;
958  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
959  rpc_args);
960 
961  tc->s_index = new_s->session_index;
962  new_s->connection_index = tc->c_index;
963  *new_session = new_s;
964  return 0;
965 }
966 
967 /**
968  * Notification from transport that connection is being closed.
969  *
970  * A disconnect is sent to application but state is not removed. Once
971  * disconnect is acknowledged by application, session disconnect is called.
972  * Ultimately this leads to close being called on transport (passive close).
973  */
974 void
976 {
977  app_worker_t *app_wrk;
978  session_t *s;
979 
980  s = session_get (tc->s_index, tc->thread_index);
981  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
982  return;
983  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
984  app_wrk = app_worker_get (s->app_wrk_index);
985  app_worker_close_notify (app_wrk, s);
986 }
987 
988 /**
989  * Notification from transport that connection is being deleted
990  *
991  * This removes the session if it is still valid. It should be called only on
992  * previously fully established sessions. For instance failed connects should
993  * call stream_session_connect_notify and indicate that the connect has
994  * failed.
995  */
996 void
998 {
999  session_t *s;
1000 
1001  /* App might've been removed already */
1002  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1003  return;
1004 
1005  switch (s->session_state)
1006  {
1007  case SESSION_STATE_CREATED:
1008  /* Session was created but accept notification was not yet sent to the
1009  * app. Cleanup everything. */
1012  session_free (s);
1013  break;
1014  case SESSION_STATE_ACCEPTING:
1015  case SESSION_STATE_TRANSPORT_CLOSING:
1016  case SESSION_STATE_CLOSING:
1017  case SESSION_STATE_TRANSPORT_CLOSED:
1018  /* If transport finishes or times out before we get a reply
1019  * from the app, mark transport as closed and wait for reply
1020  * before removing the session. Cleanup session table in advance
1021  * because transport will soon be closed and closed sessions
1022  * are assumed to have been removed from the lookup table */
1024  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1027  break;
1028  case SESSION_STATE_APP_CLOSED:
1029  /* Cleanup lookup table as transport needs to still be valid.
1030  * Program transport close to ensure that all session events
1031  * have been cleaned up. Once transport close is called, the
1032  * session is just removed because both transport and app have
1033  * confirmed the close*/
1035  s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1039  break;
1040  case SESSION_STATE_TRANSPORT_DELETED:
1041  break;
1042  case SESSION_STATE_CLOSED:
1044  session_delete (s);
1045  break;
1046  default:
1047  clib_warning ("session state %u", s->session_state);
1049  session_delete (s);
1050  break;
1051  }
1052 }
1053 
1054 /**
1055  * Notification from transport that it is closed
1056  *
1057  * Should be called by transport, prior to calling delete notify, once it
1058  * knows that no more data will be exchanged. This could serve as an
1059  * early acknowledgment of an active close especially if transport delete
1060  * can be delayed a long time, e.g., tcp time-wait.
1061  */
1062 void
1064 {
1065  app_worker_t *app_wrk;
1066  session_t *s;
1067 
1068  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1069  return;
1070 
1071  /* Transport thinks that app requested close but it actually didn't.
1072  * Can happen for tcp if fin and rst are received in close succession. */
1073  if (s->session_state == SESSION_STATE_READY)
1074  {
1077  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1078  }
1079  /* If app close has not been received or has not yet resulted in
1080  * a transport close, only mark the session transport as closed */
1081  else if (s->session_state <= SESSION_STATE_CLOSING)
1082  {
1083  s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1084  }
1085  /* If app also closed, switch to closed */
1086  else if (s->session_state == SESSION_STATE_APP_CLOSED)
1087  s->session_state = SESSION_STATE_CLOSED;
1088 
1089  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1090  if (app_wrk)
1092 }
1093 
1094 /**
1095  * Notify application that connection has been reset.
1096  */
1097 void
1099 {
1100  app_worker_t *app_wrk;
1101  session_t *s;
1102 
1103  s = session_get (tc->s_index, tc->thread_index);
1105  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1106  return;
1107  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1108  app_wrk = app_worker_get (s->app_wrk_index);
1109  app_worker_reset_notify (app_wrk, s);
1110 }
1111 
1112 int
1114 {
1115  app_worker_t *app_wrk;
1116  session_t *s;
1117 
1118  s = session_get (tc->s_index, tc->thread_index);
1119  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1120  if (!app_wrk)
1121  return -1;
1122  s->session_state = SESSION_STATE_ACCEPTING;
1123  if (app_worker_accept_notify (app_wrk, s))
1124  {
1125  /* On transport delete, no notifications should be sent. Unless, the
1126  * accept is retried and successful. */
1127  s->session_state = SESSION_STATE_CREATED;
1128  return -1;
1129  }
1130  return 0;
1131 }
1132 
1133 /**
1134  * Accept a stream session. Optionally ping the server by callback.
1135  */
1136 int
1138  u32 thread_index, u8 notify)
1139 {
1140  session_t *s;
1141  int rv;
1142 
1144  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1145  s->session_state = SESSION_STATE_CREATED;
1146 
1147  if ((rv = app_worker_init_accepted (s)))
1148  {
1149  session_free (s);
1150  return rv;
1151  }
1152 
1154 
1155  /* Shoulder-tap the server */
1156  if (notify)
1157  {
1158  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1159  if ((rv = app_worker_accept_notify (app_wrk, s)))
1160  {
1163  session_free (s);
1164  return rv;
1165  }
1166  }
1167 
1168  return 0;
1169 }
1170 
1171 int
1173  u32 thread_index)
1174 {
1175  app_worker_t *app_wrk;
1176  session_t *s;
1177  int rv;
1178 
1180  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1181 
1182  if ((rv = app_worker_init_accepted (s)))
1183  {
1184  session_free (s);
1185  return rv;
1186  }
1187 
1189 
1190  app_wrk = app_worker_get (s->app_wrk_index);
1191  if ((rv = app_worker_accept_notify (app_wrk, s)))
1192  {
1195  session_free (s);
1196  return rv;
1197  }
1198 
1199  s->session_state = SESSION_STATE_READY;
1200 
1201  return 0;
1202 }
1203 
/**
 * Open a connection via transport_connect () and set up its session
 * immediately.
 *
 * NOTE(review): tep, tc and the statement that assigns s are not visible in
 * this listing (lines 1207-1208, 1214, 1226 and 1236 are skipped).
 *
 * @param app_wrk_index index of the app worker that owns the new session
 * @param rmt remote session endpoint; rmt->transport_proto selects the
 *            transport
 * @param opaque value forwarded to app_worker_connect_notify ()
 * @return negative value from transport_connect () on failure, -1 if
 *         app_worker_init_connected () fails, otherwise the result of
 *         app_worker_connect_notify ()
 */
1204 int
1205 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1206 {
1209  app_worker_t *app_wrk;
1210  session_handle_t sh;
1211  session_t *s;
1212  int rv;
1213 
1215  rv = transport_connect (rmt->transport_proto, tep);
1216  if (rv < 0)
1217  {
1218  SESSION_DBG ("Transport failed to open connection.");
1219  return rv;
1220  }
1221 
    /* A non-negative rv is used as the half-open connection index */
1222  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1223 
1224  /* For dgram type of service, allocate session and fifos now */
1225  app_wrk = app_worker_get (app_wrk_index);
1227  s->app_wrk_index = app_wrk->wrk_index;
1228  s->session_state = SESSION_STATE_OPENED;
1229  if (app_worker_init_connected (app_wrk, s))
1230  {
    /* Initialization failed: give the session back before bailing out */
1231  session_free (s);
1232  return -1;
1233  }
1234 
1235  sh = session_handle (s);
1237  return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque);
1238 }
1239 
/**
 * Open a connection via transport_connect () and register it as half-open.
 *
 * Two handles are built from the half-open connection index tc->c_index:
 * one carrying app_wrk_index (added to the half-open lookup table and stored
 * in tc->s_ho_handle) and one carrying opaque (tracked by the app worker).
 *
 * NOTE(review): tep and tc are used but their declarations and the
 * initialization of tep are not visible in this listing (lines 1243-1244
 * and 1248 are skipped).
 *
 * @param app_wrk_index index of the app worker requesting the connect
 * @param rmt remote session endpoint; rmt->transport_proto selects the
 *            transport
 * @param opaque value packed into the handle tracked by the app worker
 * @return 0 on success, or the negative value returned by
 *         transport_connect ()
 */
1240 int
1241 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1242 {
1245  u64 handle, wrk_handle;
1246  int rv;
1247 
1249  rv = transport_connect (rmt->transport_proto, tep);
1250  if (rv < 0)
1251  {
1252  SESSION_DBG ("Transport failed to open connection.");
1253  return rv;
1254  }
1255 
    /* A non-negative rv is used as the half-open connection index */
1256  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1257 
1258  /* If transport offers a stream service, only allocate session once the
1259  * connection has been established.
1260  * Add connection to half-open table and save app and tc index. The
1261  * latter is needed to help establish the connection while the former
1262  * is needed when the connect notify comes and we have to notify the
1263  * external app
1264  */
1265  handle = session_make_handle (tc->c_index, app_wrk_index);
1266  session_lookup_add_half_open (tc, handle);
1267 
1268  /* Store the half-open handle in the connection. Transport will use it
1269  * when cleaning up @ref session_half_open_delete_notify
1270  */
1271  tc->s_ho_handle = handle;
1272 
1273  /* Track the half-open connections in case we want to forcefully
1274  * clean them up @ref session_cleanup_half_open
1275  */
1276  wrk_handle = session_make_handle (tc->c_index, opaque);
1277  app_worker_add_half_open (app_worker_get (app_wrk_index),
1278  rmt->transport_proto, handle, wrk_handle);
1279 
1280  return 0;
1281 }
1282 
/**
 * Open a connection by passing the endpoint straight to transport_connect ()
 * after recording the app worker index and opaque value in it.
 *
 * NOTE(review): sep and tep_cfg are used but their declarations are not
 * visible in this listing (lines 1286-1287 are skipped).
 *
 * @param app_wrk_index stored in sep->app_wrk_index
 * @param rmt remote session endpoint; rmt->transport_proto selects the
 *            transport
 * @param opaque stored in sep->opaque
 * @return whatever transport_connect () returns
 */
1283 int
1284 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1285 {
1288 
1289  sep->app_wrk_index = app_wrk_index;
1290  sep->opaque = opaque;
1291 
1292  return transport_connect (rmt->transport_proto, tep_cfg);
1293 }
1294 
1296 
1297 /* *INDENT-OFF* */
1302 };
1303 /* *INDENT-ON* */
1304 
1305 /**
1306  * Ask transport to open connection to remote transport endpoint.
1307  *
1308  * Stores handle for matching request with reply since the call can be
1309  * asynchronous. For instance, for TCP the 3-way handshake must complete
1310  * before reply comes. Session is only created once connection is established.
1311  *
1312  * @param app_wrk_index Index of the application worker requesting the connect
1313  * @param rmt Remote session endpoint. Its transport protocol determines
1314  * the transport service type used to open the connection.
1315  * @param opaque Opaque data (typically, api_context) the application expects
1316  * on open completion.
1317  */
1318 int
1319 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1320 {
1322  tst = transport_protocol_service_type (rmt->transport_proto);
1323  return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1324 }
1325 
1326 /**
1327  * Ask transport to listen on session endpoint.
1328  *
1329  * @param ls Session for which listen will be called. Note that unlike
1330  * established sessions, listen sessions are not associated to a
1331  * thread.
1332  * @param sep Local endpoint to be listened on.
1333  */
1334 int
1336 {
1337  transport_endpoint_t *tep;
1338  int tc_index;
1339  u32 s_index;
1340 
1341  /* Transport bind/listen */
1342  tep = session_endpoint_to_transport (sep);
1343  s_index = ls->session_index;
1345  s_index, tep);
1346 
1347  if (tc_index < 0)
1348  return tc_index;
1349 
1350  /* Attach transport to session. Lookup tables are populated by the app
1351  * worker because local tables (for ct sessions) are not backed by a fib */
1352  ls = listen_session_get (s_index);
1353  ls->connection_index = tc_index;
1354 
1355  return 0;
1356 }
1357 
1358 /**
1359  * Ask transport to stop listening on local transport endpoint.
1360  *
1361  * @param s Session to stop listening on. It must be in state LISTENING.
1362  */
1363 int
1365 {
1368 
1369  if (s->session_state != SESSION_STATE_LISTENING)
1370  return SESSION_E_NOLISTEN;
1371 
1373 
1374  /* If no transport, assume everything was cleaned up already */
1375  if (!tc)
1376  return SESSION_E_NONE;
1377 
1378  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1380 
1382  return 0;
1383 }
1384 
1385 /**
1386  * Initialize session closing procedure.
1387  *
1388  * Request is always sent to session node to ensure that all outstanding
1389  * requests are served before transport is notified.
1390  */
1391 void
1393 {
1394  if (!s)
1395  return;
1396 
1397  if (s->session_state >= SESSION_STATE_CLOSING)
1398  {
1399  /* Session will only be removed once both app and transport
1400  * acknowledge the close */
1401  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1402  || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1404  return;
1405  }
1406 
1407  s->session_state = SESSION_STATE_CLOSING;
1409 }
1410 
1411 /**
1412  * Force a close without waiting for data to be flushed
1413  */
1414 void
1416 {
1417  if (s->session_state >= SESSION_STATE_CLOSING)
1418  return;
1419  /* Drop all outstanding tx data */
1421  s->session_state = SESSION_STATE_CLOSING;
1423 }
1424 
1425 /**
1426  * Notify transport the session can be disconnected. This should eventually
1427  * result in a delete notification that allows us to cleanup session state.
1428  * Called for both active/passive disconnects.
1429  *
1430  * Must be called from the session's thread.
1431  */
1432 void
1434 {
1435  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1436  {
1437  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1438  s->session_state = SESSION_STATE_CLOSED;
1439  /* If transport is already deleted, just free the session */
1440  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1442  return;
1443  }
1444 
1445  /* If the tx queue wasn't drained, the transport can continue to try
1446  * sending the outstanding data (in closed state it cannot). It MUST however
1447  * at one point, either after sending everything or after a timeout, call
1448  * delete notify. This will finally lead to the complete cleanup of the
1449  * session.
1450  */
1451  s->session_state = SESSION_STATE_APP_CLOSED;
1452 
1454  s->thread_index);
1455 }
1456 
1457 /**
1458  * Force transport close
1459  */
1460 void
1462 {
1463  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1464  {
1465  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1466  s->session_state = SESSION_STATE_CLOSED;
1467  else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1469  return;
1470  }
1471 
1472  s->session_state = SESSION_STATE_APP_CLOSED;
1474  s->thread_index);
1475 }
1476 
1477 /**
1478  * Cleanup transport and session state.
1479  *
1480  * Notify transport of the cleanup and free the session. This should
1481  * be called only if transport reported some error and is already
1482  * closed.
1483  */
1484 void
1486 {
1487  /* Delete from main lookup table before we axe the transport */
1489  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1491  s->thread_index);
1492  /* Since we called cleanup, no delete notification will come. So, make
1493  * sure the session is properly freed. */
1495  session_free (s);
1496 }
1497 
1498 /**
1499  * Allocate event queues in the shared-memory segment
1500  *
1501  * That can only be a newly created memfd segment, that must be
1502  * mapped by all apps/stack users.
1503  */
1504 void
1506 {
1507  u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1508  ssvm_private_t *eqs = &smm->evt_qs_segment;
1509  uword eqs_size = 64 << 20;
1510  pid_t vpp_pid = getpid ();
1511  void *oldheap;
1512  int i;
1513 
1515  evt_q_length = smm->configured_event_queue_length;
1516 
1517  if (smm->evt_qs_segment_size)
1518  eqs_size = smm->evt_qs_segment_size;
1519 
1520  eqs->ssvm_size = eqs_size;
1521  eqs->my_pid = vpp_pid;
1522  eqs->name = format (0, "%s%c", "session: evt-qs-segment", 0);
1523  /* clib_mem_vm_map_shared consumes first page before requested_va */
1525 
1527  {
1528  clib_warning ("failed to initialize queue segment");
1529  return;
1530  }
1531 
1532  oldheap = ssvm_push_heap (eqs->sh);
1533 
1534  for (i = 0; i < vec_len (smm->wrk); i++)
1535  {
1536  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1538  {evt_q_length, evt_size, 0}
1539  ,
1540  {evt_q_length >> 1, 256, 0}
1541  };
1542  cfg->consumer_pid = 0;
1543  cfg->n_rings = 2;
1544  cfg->q_nitems = evt_q_length;
1545  cfg->ring_cfgs = rc;
1546  smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1548  clib_warning ("eventfd returned");
1549  }
1550 
1551  ssvm_pop_heap (oldheap);
1552 }
1553 
1556 {
1557  return &session_main.evt_qs_segment;
1558 }
1559 
1560 u64
1562 {
1563  svm_fifo_t *f;
1564 
1565  if (!s->rx_fifo)
1566  return SESSION_INVALID_HANDLE;
1567 
1568  f = s->rx_fifo;
1569  return segment_manager_make_segment_handle (f->segment_manager,
1570  f->segment_index);
1571 }
1572 
1573 /* *INDENT-OFF* */
1578  session_tx_fifo_dequeue_and_snd
1579 };
1580 /* *INDENT-ON* */
1581 
1582 void
1584  const transport_proto_vft_t * vft, u8 is_ip4,
1585  u32 output_node)
1586 {
1587  session_main_t *smm = &session_main;
1588  session_type_t session_type;
1589  u32 next_index = ~0;
1590 
1591  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1592 
1593  vec_validate (smm->session_type_to_next, session_type);
1594  vec_validate (smm->session_tx_fns, session_type);
1595 
1596  /* *INDENT-OFF* */
1597  if (output_node != ~0)
1598  {
1599  foreach_vlib_main (({
1600  next_index = vlib_node_add_next (this_vlib_main,
1601  session_queue_node.index,
1602  output_node);
1603  }));
1604  }
1605  /* *INDENT-ON* */
1606 
1607  smm->session_type_to_next[session_type] = next_index;
1608  smm->session_tx_fns[session_type] =
1609  session_tx_fns[vft->transport_options.tx_type];
1610 }
1611 
1614 {
1615  session_main_t *smm = &session_main;
1616  session_worker_t *wrk;
1617  u32 thread;
1618 
1619  smm->last_transport_proto_type += 1;
1620 
1621  for (thread = 0; thread < vec_len (smm->wrk); thread++)
1622  {
1623  wrk = session_main_get_worker (thread);
1625  }
1626 
1627  return smm->last_transport_proto_type;
1628 }
1629 
1632 {
1633  if (s->session_state != SESSION_STATE_LISTENING)
1636  else
1638  s->connection_index);
1639 }
1640 
1641 void
1643 {
1644  if (s->session_state != SESSION_STATE_LISTENING)
1646  s->connection_index, s->thread_index, tep,
1647  is_lcl);
1648  else
1650  s->connection_index, tep, is_lcl);
1651 }
1652 
1655 {
1657  s->connection_index);
1658 }
1659 
1660 void
1662 {
1663  ASSERT (vlib_get_thread_index () == 0);
1666 }
1667 
1668 static clib_error_t *
1670 {
1671  segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1672  session_main_t *smm = &session_main;
1674  u32 num_threads, preallocated_sessions_per_worker;
1675  uword margin = 8 << 12;
1676  session_worker_t *wrk;
1677  int i;
1678 
1679  /* We only initialize once and do not de-initialize on disable */
1680  if (smm->is_initialized)
1681  goto done;
1682 
1683  num_threads = 1 /* main thread */ + vtm->n_threads;
1684 
1685  if (num_threads < 1)
1686  return clib_error_return (0, "n_thread_stacks not set");
1687 
1688  /* Allocate cache line aligned worker contexts */
1689  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1690 
1691  for (i = 0; i < num_threads; i++)
1692  {
1693  wrk = &smm->wrk[i];
1694  wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1695  wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1696  wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1697  wrk->vm = vlib_mains[i];
1698  wrk->last_vlib_time = vlib_time_now (vm);
1701 
1702  if (num_threads > 1)
1704  }
1705 
1706  /* Allocate vpp event queues segment and queue */
1708 
1709  /* Initialize fifo segment main baseva and timeout */
1710  sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size + margin;
1711  sm_args->size = smm->session_va_space_size;
1712  segment_manager_main_init (sm_args);
1713 
1714  /* Preallocate sessions */
1715  if (smm->preallocated_sessions)
1716  {
1717  if (num_threads == 1)
1718  {
1720  }
1721  else
1722  {
1723  int j;
1724  preallocated_sessions_per_worker =
1725  (1.1 * (f64) smm->preallocated_sessions /
1726  (f64) (num_threads - 1));
1727 
1728  for (j = 1; j < num_threads; j++)
1729  {
1730  pool_init_fixed (smm->wrk[j].sessions,
1731  preallocated_sessions_per_worker);
1732  }
1733  }
1734  }
1735 
1738  transport_init ();
1739  smm->is_initialized = 1;
1740 
1741 done:
1742 
1743  smm->is_enabled = 1;
1744 
1745  /* Enable transports */
1746  transport_enable_disable (vm, 1);
1747  session_debug_init ();
1748 
1749  return 0;
1750 }
1751 
1752 static void
1754 {
1755  transport_enable_disable (vm, 0 /* is_en */ );
1756 }
1757 
1758 void
1760 {
1761  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1763  u8 have_workers = vtm->n_threads != 0;
1764 
1765  /* *INDENT-OFF* */
1766  foreach_vlib_main (({
1767  if (have_workers && ii == 0)
1768  {
1769  if (is_en)
1770  {
1771  vlib_node_set_state (this_vlib_main,
1772  session_queue_process_node.index, state);
1773  vlib_node_t *n = vlib_get_node (this_vlib_main,
1775  vlib_start_process (this_vlib_main, n->runtime_index);
1776  }
1777  else
1778  {
1779  vlib_process_signal_event_mt (this_vlib_main,
1782  }
1783  if (!session_main.poll_main)
1784  continue;
1785  }
1786  vlib_node_set_state (this_vlib_main, session_queue_node.index,
1787  state);
1788  }));
1789  /* *INDENT-ON* */
1790 }
1791 
1792 clib_error_t *
1794 {
1795  clib_error_t *error = 0;
1796  if (is_en)
1797  {
1798  if (session_main.is_enabled)
1799  return 0;
1800 
1801  error = session_manager_main_enable (vm);
1803  }
1804  else
1805  {
1806  session_main.is_enabled = 0;
1809  }
1810 
1811  return error;
1812 }
1813 
1814 clib_error_t *
1816 {
1817  session_main_t *smm = &session_main;
1818 
1819  smm->is_enabled = 0;
1820  smm->session_enable_asap = 0;
1821  smm->poll_main = 0;
1823 
1824 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1825  smm->session_va_space_size = 128ULL << 30;
1826  smm->evt_qs_segment_size = 64 << 20;
1827 #else
1828  smm->session_va_space_size = 128 << 20;
1829  smm->evt_qs_segment_size = 1 << 20;
1830 #endif
1831 
1832  smm->last_transport_proto_type = TRANSPORT_PROTO_QUIC;
1833 
1834  return 0;
1835 }
1836 
1837 static clib_error_t *
1839 {
1840  session_main_t *smm = &session_main;
1841  if (smm->session_enable_asap)
1842  {
1844  vnet_session_enable_disable (vm, 1 /* is_en */ );
1846  }
1847  return 0;
1848 }
1849 
1852 
1853 static clib_error_t *
1855 {
1856  session_main_t *smm = &session_main;
1857  u32 nitems;
1858  uword tmp;
1859 
1860  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1861  {
1862  if (unformat (input, "event-queue-length %d", &nitems))
1863  {
1864  if (nitems >= 2048)
1865  smm->configured_event_queue_length = nitems;
1866  else
1867  clib_warning ("event queue length %d too small, ignored", nitems);
1868  }
1869  else if (unformat (input, "preallocated-sessions %d",
1870  &smm->preallocated_sessions))
1871  ;
1872  else if (unformat (input, "v4-session-table-buckets %d",
1874  ;
1875  else if (unformat (input, "v4-halfopen-table-buckets %d",
1877  ;
1878  else if (unformat (input, "v6-session-table-buckets %d",
1880  ;
1881  else if (unformat (input, "v6-halfopen-table-buckets %d",
1883  ;
1884  else if (unformat (input, "v4-session-table-memory %U",
1885  unformat_memory_size, &tmp))
1886  {
1887  if (tmp >= 0x100000000)
1888  return clib_error_return (0, "memory size %llx (%lld) too large",
1889  tmp, tmp);
1891  }
1892  else if (unformat (input, "v4-halfopen-table-memory %U",
1893  unformat_memory_size, &tmp))
1894  {
1895  if (tmp >= 0x100000000)
1896  return clib_error_return (0, "memory size %llx (%lld) too large",
1897  tmp, tmp);
1899  }
1900  else if (unformat (input, "v6-session-table-memory %U",
1901  unformat_memory_size, &tmp))
1902  {
1903  if (tmp >= 0x100000000)
1904  return clib_error_return (0, "memory size %llx (%lld) too large",
1905  tmp, tmp);
1907  }
1908  else if (unformat (input, "v6-halfopen-table-memory %U",
1909  unformat_memory_size, &tmp))
1910  {
1911  if (tmp >= 0x100000000)
1912  return clib_error_return (0, "memory size %llx (%lld) too large",
1913  tmp, tmp);
1915  }
1916  else if (unformat (input, "local-endpoints-table-memory %U",
1917  unformat_memory_size, &tmp))
1918  {
1919  if (tmp >= 0x100000000)
1920  return clib_error_return (0, "memory size %llx (%lld) too large",
1921  tmp, tmp);
1922  smm->local_endpoints_table_memory = tmp;
1923  }
1924  else if (unformat (input, "local-endpoints-table-buckets %d",
1926  ;
1927  /* Deprecated but maintained for compatibility */
1928  else if (unformat (input, "evt_qs_memfd_seg"))
1929  ;
1930  else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1931  &smm->evt_qs_segment_size))
1932  ;
1933  else if (unformat (input, "enable"))
1934  smm->session_enable_asap = 1;
1935  else if (unformat (input, "segment-baseva 0x%lx", &smm->session_baseva))
1936  ;
1937  else if (unformat (input, "use-app-socket-api"))
1938  appns_sapi_enable ();
1939  else if (unformat (input, "poll-main"))
1940  smm->poll_main = 1;
1941  else
1942  return clib_error_return (0, "unknown input `%U'",
1943  format_unformat_error, input);
1944  }
1945  return 0;
1946 }
1947 
1949 
1950 /*
1951  * fd.io coding-style-patch-verification: ON
1952  *
1953  * Local Variables:
1954  * eval: (c-set-style "gnu")
1955  * End:
1956  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:509
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:124
#define session_endpoint_to_transport_cfg(_sep)
Definition: session_types.h:96
u32 preallocated_sessions
Preallocate session config parameter.
Definition: session.h:200
int app_worker_lock_and_send_event(app_worker_t *app, session_t *s, u8 evt_type)
Send event to application.
void transport_close(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:321
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES]
Definition: session.c:1298
u32 connection_index
Index of the transport connection associated to the session.
uword evt_qs_segment_size
Definition: session.h:183
int app_worker_init_accepted(session_t *s)
void session_transport_reset(session_t *s)
Force transport close.
Definition: session.c:1461
int session_lookup_del_connection(transport_connection_t *tc)
Delete transport connection from session table.
int app_worker_reset_notify(app_worker_t *app_wrk, session_t *s)
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int session_open(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Ask transport to open connection to remote transport endpoint.
Definition: session.c:1319
session_type_t session_type
Type built from transport and network protocol types.
uword requested_va
Definition: ssvm.h:85
svm_msg_q_t * vpp_event_queue
vpp event message queue for worker
Definition: session.h:80
void transport_get_listener_endpoint(transport_proto_t tp, u32 conn_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:389
static u32 svm_fifo_max_enqueue_prod(svm_fifo_t *f)
Maximum number of bytes that can be enqueued into fifo.
Definition: svm_fifo.h:550
static void clib_rwlock_writer_lock(clib_rwlock_t *p)
Definition: lock.h:192
uword ssvm_size
Definition: ssvm.h:84
transport_proto
Definition: session.api:22
svm_fifo_t * tx_fifo
int app_worker_connect_notify(app_worker_t *app_wrk, session_t *s, session_error_t err, u32 opaque)
session_main_t session_main
Definition: session.c:25
int session_tx_fifo_peek_bytes(transport_connection_t *tc, u8 *buffer, u32 offset, u32 max_bytes)
Definition: session.c:580
#define VLIB_MAIN_LOOP_ENTER_FUNCTION(x)
Definition: init.h:176
void session_queue_run_on_main_thread(vlib_main_t *vm)
Definition: session.c:1661
#define PREDICT_TRUE(x)
Definition: clib.h:122
u32 session_index
Index in thread pool where session was allocated.
int session_stop_listen(session_t *s)
Ask transport to stop listening on local transport endpoint.
Definition: session.c:1364
static void session_switch_pool_reply(void *arg)
Definition: session.c:854
int app_worker_del_half_open(app_worker_t *app_wrk, transport_proto_t tp, session_handle_t ho_handle)
unsigned long u64
Definition: types.h:89
transport_connection_t * listen_session_get_transport(session_t *s)
Definition: session.c:1654
static svm_msg_q_t * session_main_get_vpp_event_queue(u32 thread_index)
Definition: session.h:645
transport_proto_t session_add_transport_proto(void)
Definition: session.c:1613
u32 configured_v4_halfopen_table_buckets
Definition: session.h:188
u8 session_enable_asap
Enable session manager at startup.
Definition: session.h:172
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
void session_transport_delete_notify(transport_connection_t *tc)
Notification from transport that connection is being deleted.
Definition: session.c:997
transport_connection_t * session_get_transport(session_t *s)
Definition: session.c:1631
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:334
svm_fifo_t * rx_fifo
Pointers to rx/tx buffers.
session_worker_t * wrk
Worker contexts.
Definition: session.h:143
static session_t * session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:314
void session_add_self_custom_tx_evt(transport_connection_t *tc, u8 has_prio)
Definition: session.c:122
u32 ** session_to_enqueue
Per-proto vector of sessions to enqueue.
Definition: session.h:92
#define pool_get_aligned_will_expand(P, YESNO, A)
See if pool_get will expand the pool or not.
Definition: pool.h:257
int session_enqueue_dgram_connection(session_t *s, session_dgram_hdr_t *hdr, vlib_buffer_t *b, u8 proto, u8 queue_event)
Definition: session.c:515
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:113
static int session_enqueue_notify_inline(session_t *s)
Notify session peer that new data has been enqueued.
Definition: session.c:635
int session_main_flush_enqueue_events(u8 transport_proto, u32 thread_index)
Flushes queue of sessions that are to be notified of new data enqueued events.
Definition: session.c:742
session_evt_type_t
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:592
static transport_proto_t session_get_transport_proto(session_t *s)
void session_send_rpc_evt_to_thread(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:110
int svm_fifo_peek(svm_fifo_t *f, u32 offset, u32 len, u8 *dst)
Peek data from fifo.
Definition: svm_fifo.c:1127
void session_transport_reset_notify(transport_connection_t *tc)
Notify application that connection has been reset.
Definition: session.c:1098
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
Definition: vec.h:630
int session_open_vc(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1241
void svm_fifo_dequeue_drop_all(svm_fifo_t *f)
Drop all data from fifo.
Definition: svm_fifo.c:1189
vlib_main_t * vm
Definition: in2out_ed.c:1580
ssvm_shared_header_t * sh
Definition: ssvm.h:83
u32 transport_start_listen(transport_proto_t tp, u32 session_index, transport_endpoint_t *tep)
Definition: transport.c:336
static session_t * session_get(u32 si, u32 thread_index)
Definition: session.h:307
session_evt_elt_t * event_elts
Pool of session event list elements.
Definition: session.h:101
#define vec_validate_aligned(V, I, A)
Make sure vector is long enough for given index (no header, specified alignment)
Definition: vec.h:520
u32 flags
Session flags.
int session_enqueue_stream_connection(transport_connection_t *tc, vlib_buffer_t *b, u32 offset, u8 queue_event, u8 is_in_order)
Definition: session.c:460
u64 session_lookup_half_open_handle(transport_connection_t *tc)
static session_t * session_clone_safe(u32 session_index, u32 thread_index)
Definition: session.h:404
u32 local_endpoints_table_memory
Transport table (preallocation) size parameters.
Definition: session.h:196
static u32 session_handle_data(session_handle_t ho_handle)
void session_node_enable_disable(u8 is_en)
Definition: session.c:1759
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
static_always_inline uword clib_mem_get_page_size(void)
Definition: mem.h:468
vlib_main_t ** vlib_mains
Definition: buffer.c:332
static uword vlib_node_add_next(vlib_main_t *vm, uword node, uword next_node)
Definition: node_funcs.h:1173
unsigned char u8
Definition: types.h:56
app_worker_t * application_get_worker(application_t *app, u32 wrk_map_index)
Definition: application.c:652
session_fifo_rx_fn session_tx_fifo_peek_and_snd
clib_error_t * session_main_init(vlib_main_t *vm)
Definition: session.c:1815
u8 data[128]
Definition: ipsec_types.api:90
session_t * sessions
Worker session pool.
Definition: session.h:77
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
double f64
Definition: types.h:142
static session_fifo_rx_fn * session_tx_fns[TRANSPORT_TX_N_FNS]
Definition: session.c:1574
#define vlib_worker_thread_barrier_sync(X)
Definition: threads.h:205
static session_handle_t session_handle(session_t *s)
void session_get_endpoint(session_t *s, transport_endpoint_t *tep, u8 is_lcl)
Definition: session.c:1642
u8 session_type_t
void segment_manager_attach_fifo(segment_manager_t *sm, svm_fifo_t *f, session_t *s)
void session_transport_closing_notify(transport_connection_t *tc)
Notification from transport that connection is being closed.
Definition: session.c:975
u64 segment_manager_make_segment_handle(u32 segment_manager_index, u32 segment_index)
enum session_ft_action_ session_ft_action_t
int session_open_cl(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1205
#define clib_llist_make_head(LP, name)
Initialize llist head.
Definition: llist.h:106
int ssvm_server_init(ssvm_private_t *ssvm, ssvm_segment_type_t type)
Definition: ssvm.c:433
static session_worker_t * session_main_get_worker(u32 thread_index)
Definition: session.h:631
void session_free_w_fifos(session_t *s)
Definition: session.c:261
#define session_endpoint_to_transport(_sep)
Definition: session_types.h:95
void app_namespaces_init(void)
vlib_main_t * vm
Convenience pointer to this worker's vlib_main.
Definition: session.h:89
static clib_error_t * session_config_fn(vlib_main_t *vm, unformat_input_t *input)
Definition: session.c:1854
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:41
int session_send_ctrl_evt_to_thread(session_t *s, session_evt_type_t evt_type)
Definition: session.c:93
void session_fifo_tuning(session_t *s, svm_fifo_t *f, session_ft_action_t act, u32 len)
Definition: session.c:428
#define VLIB_INIT_FUNCTION(x)
Definition: init.h:173
void segment_manager_dealloc_fifos(svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
int svm_fifo_enqueue_segments(svm_fifo_t *f, const svm_fifo_seg_t segs[], u32 n_segs, u8 allow_partial)
Enqueue array of svm_fifo_seg_t in order.
Definition: svm_fifo.c:959
static void session_manager_main_disable(vlib_main_t *vm)
Definition: session.c:1753
clib_llist_index_t new_head
Head of list of elements.
Definition: session.h:110
description fragment has unexpected format
Definition: map.api:433
u32 * session_type_to_next
Per session type output nodes.
Definition: session.h:157
void session_cleanup_half_open(transport_proto_t tp, session_handle_t ho_handle)
Definition: session.c:286
static int session_enqueue_chain_tail(session_t *s, vlib_buffer_t *b, u32 offset, u8 is_in_order)
Enqueue buffer chain tail.
Definition: session.c:355
static void * ssvm_push_heap(ssvm_shared_header_t *sh)
Definition: ssvm.h:143
#define clib_error_return(e, args...)
Definition: error.h:99
uword session_baseva
Session ssvm segment configs.
Definition: session.h:181
const cJSON *const b
Definition: cJSON.h:255
pthread_t thread[MAX_CONNS]
Definition: main.c:142
void session_half_open_delete_notify(transport_proto_t tp, session_handle_t ho_handle)
Definition: session.c:292
unsigned int u32
Definition: types.h:88
int session_send_io_evt_to_thread(svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:79
u8 session_is_valid(u32 si, u8 thread_index)
Definition: session.c:224
static void ssvm_pop_heap(void *oldheap)
Definition: ssvm.h:151
void transport_reset(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:327
#define HALF_OPEN_LOOKUP_INVALID_VALUE
#define SESSION_INVALID_HANDLE
Definition: session_types.h:23
int session_lookup_del_half_open(transport_connection_t *tc)
static u32 vlib_get_buffer_index(vlib_main_t *vm, void *p)
Translate buffer pointer into buffer index.
Definition: buffer_funcs.h:293
static clib_error_t * session_manager_main_enable(vlib_main_t *vm)
Definition: session.c:1669
struct _transport_proto_vft transport_proto_vft_t
u8 is_initialized
Session manager initialized (not necessarily enabled)
Definition: session.h:169
int session_dequeue_notify(session_t *s)
Definition: session.c:712
struct _session_endpoint_cfg session_endpoint_cfg_t
u32 configured_v6_halfopen_table_memory
Definition: session.h:193
Definition: cJSON.c:84
u32 configured_v6_session_table_buckets
Definition: session.h:190
static session_type_t session_type_from_proto_and_ip(transport_proto_t proto, u8 is_ip4)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
Definition: pool.h:546
static void clib_rwlock_init(clib_rwlock_t *p)
Definition: lock.h:152
#define CLIB_US_TIME_FREQ
Definition: time.h:207
int svm_msg_q_alloc_consumer_eventfd(svm_msg_q_t *mq)
Allocate event fd for queue consumer.
session_event_t evt
Definition: session.h:64
transport_service_type_t transport_protocol_service_type(transport_proto_t tp)
Definition: transport.c:290
uword session_va_space_size
Definition: session.h:182
void session_send_rpc_evt_to_thread_force(u32 thread_index, void *fp, void *rpc_args)
Definition: session.c:102
u32 configured_v4_session_table_buckets
Session table size parameters.
Definition: session.h:186
void transport_get_endpoint(transport_proto_t tp, u32 conn_index, u32 thread_index, transport_endpoint_t *tep, u8 is_lcl)
Definition: transport.c:373
int app_worker_accept_notify(app_worker_t *app_wrk, session_t *s)
vl_api_ip_proto_t proto
Definition: acl_types.api:51
u32 app_index
Index of application that owns the listener.
struct _unformat_input_t unformat_input_t
u32 configured_v6_halfopen_table_buckets
Definition: session.h:192
u32 configured_event_queue_length
vpp fifo event queue configured length
Definition: session.h:178
u8 is_enabled
Session manager is enabled.
Definition: session.h:166
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:233
#define pool_put(P, E)
Free an object E in pool P.
Definition: pool.h:301
#define APP_INVALID_INDEX
Definition: application.h:178
int svm_fifo_enqueue(svm_fifo_t *f, u32 len, const u8 *src)
Enqueue data to fifo.
Definition: svm_fifo.c:840
segment_manager_t * app_worker_get_connect_segment_manager(app_worker_t *)
static clib_error_t * session_main_loop_init(vlib_main_t *vm)
Definition: session.c:1838
enum transport_service_type_ transport_service_type_t
#define PREDICT_FALSE(x)
Definition: clib.h:121
#define VLIB_CONFIG_FUNCTION(x, n,...)
Definition: init.h:182
#define always_inline
Definition: ipsec.h:28
int session_lookup_del_session(session_t *s)
u32 wrk_index
Worker index in global worker pool.
Definition: application.h:37
app_worker_t * app_worker_get_if_valid(u32 wrk_index)
int app_worker_session_fifo_tuning(app_worker_t *app_wrk, session_t *s, svm_fifo_t *f, session_ft_action_t act, u32 len)
static u32 svm_fifo_max_dequeue_prod(svm_fifo_t *f)
Fifo max bytes to dequeue optimized for producer.
Definition: svm_fifo.h:444
session_fifo_rx_fn ** session_tx_fns
Per transport rx function that can either dequeue or peek.
Definition: session.h:152
u64 session_segment_handle(session_t *s)
Definition: session.c:1561
#define foreach_vlib_main(body)
Definition: threads.h:242
static session_evt_elt_t * session_evt_alloc_old(session_worker_t *wrk)
Definition: session.h:290
void transport_cleanup(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.c:302
clib_error_t * vnet_session_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: session.c:1793
u32 transport_stop_listen(transport_proto_t tp, u32 conn_index)
Definition: transport.c:343
struct _session_switch_pool_args session_switch_pool_args_t
u8 len
Definition: ip_types.api:103
clib_rwlock_t peekers_rw_locks
Peekers rw lock.
Definition: session.h:116
#define pool_get_aligned(P, E, A)
Allocate an object E from a pool P with alignment A.
Definition: pool.h:245
#define SESSION_DBG(_fmt, _args...)
static void clib_rwlock_writer_unlock(clib_rwlock_t *p)
Definition: lock.h:206
static u8 svm_fifo_set_event(svm_fifo_t *f)
Set fifo event flag.
Definition: svm_fifo.h:727
u32 n_rings
number of msg rings
Definition: message_queue.h:54
void transport_init(void)
Definition: transport.c:813
int app_worker_transport_closed_notify(app_worker_t *app_wrk, session_t *s)
session_fifo_rx_fn session_tx_fifo_dequeue_and_snd
ssvm_private_t evt_qs_segment
Event queues memfd segment.
Definition: session.h:146
transport_proto_t last_transport_proto_type
Definition: session.h:159
static u8 svm_fifo_needs_deq_ntf(svm_fifo_t *f, u32 n_last_deq)
Check if fifo needs dequeue notification.
Definition: svm_fifo.h:818
#define UNFORMAT_END_OF_INPUT
Definition: format.h:144
u32 runtime_index
Definition: node.h:283
session_handle_t listener_handle
Parent listener session index if the result of an accept.
static_always_inline uword vlib_get_thread_index(void)
Definition: threads.h:219
int transport_connect(transport_proto_t tp, transport_endpoint_cfg_t *tep)
Definition: transport.c:315
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
void segment_manager_detach_fifo(segment_manager_t *sm, svm_fifo_t *f)
static transport_connection_t * transport_get_connection(transport_proto_t tp, u32 conn_index, u8 thread_index)
Definition: transport.h:150
sll srl srl sll sra u16x4 i
Definition: vector_sse42.h:317
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:380
static void vlib_process_signal_event_mt(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
Signal event to process from any thread.
Definition: node_funcs.h:1039
static void session_cleanup_notify(session_t *s, session_cleanup_ntf_t ntf)
Definition: session.c:250
void session_free(session_t *s)
Definition: session.c:210
char * buffer
Definition: cJSON.h:163
int app_worker_cleanup_notify(app_worker_t *app_wrk, session_t *s, session_cleanup_ntf_t ntf)
#define clib_warning(format, args...)
Definition: error.h:59
static int session_send_evt_to_thread(void *data, void *args, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:28
int session_dgram_connect_notify(transport_connection_t *tc, u32 old_thread_index, session_t **new_session)
Move dgram session to the right thread.
Definition: session.c:933
Don't register connection in lookup.
struct _transport_connection transport_connection_t
#define HIGH_SEGMENT_BASEVA
Definition: svm_common.h:87
int app_worker_init_connected(app_worker_t *app_wrk, session_t *s)
static session_evt_elt_t * session_evt_alloc_new(session_worker_t *wrk)
Definition: session.h:280
u32 my_pid
Definition: ssvm.h:86
void transport_enable_disable(vlib_main_t *vm, u8 is_en)
Definition: transport.c:802
static u8 vlib_thread_is_main_w_barrier(void)
Definition: threads.h:530
#define pool_init_fixed(pool, max_elts)
initialize a fixed-size, preallocated pool
Definition: pool.h:85
application_t * application_get(u32 app_index)
Definition: application.c:426
static u32 session_thread_from_handle(session_handle_t handle)
int session_listen(session_t *ls, session_endpoint_cfg_t *sep)
Ask transport to listen on session endpoint.
Definition: session.c:1335
static u32 session_index_from_handle(session_handle_t handle)
#define uword_to_pointer(u, type)
Definition: types.h:136
void session_vpp_event_queues_allocate(session_main_t *smm)
Allocate event queues in the shared-memory segment.
Definition: session.c:1505
#define ASSERT(truth)
static int session_notify_subscribers(u32 app_index, session_t *s, svm_fifo_t *f, session_evt_type_t evt_type)
Definition: session.c:603
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
int app_worker_add_half_open(app_worker_t *app_wrk, transport_proto_t tp, session_handle_t ho_handle, session_handle_t wrk_handle)
void sesssion_reschedule_tx(transport_connection_t *tc)
Definition: session.c:150
int session_main_flush_all_enqueue_events(u8 transport_proto)
Definition: session.c:774
#define CLIB_ASSERT_ENABLE
void segment_manager_main_init(segment_manager_main_init_args_t *a)
static void clib_mem_free(void *p)
Definition: mem.h:311
enum _transport_proto transport_proto_t
static void svm_fifo_clear_deq_ntf(svm_fifo_t *f)
Clear the want notification flag and set has notification.
Definition: svm_fifo.h:785
int svm_fifo_enqueue_with_offset(svm_fifo_t *f, u32 offset, u32 len, u8 *src)
Enqueue a future segment.
Definition: svm_fifo.c:898
static void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:252
static void vlib_node_set_state(vlib_main_t *vm, u32 node_index, vlib_node_state_t new_state)
Set node dispatch state.
Definition: node_funcs.h:174
static session_handle_t session_make_handle(u32 session_index, u32 data)
clib_time_type_t last_vlib_time
vlib_time_now last time around the track
Definition: session.h:83
u32 configured_v4_halfopen_table_memory
Definition: session.h:189
int app_worker_migrate_notify(app_worker_t *app_wrk, session_t *s, session_handle_t new_sh)
u32 configured_v4_session_table_memory
Definition: session.h:187
int session_stream_accept_notify(transport_connection_t *tc)
Definition: session.c:1113
static void * clib_mem_alloc(uword size)
Definition: mem.h:253
void session_enqueue_notify_thread(session_handle_t sh)
Like session_enqueue_notify, but can be called from a thread that does not own the session...
Definition: session.c:697
static uword pointer_to_uword(const void *p)
Definition: types.h:131
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
static vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
u8 * name
Definition: ssvm.h:87
int() session_fifo_rx_fn(session_worker_t *wrk, vlib_node_runtime_t *node, session_evt_elt_t *e, int *n_tx_packets)
Definition: session.h:130
u8 thread_index
Index of the thread that allocated the session.
session_t * session_alloc(u32 thread_index)
Definition: session.c:184
void session_transport_cleanup(session_t *s)
Cleanup transport and session state.
Definition: session.c:1485
static void session_enqueue_discard_chain_bytes(vlib_main_t *vm, vlib_buffer_t *b, vlib_buffer_t **chain_b, u32 n_bytes_to_drop)
Discards bytes from buffer chain.
Definition: session.c:324
template key/value backing page structure
Definition: bihash_doc.h:44
static void session_switch_pool(void *cb_args)
Notify old thread of the session pool switch.
Definition: session.c:890
static transport_connection_t * transport_get_half_open(transport_proto_t tp, u32 conn_index)
Definition: transport.h:163
static u32 session_handle_index(session_handle_t ho_handle)
u64 app_worker_lookup_half_open(app_worker_t *app_wrk, transport_proto_t tp, session_handle_t ho_handle)
u32 local_endpoints_table_buckets
Definition: session.h:197
app_worker_t * app_worker_get(u32 wrk_index)
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
u64 session_handle_t
void appns_sapi_enable(void)
void session_lookup_init(void)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
session_fifo_rx_fn session_tx_fifo_dequeue_internal
clib_llist_index_t old_head
Head of list of pending events.
Definition: session.h:113
u32 next_buffer
Next buffer for this linked-list of buffers.
Definition: buffer.h:140
volatile u8 session_state
State in session layer state machine.
int session_enqueue_notify(session_t *s)
Definition: session.c:674
void session_transport_closed_notify(transport_connection_t *tc)
Notification from transport that it is closed.
Definition: session.c:1063
void session_debug_init(void)
void session_close(session_t *s)
Initialize session closing procedure.
Definition: session.c:1392
VLIB buffer representation.
Definition: buffer.h:102
u64 uword
Definition: types.h:112
int session_stream_connect_notify(transport_connection_t *tc, session_error_t err)
Definition: session.c:784
u8 poll_main
Poll session node in main thread.
Definition: session.h:175
void session_reset(session_t *s)
Force a close without waiting for data to be flushed.
Definition: session.c:1415
ssvm_private_t * session_main_get_evt_q_segment(void)
Definition: session.c:1555
struct _segment_manager segment_manager_t
segment_manager_t * segment_manager_get(u32 index)
session_cleanup_ntf_t
int session_send_io_evt_to_thread_custom(void *data, u32 thread_index, session_evt_type_t evt_type)
Definition: session.c:86
unformat_function_t unformat_memory_size
Definition: format.h:295
static session_evt_elt_t * session_evt_alloc_ctrl(session_worker_t *wrk)
Definition: session.h:256
u32 app_index
Index of owning app.
Definition: application.h:43
static u8 svm_fifo_n_subscribers(svm_fifo_t *f)
Fifo number of subscribers getter.
Definition: svm_fifo.h:644
int session_lookup_add_connection(transport_connection_t *tc, u64 value)
Add transport connection to a session table.
u8 * format_unformat_error(u8 *s, va_list *va)
Definition: unformat.c:91
Connection descheduled by the session layer.
void vlib_worker_thread_barrier_release(vlib_main_t *vm)
Definition: threads.c:1561
u32 configured_v6_session_table_memory
Definition: session.h:191
clib_us_time_t last_vlib_us_time
vlib_time_now rounded to us precision and as u64
Definition: session.h:86
static vlib_thread_main_t * vlib_get_thread_main()
Definition: global_funcs.h:32
vlib_node_registration_t session_queue_process_node
(constructor) VLIB_REGISTER_NODE (session_queue_process_node)
vl_api_dhcp_client_state_t state
Definition: dhcp.api:201
static u32 vlib_num_workers()
Definition: threads.h:377
static vlib_node_t * vlib_get_node(vlib_main_t *vm, u32 i)
Get vlib node by index.
Definition: node_funcs.h:85
int(* session_open_service_fn)(u32, session_endpoint_t *, u32)
Definition: session.c:1295
enum session_error_ session_error_t
save_rewrite_length must be aligned so that reass doesn't overwrite it
Definition: buffer.h:401
u32 app_wrk_index
Index of the app worker that owns the session.
void transport_cleanup_half_open(transport_proto_t tp, u32 conn_index)
Definition: transport.c:308
u32 session_tx_fifo_dequeue_drop(transport_connection_t *tc, u32 max_bytes)
Definition: session.c:588
static void session_delete(session_t *s)
Cleans up session and lookup table.
Definition: session.c:274
u8 si
Definition: lisp_types.api:47
void session_transport_close(session_t *s)
Notify transport the session can be disconnected.
Definition: session.c:1433
clib_llist_index_t ctrl_head
Head of control events list.
Definition: session.h:107
int session_dgram_accept(transport_connection_t *tc, u32 listener_index, u32 thread_index)
Definition: session.c:1172
int app_worker_close_notify(app_worker_t *app_wrk, session_t *s)
struct _session_endpoint session_endpoint_t
int session_stream_accept(transport_connection_t *tc, u32 listener_index, u32 thread_index, u8 notify)
Accept a stream session.
Definition: session.c:1137
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
static transport_connection_t * transport_get_listener(transport_proto_t tp, u32 conn_index)
Definition: transport.h:157
u32 total_length_not_including_first_buffer
Only valid for first buffer in chain.
Definition: buffer.h:167
int svm_fifo_dequeue_drop(svm_fifo_t *f, u32 len)
Dequeue and drop bytes from fifo.
Definition: svm_fifo.c:1151
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static void session_enqueue_notify_rpc(void *arg)
Definition: session.c:680
int session_open_app(u32 app_wrk_index, session_endpoint_t *rmt, u32 opaque)
Definition: session.c:1284
void vlib_start_process(vlib_main_t *vm, uword process_index)
Definition: main.c:1609
void session_register_transport(transport_proto_t transport_proto, const transport_proto_vft_t *vft, u8 is_ip4, u32 output_node)
Initialize session layer for given transport proto and ip version.
Definition: session.c:1583
int session_lookup_add_half_open(transport_connection_t *tc, u64 value)
struct _svm_fifo svm_fifo_t
u8 transport_protocol_is_cl(transport_proto_t tp)
Definition: transport.c:349
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:85
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
#define SESSION_EVT(_evt, _args...)
session_t * session_alloc_for_connection(transport_connection_t *tc)
Definition: session.c:300
static void session_program_transport_ctrl_evt(session_t *s, session_evt_type_t evt)
Definition: session.c:163
static session_t * listen_session_get(u32 ls_index)
Definition: session.h:606
uword unformat(unformat_input_t *i, const char *fmt,...)
Definition: unformat.c:978
static uword unformat_check_input(unformat_input_t *i)
Definition: format.h:170
static u8 transport_connection_is_descheduled(transport_connection_t *tc)
Definition: transport.h:203