FD.io VPP  v17.04.2-2-ga8f93f8
Vector Packet Processing
node.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <vlib/vlib.h>
17 #include <vnet/vnet.h>
18 #include <vnet/pg/pg.h>
19 #include <vnet/ip/ip.h>
20 
21 #include <vnet/tcp/tcp.h>
22 
23 #include <vppinfra/hash.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/elog.h>
27 
28 #include <vnet/udp/udp_packet.h>
29 #include <math.h>
31 
33 
34 typedef struct
35 {
39 
40 /* Packet trace format function for the session queue node.
 *
 * Pulls three arguments off the va_list in order: a vlib_main_t pointer and
 * a vlib_node_t pointer (both marked unused) and the session_queue_trace_t
 * record to print. Appends a "SESSION_QUEUE: ..." line to the string s via
 * format() and returns the resulting string.
 */
41 static u8 *
42 format_session_queue_trace (u8 * s, va_list * args)
43 {
44  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
45  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
46  session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
47 
48  s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
 /* NOTE(review): the two values for the %d conversions are on a source line
  * that is not present in this listing; presumably fields of t - confirm
  * against the real file. */
50  return s;
51 }
52 
54 
/*
 * X-macro list of the counters kept by this node. Each entry is
 * _(SYMBOL, "description string"). The user defines _ before expanding the
 * list; the definitions of _ nearby turn an entry into either the enum
 * constant SESSION_QUEUE_ERROR_<SYMBOL> or its description string.
 */
55 #define foreach_session_queue_error \
56 _(TX, "Packets transmitted") \
57 _(TIMER, "Timer events")
58 
59 typedef enum
60 {
61 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
63 #undef _
66 
/*
 * Table of counter description strings, handed to the node registration
 * through .error_strings. The local definition of _ keeps only the string
 * argument of each list entry.
 * NOTE(review): the line that expands the counter list between the #define
 * and the #undef is not present in this listing.
 */
67 static char *session_queue_error_strings[] = {
68 #define _(sym,string) string,
70 #undef _
71 };
72 
78 };
79 
/*
 * Common TX worker: read bytes from a session's server tx fifo, pack them
 * into buffers of at most one send-MSS each, let the transport push its
 * header, and enqueue the buffers to the next node chosen by session type.
 *
 * The first lines of the parameter list are not present in this listing; the
 * body uses vm, node and smm in addition to the parameters shown below.
 *
 * e0             event being served; its enqueue_length is decremented by
 *                the amount consumed when the event has to be retried.
 * s0             session that owns the tx fifo.
 * thread_index   index used for the per-thread tx_buffers and
 *                evts_partially_read vectors in smm.
 * n_tx_packets   incremented once for every buffer handed to the next node.
 * peek_data      non-zero: copy with svm_fifo_peek at a transport supplied
 *                offset; zero: consume with svm_fifo_dequeue_nowait.
 *
 * Returns -1 when too few buffers could be allocated (the caller stops
 * processing events), 0 in every other case, including "nothing could be
 * sent" and fifo read failure. Unfinished events are appended to
 * smm->evts_partially_read[thread_index].
 */
80 always_inline int
83  session_fifo_event_t * e0,
84  stream_session_t * s0, u32 thread_index,
85  int *n_tx_packets, u8 peek_data)
86 {
87  u32 n_trace = vlib_get_trace_count (vm, node);
88  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
89  u32 n_frame_bytes, n_frames_per_evt;
91  transport_proto_vft_t *transport_vft;
92  u32 next_index, next0, *to_next, n_left_to_next, bi0;
93  vlib_buffer_t *b0;
 /* Only assigned and only read when peek_data is set */
94  u32 rx_offset;
95  u16 snd_mss0;
96  u8 *data0;
97  int i;
98 
 /* Next node is selected by session type */
99  next_index = next0 = session_type_to_next[s0->session_type];
100 
101  transport_vft = session_get_transport_vft (s0->session_type);
102  tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
103 
104  /* Make sure we have space to send and there's something to dequeue */
105  snd_space0 = transport_vft->send_space (tc0);
106  snd_mss0 = transport_vft->send_mss (tc0);
107 
108  /* Can't make any progress: keep the event, unchanged, for a later pass.
  * The snd_mss0 == 0 test also protects the division further down. */
109  if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
110  || snd_mss0 == 0)
111  {
112  vec_add1 (smm->evts_partially_read[thread_index], *e0);
113  return 0;
114  }
115 
116  ASSERT (e0->enqueue_length > 0);
117 
118  /* Ensure we're not writing more than transport window allows */
119  max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
120 
121  if (peek_data)
122  {
123  /* Offset in the tx fifo from where to peek data */
124  rx_offset = transport_vft->tx_fifo_offset (tc0);
125  }
126 
127  /* TODO check if transport is willing to send len_to_snd0
128  * bytes (Nagle) */
129 
 /* One frame carries at most VLIB_FRAME_SIZE buffers of snd_mss0 bytes each;
  * compute how many frames are needed for max_len_to_snd0 bytes. */
130  n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
131  n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
132 
133  n_bufs = vec_len (smm->tx_buffers[thread_index]);
134  left_to_snd0 = max_len_to_snd0;
135  for (i = 0; i < n_frames_per_evt; i++)
136  {
137  /* Make sure we have at least one full frame of buffers ready */
138  if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
139  {
140  vec_validate (smm->tx_buffers[thread_index],
141  n_bufs + VLIB_FRAME_SIZE - 1);
142  n_bufs +=
143  vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
144  VLIB_FRAME_SIZE);
145 
146  /* buffer shortage
147  * XXX 0.9 because when debugging we might not get a full frame */
148  if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
149  {
150  /* Keep track of how much we've dequeued and exit */
151  if (left_to_snd0 != max_len_to_snd0)
152  {
153  e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
154  vec_add1 (smm->evts_partially_read[thread_index], *e0);
155  }
156 
 /* NOTE(review): this return happens before the vector length is
  * reset to n_bufs below, so tx_buffers keeps the length set by
  * vec_validate even though fewer slots were filled - confirm the
  * next call cannot pick up an unfilled slot. */
157  return -1;
158  }
159 
160  _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
161  }
162 
163  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
164  while (left_to_snd0 && n_left_to_next)
165  {
166  /* Get free buffer: pop the last index off the per-thread vector */
167  n_bufs--;
168  bi0 = smm->tx_buffers[thread_index][n_bufs];
169  _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
170 
171  b0 = vlib_get_buffer (vm, bi0);
172  b0->error = 0;
175  b0->current_data = 0;
176 
177  /* RX on the local interface. tx in default fib */
178  vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
179  vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
180 
181  /* usual speculation, or the enqueue_x1 macro will barf */
182  to_next[0] = bi0;
183  to_next += 1;
184  n_left_to_next -= 1;
185 
187  if (PREDICT_FALSE (n_trace > 0))
188  {
190  vlib_trace_buffer (vm, node, next_index, b0,
191  1 /* follow_chain */ );
192  vlib_set_trace_count (vm, node, --n_trace);
193  t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
194  t0->session_index = s0->session_index;
195  t0->server_thread_index = s0->thread_index;
196  }
197 
 /* At most one MSS worth of payload per buffer */
198  len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
199 
200  /* *INDENT-OFF* */
201  SESSION_EVT_DBG(s0, SESSION_EVT_DEQ, ({
202  ed->data[0] = e0->event_id;
203  ed->data[1] = e0->enqueue_length;
204  ed->data[2] = len_to_deq0;
205  ed->data[3] = left_to_snd0;
206  }));
207  /* *INDENT-ON* */
208 
209  /* Make room for headers */
 /* NOTE(review): the statement that assigns data0 is not present in this
  * listing; data0 is the copy destination used below. */
211 
212  /* Dequeue the data
213  * TODO 1) peek instead of dequeue
214  * 2) buffer chains */
215  if (peek_data)
216  {
217  int n_bytes_read;
218  n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
219  rx_offset, len_to_deq0, data0);
220  if (n_bytes_read < 0)
221  goto dequeue_fail;
222 
223  /* Keep track of progress locally, transport is also supposed to
224  * increment it independently when pushing header */
225  rx_offset += n_bytes_read;
226  }
227  else
228  {
229  if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
230  len_to_deq0, data0) < 0)
231  goto dequeue_fail;
232  }
233 
 /* NOTE(review): the length is set to the requested size, not to the
  * byte count returned by the fifo read - confirm a short read is not
  * possible here. */
234  b0->current_length = len_to_deq0;
235 
236  /* Ask transport to push header */
237  transport_vft->push_header (tc0, b0);
238 
239  left_to_snd0 -= len_to_deq0;
240  *n_tx_packets = *n_tx_packets + 1;
241 
242  vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
243  to_next, n_left_to_next,
244  bi0, next0);
245  }
246  vlib_put_next_frame (vm, node, next_index, n_left_to_next);
247  }
248 
249  /* If we couldn't dequeue all bytes store progress */
250  if (max_len_to_snd0 < e0->enqueue_length)
251  {
252  e0->enqueue_length -= max_len_to_snd0;
253  vec_add1 (smm->evts_partially_read[thread_index], *e0);
254  }
255  return 0;
256 
257 dequeue_fail:
258  /* Can't read from fifo. Store event rx progress, save as partially read,
259  * return buff to free list and return */
260  e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
261  vec_add1 (smm->evts_partially_read[thread_index], *e0);
262 
 /* Undo the speculative enqueue and put the buffer index back by growing
  * the vector length by one (the slot still holds bi0).
  * NOTE(review): no vlib_put_next_frame is visible on this path although
  * the frame was obtained with vlib_get_next_frame - confirm buffers already
  * enqueued in this frame are not lost. */
263  to_next -= 1;
264  n_left_to_next += 1;
265  _vec_len (smm->tx_buffers[thread_index]) += 1;
266 
267  clib_warning ("dequeue fail");
268  return 0;
269 }
270 
/*
 * TX function variant that peeks: forwards all arguments to
 * session_tx_fifo_read_and_snd_i with peek_data = 1, so data is copied out
 * of the tx fifo with svm_fifo_peek. Returns whatever the shared worker
 * returns. (The line carrying the function name and the first parameters is
 * not present in this listing.)
 */
271 int
274  session_fifo_event_t * e0,
275  stream_session_t * s0, u32 thread_index,
276  int *n_tx_pkts)
277 {
278  return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
279  n_tx_pkts, 1);
280 }
281 
/*
 * TX function variant that dequeues: forwards all arguments to
 * session_tx_fifo_read_and_snd_i with peek_data = 0, so data is consumed
 * from the tx fifo with svm_fifo_dequeue_nowait. Returns whatever the shared
 * worker returns. (The line carrying the function name and the first
 * parameters is not present in this listing.)
 */
282 int
285  session_fifo_event_t * e0,
286  stream_session_t * s0, u32 thread_index,
287  int *n_tx_pkts)
288 {
289  return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
290  n_tx_pkts, 0);
291 }
292 
/*
 * Node function of the session queue node.
 *
 * Per call, for the calling thread (vm->cpu_index):
 *  1. updates TCP time;
 *  2. unless 100 or more events are already pending, try-locks the thread's
 *     vpp event queue and appends room for q->cursize new events to the
 *     pending vector;
 *  3. walks the pending events and invokes the per-session-type TX function
 *     from smm->session_tx_fns for each;
 *  4. replaces the pending vector with the events that still need work.
 *
 * Returns the number of packets sent, or 0 when there is no queue, nothing
 * to do, or the queue mutex is busy.
 * (The line carrying the function name and the first parameters is not
 * present in this listing; the body uses vm and node.)
 */
293 static uword
295  vlib_frame_t * frame)
296 {
298  session_fifo_event_t *my_fifo_events, *e;
299  u32 n_to_dequeue, n_events;
301  int n_tx_packets = 0;
302  u32 my_thread_index = vm->cpu_index;
303  int i, rv;
304 
305  /*
306  * Update TCP time
307  */
308  tcp_update_time (vlib_time_now (vm), my_thread_index);
309 
310  /*
311  * Get vpp queue events
312  */
313  q = smm->vpp_event_queues[my_thread_index];
314  if (PREDICT_FALSE (q == 0))
315  return 0;
316 
317  /* min number of events we can dequeue without blocking */
 /* NOTE(review): cursize is sampled here, before the mutex is taken
  * below - confirm only this thread removes elements from q. */
318  n_to_dequeue = q->cursize;
319  my_fifo_events = smm->fifo_events[my_thread_index];
320 
321  if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
322  return 0;
323 
324  /*
325  * If we didn't manage to process previous events try going
326  * over them again without dequeuing new ones.
327  */
328  /* XXX: Block senders to sessions that can't keep up */
329  if (vec_len (my_fifo_events) >= 100)
330  goto skip_dequeue;
331 
332  /* See you in the next life, don't be late */
333  if (pthread_mutex_trylock (&q->mutex))
334  return 0;
335 
336  for (i = 0; i < n_to_dequeue; i++)
337  {
 /* Reserve one slot at the end of the pending vector; e points at it.
  * NOTE(review): the call that fills *e from q is not present in this
  * listing. */
338  vec_add2 (my_fifo_events, e, 1);
340  }
341 
342  /* The other side of the connection is not polling */
343  if (q->cursize < (q->maxsize / 8))
344  (void) pthread_cond_broadcast (&q->condvar);
345  pthread_mutex_unlock (&q->mutex);
346 
 /* vec_add2 may have moved the vector; store the current pointer back */
347  smm->fifo_events[my_thread_index] = my_fifo_events;
348 
349 skip_dequeue:
350  n_events = vec_len (my_fifo_events);
351  for (i = 0; i < n_events; i++)
352  {
353  svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
354  stream_session_t *s0;
355  u32 server_session_index0, server_thread_index0;
356  session_fifo_event_t *e0;
357 
358  e0 = &my_fifo_events[i];
359  f0 = e0->fifo;
360  server_session_index0 = f0->server_session_index;
361  server_thread_index0 = f0->server_thread_index;
362 
363  /* $$$ add multiple event queues, per vpp worker thread */
364  ASSERT (server_thread_index0 == my_thread_index);
365 
366  s0 = stream_session_get_if_valid (server_session_index0,
367  my_thread_index);
368 
 /* NOTE(review): the NULL check is gated on CLIB_DEBUG. If CLIB_DEBUG is
  * 0 and the lookup can return NULL, s0 is dereferenced just below -
  * the check probably should not depend on the debug flag; confirm. */
369  if (CLIB_DEBUG && !s0)
370  {
371  clib_warning ("It's dead, Jim!");
372  continue;
373  }
374 
 /* Events for closed sessions are skipped and not kept for retry */
375  if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
376  continue;
377 
378  ASSERT (s0->thread_index == my_thread_index);
379 
380  switch (e0->event_type)
381  {
 /* NOTE(review): the case label for this branch is not present in this
  * listing. */
383  /* Spray packets in per session type frames, since they go to
384  * different nodes */
385  rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
386  my_thread_index,
387  &n_tx_packets);
 /* A negative result stops the walk; i keeps the index of this event */
388  if (rv < 0)
389  goto done;
390 
391  break;
392 
393  default:
394  clib_warning ("unhandled event type %d", e0->event_type);
395  }
396  }
397 
398 done:
399 
400  /* Couldn't process all events. Probably out of buffers */
401  if (PREDICT_FALSE (i < n_events))
402  {
 /* Append events i..n_events-1 after the events the TX functions stored
  * for retry, then make that combined vector the new pending vector. */
403  session_fifo_event_t *partially_read =
404  smm->evts_partially_read[my_thread_index];
405  vec_add (partially_read, &my_fifo_events[i], n_events - i);
406  vec_free (my_fifo_events);
407  smm->fifo_events[my_thread_index] = partially_read;
408  smm->evts_partially_read[my_thread_index] = 0;
409  }
410  else
411  {
 /* Every event was visited: only the events stored for retry remain */
412  vec_free (smm->fifo_events[my_thread_index]);
413  smm->fifo_events[my_thread_index] =
414  smm->evts_partially_read[my_thread_index];
415  smm->evts_partially_read[my_thread_index] = 0;
416  }
417 
 /* Adds n_tx_packets to the TX counter (the start of this call is not
  * present in this listing) */
419  SESSION_QUEUE_ERROR_TX, n_tx_packets);
420 
421  return n_tx_packets;
422 }
423 
/*
 * Registration of the "session-queue" node: an input-type node that starts
 * in the disabled state, runs session_queue_node_fn, formats traces with
 * format_session_queue_trace and reports counters described by
 * session_queue_error_strings. Its next nodes are error-drop, the ip4/ip6
 * lookup nodes and the tcp4/tcp6 output nodes.
 * (The line opening the registration macro is not present in this listing.)
 */
424 /* *INDENT-OFF* */
426 {
427  .function = session_queue_node_fn,
428  .name = "session-queue",
429  .format_trace = format_session_queue_trace,
430  .type = VLIB_NODE_TYPE_INPUT,
432  .error_strings = session_queue_error_strings,
433  .n_next_nodes = SESSION_QUEUE_N_NEXT,
434  .state = VLIB_NODE_STATE_DISABLED,
435  .next_nodes =
436  {
437  [SESSION_QUEUE_NEXT_DROP] = "error-drop",
438  [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
439  [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
440  [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
441  [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
442  },
443 };
444 /* *INDENT-ON* */
445 
446 /*
447  * fd.io coding-style-patch-verification: ON
448  *
449  * Local Variables:
450  * eval: (c-set-style "gnu")
451  * End:
452  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:436
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:343
#define clib_min(x, y)
Definition: clib.h:332
#define CLIB_UNUSED(x)
Definition: clib.h:79
static u32 vlib_get_trace_count(vlib_main_t *vm, vlib_node_runtime_t *rt)
Definition: trace_funcs.h:143
struct _transport_connection transport_connection_t
static f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:185
static u32 session_type_to_next[]
Definition: node.c:73
void vlib_put_next_frame(vlib_main_t *vm, vlib_node_runtime_t *r, u32 next_index, u32 n_vectors_left)
Release pointer to next frame vector data.
Definition: main.c:459
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:522
struct _vlib_node_registration vlib_node_registration_t
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
Definition: vec.h:561
u8 * format(u8 *s, const char *fmt,...)
Definition: format.c:418
session_queue_error_t
Definition: node.c:59
int session_tx_fifo_dequeue_and_snd(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_main_t *smm, session_fifo_event_t *e0, stream_session_t *s0, u32 thread_index, int *n_tx_pkts)
Definition: node.c:283
struct _stream_session_t stream_session_t
#define vec_add(V, E, N)
Add N elements to end of vector V (no header, unspecified alignment)
Definition: vec.h:599
static void vlib_trace_buffer(vlib_main_t *vm, vlib_node_runtime_t *r, u32 next_index, vlib_buffer_t *b, int follow_chain)
Definition: trace_funcs.h:104
i16 current_data
signed offset in data[], pre_data[] that we are currently processing.
Definition: buffer.h:67
static char * session_queue_error_strings[]
Definition: node.c:67
#define always_inline
Definition: clib.h:84
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Definition: svm_fifo.h:98
#define VLIB_BUFFER_TOTAL_LENGTH_VALID
Definition: buffer.h:89
u32 cpu_index
Definition: main.h:159
int session_tx_fifo_peek_and_snd(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_main_t *smm, session_fifo_event_t *e0, stream_session_t *s0, u32 thread_index, int *n_tx_pkts)
Definition: node.c:272
struct _transport_proto_vft transport_proto_vft_t
static session_manager_main_t * vnet_get_session_manager_main()
Definition: session.h:230
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:71
static void * vlib_buffer_make_headroom(vlib_buffer_t *b, u8 size)
Make head room, typically for packet headers.
Definition: buffer.h:288
#define MAX_HDRS_LEN
Definition: session.h:28
#define PREDICT_FALSE(x)
Definition: clib.h:97
int unix_shared_memory_queue_sub_raw(unix_shared_memory_queue_t *q, u8 *elem)
#define VLIB_FRAME_SIZE
Definition: node.h:328
struct _session_manager_main session_manager_main_t
Definition: session.h:157
#define vlib_validate_buffer_enqueue_x1(vm, node, next_index, to_next, n_left_to_next, bi0, next0)
Finish enqueueing one buffer forward in the graph.
Definition: buffer_node.h:216
#define vlib_get_next_frame(vm, node, next_index, vectors, n_vectors_left)
Get pointer to next frame vector data by (vlib_node_runtime_t, next_index).
Definition: node_funcs.h:350
vlib_error_t error
Error code for buffers to be enqueued to error handler.
Definition: buffer.h:113
static void vlib_node_increment_counter(vlib_main_t *vm, u32 node_index, u32 counter_index, u64 increment)
Definition: node_funcs.h:1115
#define foreach_session_queue_error
Definition: node.c:55
#define VNET_BUFFER_LOCALLY_ORIGINATED
Definition: buffer.h:68
vlib_main_t * vm
Definition: buffer.c:276
void tcp_update_time(f64 now, u32 thread_index)
Definition: tcp_input.c:2265
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:340
#define clib_warning(format, args...)
Definition: error.h:59
#define ARRAY_LEN(x)
Definition: clib.h:59
static stream_session_t * stream_session_get_if_valid(u64 si, u32 thread_index)
Definition: session.h:309
#define ASSERT(truth)
unsigned int u32
Definition: types.h:88
u32 server_session_index
Definition: svm_fifo.h:55
u8 server_thread_index
Definition: svm_fifo.h:57
vlib_node_registration_t session_queue_node
(constructor) VLIB_REGISTER_NODE (session_queue_node)
Definition: node.c:32
u64 uword
Definition: types.h:112
static void * vlib_add_trace(vlib_main_t *vm, vlib_node_runtime_t *r, vlib_buffer_t *b, u32 n_data_bytes)
Definition: trace_funcs.h:55
transport_proto_vft_t * session_get_transport_vft(u8 type)
Definition: session.c:1236
Definition: defs.h:47
unsigned short u16
Definition: types.h:57
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
unsigned char u8
Definition: types.h:56
int svm_fifo_peek(svm_fifo_t *f, int pid, u32 offset, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:491
#define VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b)
Definition: buffer.h:484
#define vnet_buffer(b)
Definition: buffer.h:294
#define VLIB_REGISTER_NODE(x,...)
Definition: node.h:143
u32 server_thread_index
Definition: node.c:37
static int session_tx_fifo_read_and_snd_i(vlib_main_t *vm, vlib_node_runtime_t *node, session_manager_main_t *smm, session_fifo_event_t *e0, stream_session_t *s0, u32 thread_index, int *n_tx_packets, u8 peek_data)
Definition: node.c:81
int svm_fifo_dequeue_nowait(svm_fifo_t *f, int pid, u32 max_bytes, u8 *copy_here)
Definition: svm_fifo.c:484
static u8 * format_session_queue_trace(u8 *s, va_list *args)
Definition: node.c:42
static void vlib_set_trace_count(vlib_main_t *vm, vlib_node_runtime_t *rt, u32 count)
Definition: trace_funcs.h:159
u32 flags
buffer flags: VLIB_BUFFER_IS_TRACED: trace this buffer.
Definition: buffer.h:74
static u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
Definition: buffer_funcs.h:245
#define SESSION_EVT_DBG(_s, _evt, _body)
Definition: session_debug.h:76
static uword session_queue_node_fn(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
Definition: node.c:294
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:57
Definition: defs.h:46
struct _unix_shared_memory_queue unix_shared_memory_queue_t