FD.io VPP  v16.06
Vector Packet Processing
mc.c
Go to the documentation of this file.
1 /*
2  * mc.c: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include <vlib/vlib.h>
19 
20 /*
21  * 1 to enable msg id training wheels, which are useful for tracking
22  * down catchup and/or partitioned network problems
23  */
24 #define MSG_ID_DEBUG 0
25 
27 
28 static u32 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
29 {
30  uword * p, r;
31  mhash_t * h = &m->elog_id_by_peer_id;
32 
33  if (! m->elog_id_by_peer_id.hash)
34  mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
35 
36  p = mhash_get (h, &peer_id);
37  if (p)
38  return p[0];
39  r = elog_string (m->elog_main, "%U",
40  m->transport.format_peer_id, peer_id);
41  mhash_set (h, &peer_id, r, /* old_value */ 0);
42  return r;
43 }
44 
45 static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
46 {
47  uword * p, r;
48  uword * h = m->elog_id_by_msg_name;
49  u8 *name_copy;
50 
51  if (! h)
52  h = m->elog_id_by_msg_name
53  = hash_create_string (0, sizeof (uword));
54 
55  p = hash_get_mem (h, msg_name);
56  if (p)
57  return p[0];
58  r = elog_string (m->elog_main, "%s", msg_name);
59 
60  name_copy = format (0, "%s%c", msg_name, 0);
61 
62  hash_set_mem (h, name_copy, r);
63  m->elog_id_by_msg_name = h;
64 
65  return r;
66 }
67 
68 static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 retry_count)
69 {
70  if (MC_EVENT_LOGGING > 0)
71  {
72  ELOG_TYPE_DECLARE (e) = {
73  .format = "tx-msg: stream %d local seq %d attempt %d",
74  .format_args = "i4i4i4",
75  };
76  struct { u32 stream_id, local_sequence, retry_count; } * ed;
77  ed = ELOG_DATA (m->elog_main, e);
78  ed->stream_id = stream_id;
79  ed->local_sequence = local_sequence;
80  ed->retry_count = retry_count;
81  }
82 }
83 
84 /*
85  * seq_cmp
86  * correctly compare two unsigned sequence numbers.
87  * This function works so long as x and y are within 2**(n-1) of each
88  * other, where n = bits(x, y).
89  *
90  * Magic decoder ring:
91  * seq_cmp == 0 => x and y are equal
92  * seq_cmp < 0 => x is "in the past" with respect to y
93  * seq_cmp > 0 => x is "in the future" with respect to y
94  */
96 { return (i32) x - (i32) y;}
97 
98 void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
99 {
100  u32 n_alloc, bi;
101  vlib_buffer_t * b;
102 
103  n_alloc = vlib_buffer_alloc (vm, &bi, 1);
104  ASSERT (n_alloc == 1);
105 
106  b = vlib_get_buffer (vm, bi);
107  b->current_length = n_bytes;
108  *bi_return = bi;
109  return (void *) b->data;
110 }
111 
112 static void
114  uword index,
115  int notify_application)
116 {
117  mc_stream_peer_t * p = pool_elt_at_index (s->peers, index);
118  ASSERT (p != 0);
119  if (s->config.peer_died && notify_application)
120  s->config.peer_died (mcm, s, p->id);
121 
122  s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
123 
124  if (MC_EVENT_LOGGING > 0)
125  {
126  ELOG_TYPE_DECLARE (e) = {
127  .format = "delete peer %s from all_peer_bitmap",
128  .format_args = "T4",
129  };
130  struct { u32 peer; } * ed = 0;
131 
132  ed = ELOG_DATA (mcm->elog_main, e);
133  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
134  }
135  /* Do not delete the pool / hash table entries, or we lose sequence number state */
136 }
137 
138 static mc_stream_peer_t *
140  mc_stream_t * s, mc_peer_id_t id,
141  int * created)
142 {
143  uword * q = mhash_get (&s->peer_index_by_id, &id);
144  mc_stream_peer_t * p;
145 
146  if (q)
147  {
148  p = pool_elt_at_index (s->peers, q[0]);
149  goto done;
150  }
151 
152  pool_get (s->peers, p);
153  memset (p, 0, sizeof (p[0]));
154  p->id = id;
155  p->last_sequence_received = ~0;
156  mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
157  if (created)
158  *created = 1;
159 
160  done:
161  if (MC_EVENT_LOGGING > 0)
162  {
163  ELOG_TYPE_DECLARE (e) = {
164  .format = "get_or_create %s peer %s stream %d seq %d",
165  .format_args = "t4T4i4i4",
166  .n_enum_strings = 2,
167  .enum_strings = { "old", "new", },
168  };
169  struct { u32 is_new, peer, stream_index, rx_sequence; } * ed = 0;
170 
171  ed = ELOG_DATA (mcm->elog_main, e);
172  ed->is_new = q ? 0 : 1;
173  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
174  ed->stream_index = s->index;
175  ed->rx_sequence = p->last_sequence_received;
176  }
177  /* $$$$ Enable or reenable this peer */
178  s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
179  return p;
180 }
181 
183 {
185 
186  if (pool_elts (stream->retry_pool) >= stream->config.window_size)
187  return;
188 
191 
192  if (stream->procs_waiting_for_open_window)
193  _vec_len (stream->procs_waiting_for_open_window) = 0;
194 }
195 
196 static void mc_retry_free (mc_main_t * mcm, mc_stream_t *s, mc_retry_t * r)
197 {
198  mc_retry_t record, *retp;
199 
200  if (r->unacked_by_peer_bitmap)
201  _vec_len (r->unacked_by_peer_bitmap) = 0;
202 
203  if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
204  {
205  clib_fifo_sub1 (s->retired_fifo, record);
207  }
208 
209  clib_fifo_add2 (s->retired_fifo, retp);
210 
211  retp->buffer_index = r->buffer_index;
212  retp->local_sequence = r->local_sequence;
213 
214  r->buffer_index = ~0; /* poison buffer index in this retry */
215 }
216 
217 static void mc_resend_retired (mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
218 {
219  mc_retry_t *retry;
220 
221  if (MC_EVENT_LOGGING > 0)
222  {
223  ELOG_TYPE_DECLARE (e) = {
224  .format = "resend-retired: search for local seq %d",
225  .format_args = "i4",
226  };
227  struct { u32 local_sequence; } * ed;
228  ed = ELOG_DATA (mcm->elog_main, e);
229  ed->local_sequence = local_sequence;
230  }
231 
233  (retry, s->retired_fifo,
234  ({
235  if (retry->local_sequence == local_sequence)
236  {
237  elog_tx_msg (mcm, s->index, retry->local_sequence, -13);
238 
239  mcm->transport.tx_buffer
240  (mcm->transport.opaque,
241  MC_TRANSPORT_USER_REQUEST_TO_RELAY,
242  retry->buffer_index);
243  return;
244  }
245  }));
246 
247  if (MC_EVENT_LOGGING > 0)
248  {
249  ELOG_TYPE_DECLARE (e) = {
250  .format = "resend-retired: FAILED search for local seq %d",
251  .format_args = "i4",
252  };
253  struct { u32 local_sequence; } * ed;
254  ed = ELOG_DATA (mcm->elog_main, e);
255  ed->local_sequence = local_sequence;
256  }
257 }
258 
259 static uword *
261  mc_stream_t * stream,
262  mc_retry_t * r,
263  uword * dead_peer_bitmap)
264 {
265  mc_stream_peer_t * p;
266 
267  pool_foreach (p, stream->peers, ({
268  uword pi = p - stream->peers;
269  uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
270 
271  if (! is_alive)
272  dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
273 
274  if (MC_EVENT_LOGGING > 0)
275  {
276  ELOG_TYPE_DECLARE (e) = {
277  .format = "delete_retry_fifo_elt: peer %s is %s",
278  .format_args = "T4t4",
279  .n_enum_strings = 2,
280  .enum_strings = { "alive", "dead", },
281  };
282  struct { u32 peer, is_alive; } * ed;
283  ed = ELOG_DATA (mcm->elog_main, e);
284  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
285  ed->is_alive = is_alive;
286  }
287  }));
288 
289  hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
290  mc_retry_free (mcm, stream, r);
291 
292  return dead_peer_bitmap;
293 }
294 
297 {
298  return (r->prev_index != ~0
300  : 0);
301 }
302 
305 {
306  return (r->next_index != ~0
308  : 0);
309 }
310 
311 always_inline void
313 {
314  mc_retry_t * p = prev_retry (s, r);
315  mc_retry_t * n = next_retry (s, r);
316 
317  if (p)
318  p->next_index = r->next_index;
319  else
321  if (n)
322  n->prev_index = r->prev_index;
323  else
325 
326  pool_put_index (s->retry_pool, r - s->retry_pool);
327 }
328 
329 static void check_retry (mc_main_t * mcm, mc_stream_t * s)
330 {
331  mc_retry_t * r;
332  vlib_main_t * vm = mcm->vlib_main;
333  f64 now = vlib_time_now(vm);
334  uword * dead_peer_bitmap = 0;
335  u32 ri, ri_next;
336 
337  for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
338  {
339  r = pool_elt_at_index (s->retry_pool, ri);
340  ri_next = r->next_index;
341 
342  if (now < r->sent_at + s->config.retry_interval)
343  continue;
344 
345  r->n_retries += 1;
346  if (r->n_retries > s->config.retry_limit)
347  {
348  dead_peer_bitmap =
349  delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
350  remove_retry_from_pool (s, r);
351  }
352  else
353  {
354  if (MC_EVENT_LOGGING > 0)
355  {
356  mc_stream_peer_t * p;
357  ELOG_TYPE_DECLARE (t) = {
358  .format = "resend local seq %d attempt %d",
359  .format_args = "i4i4",
360  };
361 
362  pool_foreach (p, s->peers, ({
363  if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
364  {
365  ELOG_TYPE_DECLARE (ev) = {
366  .format = "resend: needed by peer %s local seq %d",
367  .format_args = "T4i4",
368  };
369  struct { u32 peer, rx_sequence; } * ed;
370  ed = ELOG_DATA (mcm->elog_main, ev);
371  ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
372  ed->rx_sequence = r->local_sequence;
373  }
374  }));
375 
376  struct { u32 sequence; u32 trail; } * ed;
377  ed = ELOG_DATA (mcm->elog_main, t);
378  ed->sequence = r->local_sequence;
379  ed->trail = r->n_retries;
380  }
381 
382  r->sent_at = vlib_time_now (vm);
383  s->stats.n_retries += 1;
384 
385  elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
386 
387  mcm->transport.tx_buffer
388  (mcm->transport.opaque,
390  r->buffer_index);
391  }
392  }
393 
394  maybe_send_window_open_event (mcm->vlib_main, s);
395 
396  /* Delete any dead peers we've found. */
397  if (! clib_bitmap_is_zero (dead_peer_bitmap))
398  {
399  uword i;
400 
401  clib_bitmap_foreach (i, dead_peer_bitmap, ({
402  delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
403 
404  /* Delete any references to just deleted peer in retry pool. */
405  pool_foreach (r, s->retry_pool, ({
406  r->unacked_by_peer_bitmap =
407  clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
408  }));
409  }));
410  clib_bitmap_free (dead_peer_bitmap);
411  }
412 }
413 
416 {
417  mc_main_t ** p = (void *) node->runtime_data;
418  return p[0];
419 }
420 
421 static uword
423  vlib_node_runtime_t * node,
424  vlib_frame_t * f)
425 {
426  mc_main_t * mcm = mc_node_get_main (node);
427  mc_stream_t * s;
428 
429  while (1)
430  {
431  vlib_process_suspend (vm, 1.0);
432  vec_foreach (s, mcm->stream_vector)
433  {
434  if (s->state != MC_STREAM_STATE_invalid)
435  check_retry (mcm, s);
436  }
437  }
438  return 0; /* not likely */
439 }
440 
441 static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
442 {
443  vlib_main_t * vm = mcm->vlib_main;
444  mc_msg_join_or_leave_request_t * mp;
445  u32 bi;
446 
447  mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
448  memset(mp, 0, sizeof (*mp));
449  mp->type = MC_MSG_TYPE_join_or_leave_request;
450  mp->peer_id = mcm->transport.our_ack_peer_id;
451  mp->stream_index = stream_index;
452  mp->is_join = is_join;
453 
455 
456  /*
457  * These msgs are unnumbered, unordered so send on the from-relay
458  * channel.
459  */
461 }
462 
463 static uword
465  vlib_node_runtime_t * node,
466  vlib_frame_t * f)
467 {
468  mc_main_t * mcm = mc_node_get_main (node);
469 
470  while (1)
471  {
472  if (mcm->joins_in_progress)
473  {
474  mc_stream_t * s;
476  f64 now = vlib_time_now (vm);
477 
478  vec_foreach (s, mcm->stream_vector)
479  {
480  if (s->state != MC_STREAM_STATE_join_in_progress)
481  continue;
482 
483  if (now > s->join_timeout)
484  {
485  s->state = MC_STREAM_STATE_ready;
486 
487  if (MC_EVENT_LOGGING > 0)
488  {
489  ELOG_TYPE_DECLARE (e) = {
490  .format = "stream %d join timeout",
491  };
492  ELOG (mcm->elog_main, e, s->index);
493  }
494  /* Make sure that this app instance exists as a stream peer,
495  or we may answer a catchup request with a NULL
496  all_peer_bitmap... */
498  (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
499 
503  _vec_len (s->procs_waiting_for_join_done) = 0;
504 
505  mcm->joins_in_progress--;
506  ASSERT (mcm->joins_in_progress >= 0);
507  }
508  else
509  {
510  /* Resent join request which may have been lost. */
512  1 /* is_join */);
513 
514  /* We're *not* alone, retry for as long as it takes */
515  if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
516  s->join_timeout = vlib_time_now (vm) + 2.0;
517 
518 
519  if (MC_EVENT_LOGGING > 0)
520  {
521  ELOG_TYPE_DECLARE (e) = {
522  .format = "stream %d resend join request",
523  };
524  ELOG (mcm->elog_main, e, s->index);
525  }
526  }
527  }
528  }
529 
530  vlib_process_suspend (vm, .5);
531  }
532 
533  return 0; /* not likely */
534 }
535 
537 {
538  char * name = va_arg (*va, char *);
539  serialize_cstring (m, name);
540 }
541 
542 static void elog_stream_name (char * buf, int n_buf_bytes, char * v)
543 {
544  clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
545  buf[n_buf_bytes - 1] = 0;
546 }
547 
549 {
550  mc_main_t * mcm = va_arg (*va, mc_main_t *);
551  char * name;
552  mc_stream_t * s;
553  uword * p;
554 
555  unserialize_cstring (m, &name);
556 
557  if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
558  {
559  if (MC_EVENT_LOGGING > 0)
560  {
561  ELOG_TYPE_DECLARE (e) = {
562  .format = "stream index %d already named %s",
563  .format_args = "i4s16",
564  };
565  struct { u32 stream_index; char name[16]; } * ed;
566  ed = ELOG_DATA (mcm->elog_main, e);
567  ed->stream_index = p[0];
568  elog_stream_name (ed->name, sizeof (ed->name), name);
569  }
570 
571  vec_free (name);
572  return;
573  }
574 
575  vec_add2 (mcm->stream_vector, s, 1);
576  mc_stream_init (s);
577  s->state = MC_STREAM_STATE_name_known;
578  s->index = s - mcm->stream_vector;
579  s->config.name = name;
580 
581  if (MC_EVENT_LOGGING > 0)
582  {
583  ELOG_TYPE_DECLARE (e) = {
584  .format = "stream index %d named %s",
585  .format_args = "i4s16",
586  };
587  struct { u32 stream_index; char name[16]; } * ed;
588  ed = ELOG_DATA (mcm->elog_main, e);
589  ed->stream_index = s->index;
590  elog_stream_name (ed->name, sizeof (ed->name), name);
591  }
592 
593  hash_set_mem (mcm->stream_index_by_name, name, s->index);
594 
596  if (p)
597  {
600  vec_foreach (wp, w[0])
604  }
605 }
606 
607 MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = {
608  .name = "mc_register_stream_name",
611 };
612 
613 void
615  mc_stream_t * stream,
616  mc_peer_id_t peer_id,
617  u32 buffer_index)
618 { return mc_unserialize (mcm, stream, buffer_index); }
619 
620 static u8 *
622  u8 * data_vector,
623  u32 last_global_sequence_processed)
624 {
626 
627  /* Append serialized data to data vector. */
628  serialize_open_vector (&m, data_vector);
629  m.stream.current_buffer_index = vec_len (data_vector);
630 
631  serialize (&m, serialize_mc_main, mcm);
632  return serialize_close_vector (&m);
633 }
634 
635 static void
637  u8 * data,
638  u32 n_data_bytes)
639 {
641 
642  unserialize_open_data (&s, data, n_data_bytes);
643 
644  unserialize (&s, unserialize_mc_main, mcm);
645 }
646 
647 /* Overridden from the application layer, not actually used here */
648 void mc_stream_join_process_hold (void) __attribute__ ((weak));
650 
651 static u32
653  mc_stream_config_t * config,
654  u32 is_internal)
655 {
656  mc_stream_t * s;
657  vlib_main_t * vm = mcm->vlib_main;
658 
659  s = 0;
660  if (! is_internal)
661  {
662  uword * p;
663 
664  /* Already have a stream with given name? */
665  if ((s = mc_stream_by_name (mcm, config->name)))
666  {
667  /* Already joined and ready? */
668  if (s->state == MC_STREAM_STATE_ready)
669  return s->index;
670  }
671 
672  /* First join MC internal stream. */
673  if (! mcm->stream_vector
675  == MC_STREAM_STATE_invalid))
676  {
677  static mc_stream_config_t c = {
678  .name = "mc-internal",
679  .rx_buffer = mc_rx_buffer_unserialize,
680  .catchup = mc_internal_catchup,
681  .catchup_snapshot = mc_internal_catchup_snapshot,
682  };
683 
684  c.save_snapshot = config->save_snapshot;
685 
686  mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
687  }
688 
689  /* If stream is still unknown register this name and wait for
690  sequenced message to name stream. This way all peers agree
691  on stream name to index mappings. */
692  s = mc_stream_by_name (mcm, config->name);
693  if (! s)
694  {
696  u8 * name_copy = format (0, "%s", config->name);
697 
698  mc_serialize_stream (mcm,
700  &mc_register_stream_name_msg,
701  config->name);
702 
703  /* Wait for this stream to be named. */
705  if (p)
707  else
708  {
712  = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
714  name_copy,
716  w[0] = 0;
717  }
718 
719  vec_add2 (w[0], wp, 1);
721  vec_free (name_copy);
722  }
723 
724  /* Name should be known now. */
725  s = mc_stream_by_name (mcm, config->name);
726  ASSERT (s != 0);
727  ASSERT (s->state == MC_STREAM_STATE_name_known);
728  }
729 
730  if (! s)
731  {
732  vec_add2 (mcm->stream_vector, s, 1);
733  mc_stream_init (s);
734  s->index = s - mcm->stream_vector;
735  }
736 
737  {
738  /* Save name since we could have already used it as hash key. */
739  char * name_save = s->config.name;
740 
741  s->config = config[0];
742 
743  if (name_save)
744  s->config.name = name_save;
745  }
746 
747  if (s->config.window_size == 0)
748  s->config.window_size = 8;
749 
750  if (s->config.retry_interval == 0.0)
751  s->config.retry_interval = 1.0;
752 
753  /* Sanity. */
754  ASSERT (s->config.retry_interval < 30);
755 
756  if (s->config.retry_limit == 0)
757  s->config.retry_limit = 7;
758 
759  s->state = MC_STREAM_STATE_join_in_progress;
760  if (! s->peer_index_by_id.hash)
761  mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
762 
763  /* If we don't hear from someone in 5 seconds, we're alone */
764  s->join_timeout = vlib_time_now (vm) + 5.0;
765  mcm->joins_in_progress++;
766 
767  if (MC_EVENT_LOGGING > 0)
768  {
769  ELOG_TYPE_DECLARE (e) = {
770  .format = "stream index %d join request %s",
771  .format_args = "i4s16",
772  };
773  struct { u32 stream_index; char name[16]; } * ed;
774  ed = ELOG_DATA (mcm->elog_main, e);
775  ed->stream_index = s->index;
776  elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
777  }
778 
779  send_join_or_leave_request (mcm, s->index, 1 /* join */);
780 
782  (vm, &s->procs_waiting_for_join_done);
783 
784  if (MC_EVENT_LOGGING)
785  {
786  ELOG_TYPE (e, "join complete stream %d");
787  ELOG (mcm->elog_main, e, s->index);
788  }
789 
790  return s->index;
791 }
792 
794 { return mc_stream_join_helper (mcm, config, /* is_internal */ 0); }
795 
796 void mc_stream_leave (mc_main_t * mcm, u32 stream_index)
797 {
798  mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
799 
800  if (! s)
801  return;
802 
803  if (MC_EVENT_LOGGING)
804  {
805  ELOG_TYPE_DECLARE (t) = {
806  .format = "leave-stream: %d",
807  .format_args = "i4",
808  };
809  struct { u32 index; } * ed;
810  ed = ELOG_DATA (mcm->elog_main, t);
811  ed->index = stream_index;
812  }
813 
814  send_join_or_leave_request (mcm, stream_index, 0 /* is_join */);
815  mc_stream_free (s);
816  s->state = MC_STREAM_STATE_name_known;
817 }
818 
820  mc_msg_join_or_leave_request_t * req,
821  u32 buffer_index)
822 {
823  mc_stream_t * s;
824  mc_msg_join_reply_t * rep;
825  u32 bi;
826 
828 
829  s = mc_stream_by_index (mcm, req->stream_index);
830  if (! s || s->state != MC_STREAM_STATE_ready)
831  return;
832 
833  /* If the peer is joining, create it */
834  if (req->is_join)
835  {
836  mc_stream_t * this_s;
837 
838  /* We're not in a position to catch up a peer until all
839  stream joins are complete. */
840  if (0)
841  {
842  /* XXX This is hard to test so we've. */
843  vec_foreach (this_s, mcm->stream_vector)
844  {
845  if (this_s->state != MC_STREAM_STATE_ready
846  && this_s->state != MC_STREAM_STATE_name_known)
847  return;
848  }
849  }
850  else
851  if (mcm->joins_in_progress > 0)
852  return;
853 
855  s,
856  req->peer_id,
857  /* created */ 0);
858 
859  rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
860  memset (rep, 0, sizeof (rep[0]));
861  rep->type = MC_MSG_TYPE_join_reply;
862  rep->stream_index = req->stream_index;
863 
865  /* These two are already in network byte order... */
866  rep->peer_id = mcm->transport.our_ack_peer_id;
867  rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
868 
870  }
871  else
872  {
873  if (s->config.peer_died)
874  s->config.peer_died (mcm, s, req->peer_id);
875  }
876 }
877 
879  mc_msg_join_reply_t * mp,
880  u32 buffer_index)
881 {
882  mc_stream_t * s;
883 
885 
886  s = mc_stream_by_index (mcm, mp->stream_index);
887 
888  if (! s || s->state != MC_STREAM_STATE_join_in_progress)
889  return;
890 
891  /* Switch to catchup state; next join reply
892  for this stream will be ignored. */
893  s->state = MC_STREAM_STATE_catchup;
894 
895  mcm->joins_in_progress--;
897  mp->stream_index,
898  mp->catchup_peer_id);
899 }
900 
901 void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name)
902 {
903  mc_stream_t * s;
904 
905  while (1)
906  {
907  s = mc_stream_by_name (m, stream_name);
908  if (s)
909  break;
911  }
912 
913  /* It's OK to send a message in catchup and ready states. */
914  if (s->state == MC_STREAM_STATE_catchup
915  || s->state == MC_STREAM_STATE_ready)
916  return;
917 
918  /* Otherwise we are waiting for a join to finish. */
921 }
922 
923 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
924 {
925  mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
926  vlib_main_t * vm = mcm->vlib_main;
927  mc_retry_t * r;
928  mc_msg_user_request_t * mp;
929  vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
930  u32 ri;
931 
932  if (! s)
933  return 0;
934 
935  if (s->state != MC_STREAM_STATE_ready)
937  (vm, &s->procs_waiting_for_join_done);
938 
939  while (pool_elts (s->retry_pool) >= s->config.window_size)
940  {
943  }
944 
945  pool_get (s->retry_pool, r);
946  ri = r - s->retry_pool;
947 
949  r->next_index = ~0;
950  s->retry_tail_index = ri;
951 
952  if (r->prev_index == ~0)
953  s->retry_head_index = ri;
954  else
955  {
957  p->next_index = ri;
958  }
959 
960  vlib_buffer_advance (b, -sizeof (mp[0]));
961  mp = vlib_buffer_get_current (b);
962 
963  mp->peer_id = mcm->transport.our_ack_peer_id;
964  /* mp->transport.global_sequence set by relay agent. */
965  mp->global_sequence = 0xdeadbeef;
966  mp->stream_index = s->index;
967  mp->local_sequence = s->our_local_sequence++;
968  mp->n_data_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
969 
970  r->buffer_index = buffer_index;
971  r->local_sequence = mp->local_sequence;
972  r->sent_at = vlib_time_now(vm);
973  r->n_retries = 0;
974 
975  /* Retry will be freed when all currently known peers have acked. */
978 
980 
981  elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
982 
984 
986 
987  s->user_requests_sent++;
988 
989  /* return amount of window remaining */
990  return s->config.window_size - pool_elts (s->retry_pool);
991 }
992 
993 void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u32 buffer_index)
994 {
995  vlib_main_t * vm = mcm->vlib_main;
996  mc_stream_t * s;
997  mc_stream_peer_t * peer;
998  i32 seq_cmp_result;
999  static int once=0;
1000 
1002 
1003  s = mc_stream_by_index (mcm, mp->stream_index);
1004 
1005  /* Not signed up for this stream? Turf-o-matic */
1006  if (! s || s->state != MC_STREAM_STATE_ready)
1007  {
1008  vlib_buffer_free_one (vm, buffer_index);
1009  return;
1010  }
1011 
1012  /* Find peer, including ourselves. */
1013  peer = get_or_create_peer_with_id (mcm,
1014  s, mp->peer_id,
1015  /* created */ 0);
1016 
1017  seq_cmp_result = mc_seq_cmp (mp->local_sequence,
1018  peer->last_sequence_received + 1);
1019 
1020  if (MC_EVENT_LOGGING > 0)
1021  {
1022  ELOG_TYPE_DECLARE (e) = {
1023  .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1024  .format_args = "T4i4i4i4",
1025  };
1026  struct { u32 peer, stream_index, rx_sequence; i32 seq_cmp_result; } * ed;
1027  ed = ELOG_DATA (mcm->elog_main, e);
1028  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1029  ed->stream_index = mp->stream_index;
1030  ed->rx_sequence = mp->local_sequence;
1031  ed->seq_cmp_result = seq_cmp_result;
1032  }
1033 
1034  if (0 && mp->stream_index == 1 && once == 0)
1035  {
1036  once = 1;
1037  ELOG_TYPE (e, "FAKE lost msg on stream 1");
1038  ELOG (mcm->elog_main,e,0);
1039  return;
1040  }
1041 
1042  peer->last_sequence_received += seq_cmp_result == 0;
1044 
1045  if (seq_cmp_result > 0)
1046  peer->stats.n_msgs_from_future += 1;
1047 
1048  /* Send ack even if msg from future */
1049  if (1)
1050  {
1051  mc_msg_user_ack_t * rp;
1052  u32 bi;
1053 
1054  rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1055  rp->peer_id = mcm->transport.our_ack_peer_id;
1056  rp->stream_index = s->index;
1057  rp->local_sequence = mp->local_sequence;
1058  rp->seq_cmp_result = seq_cmp_result;
1059 
1060  if (MC_EVENT_LOGGING > 0)
1061  {
1062  ELOG_TYPE_DECLARE (e) = {
1063  .format = "tx-ack: stream %d local seq %d",
1064  .format_args = "i4i4",
1065  };
1066  struct { u32 stream_index; u32 local_sequence; } * ed;
1067  ed = ELOG_DATA (mcm->elog_main, e);
1068  ed->stream_index = rp->stream_index;
1069  ed->local_sequence = rp->local_sequence;
1070  }
1071 
1073 
1074  mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1075  /* Msg from past? If so, free the buffer... */
1076  if (seq_cmp_result < 0)
1077  {
1078  vlib_buffer_free_one (vm, buffer_index);
1079  peer->stats.n_msgs_from_past += 1;
1080  }
1081  }
1082 
1083  if (seq_cmp_result == 0)
1084  {
1085  vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
1086  switch (s->state)
1087  {
1088  case MC_STREAM_STATE_ready:
1089  vlib_buffer_advance (b, sizeof (mp[0]));
1090  s->config.rx_buffer(mcm, s, mp->peer_id, buffer_index);
1091 
1092  /* Stream vector can change address via rx callback for mc-internal
1093  stream. */
1094  s = mc_stream_by_index (mcm, mp->stream_index);
1095  ASSERT (s != 0);
1096  s->last_global_sequence_processed = mp->global_sequence;
1097  break;
1098 
1099  case MC_STREAM_STATE_catchup:
1100  clib_fifo_add1 (s->catchup_fifo, buffer_index);
1101  break;
1102 
1103  default:
1104  clib_warning ("stream in unknown state %U",
1106  break;
1107  }
1108  }
1109 }
1110 
1111 void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffer_index)
1112 {
1113  vlib_main_t * vm = mcm->vlib_main;
1114  uword *p;
1115  mc_stream_t * s;
1116  mc_stream_peer_t * peer;
1117  mc_retry_t * r;
1118  int peer_created = 0;
1119 
1121 
1122  s = mc_stream_by_index (mcm, mp->stream_index);
1123 
1124  if (MC_EVENT_LOGGING > 0)
1125  {
1126  ELOG_TYPE_DECLARE (t) = {
1127  .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1128  .format_args = "i4T4i4",
1129  };
1130  struct { u32 local_sequence; u32 peer; i32 seq_cmp_result;} * ed;
1131  ed = ELOG_DATA (mcm->elog_main, t);
1132  ed->local_sequence = mp->local_sequence;
1133  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1134  ed->seq_cmp_result = mp->seq_cmp_result;
1135  }
1136 
1137  /* Unknown stream? */
1138  if (! s)
1139  return;
1140 
1141  /* Find the peer which just ack'ed. */
1142  peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1143  /* created */ &peer_created);
1144 
1145  /*
1146  * Peer reports message from the future. If it's not in the retry
1147  * fifo, look for a retired message.
1148  */
1149  if (mp->seq_cmp_result > 0)
1150  {
1151  p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1152  mp->seq_cmp_result);
1153  if (p == 0)
1154  mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
1155 
1156  /* Normal retry should fix it... */
1157  return;
1158  }
1159 
1160  /*
1161  * Pointer to the indicated retry fifo entry.
1162  * Worth hashing because we could use a window size of 100 or 1000.
1163  */
1164  p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1165 
1166  /*
1167  * Is this a duplicate ACK, received after we've retired the
1168  * fifo entry. This can happen when learning about new
1169  * peers.
1170  */
1171  if (p == 0)
1172  {
1173  if (MC_EVENT_LOGGING > 0)
1174  {
1175  ELOG_TYPE_DECLARE (t) =
1176  {
1177  .format = "ack: for seq %d from peer %s no fifo elt",
1178  .format_args = "i4T4",
1179  };
1180  struct { u32 seq; u32 peer; } * ed;
1181  ed = ELOG_DATA (mcm->elog_main, t);
1182  ed->seq = mp->local_sequence;
1183  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1184  }
1185 
1186  return;
1187  }
1188 
1189  r = pool_elt_at_index (s->retry_pool, p[0]);
1190 
1191  /* Make sure that this new peer ACKs our msgs from now on */
1192  if (peer_created)
1193  {
1194  mc_retry_t *later_retry = next_retry (s, r);
1195 
1196  while (later_retry)
1197  {
1198  later_retry->unacked_by_peer_bitmap =
1199  clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1200  peer - s->peers);
1201  later_retry = next_retry (s, later_retry);
1202  }
1203  }
1204 
1205  ASSERT (mp->local_sequence == r->local_sequence);
1206 
1207  /* If we weren't expecting to hear from this peer */
1208  if (!peer_created &&
1209  ! clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
1210  {
1211  if (MC_EVENT_LOGGING > 0)
1212  {
1213  ELOG_TYPE_DECLARE (t) =
1214  {
1215  .format = "dup-ack: for seq %d from peer %s",
1216  .format_args = "i4T4",
1217  };
1218  struct { u32 seq; u32 peer; } * ed;
1219  ed = ELOG_DATA (mcm->elog_main, t);
1220  ed->seq = r->local_sequence;
1221  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1222  }
1224  return;
1225  }
1226 
1227  if (MC_EVENT_LOGGING > 0)
1228  {
1229  ELOG_TYPE_DECLARE (t) =
1230  {
1231  .format = "ack: for seq %d from peer %s",
1232  .format_args = "i4T4",
1233  };
1234  struct { u32 seq; u32 peer; } * ed;
1235  ed = ELOG_DATA (mcm->elog_main, t);
1236  ed->seq = mp->local_sequence;
1237  ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1238  }
1239 
1241  clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
1242 
1243  /* Not all clients have ack'ed */
1245  {
1246  return;
1247  }
1248  if (MC_EVENT_LOGGING > 0)
1249  {
1250  ELOG_TYPE_DECLARE (t) =
1251  {
1252  .format = "ack: retire fifo elt loc seq %d after %d acks",
1253  .format_args = "i4i4",
1254  };
1255  struct { u32 seq; u32 npeers; } * ed;
1256  ed = ELOG_DATA (mcm->elog_main, t);
1257  ed->seq = r->local_sequence;
1258  ed->npeers = pool_elts (s->peers);
1259  }
1260 
1261  hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1262  mc_retry_free (mcm, s, r);
1263  remove_retry_from_pool (s, r);
1265 }
1266 
1267 #define EVENT_MC_SEND_CATCHUP_DATA 0
1268 
1269 static uword
1271  vlib_node_runtime_t * node,
1272  vlib_frame_t * f)
1273 {
1274  mc_main_t * mcm = mc_node_get_main (node);
1275  uword *event_data = 0;
1276  mc_catchup_process_arg_t * args;
1277  int i;
1278 
1279  while (1)
1280  {
1281  if (event_data)
1282  _vec_len(event_data) = 0;
1284 
1285  for (i = 0; i < vec_len(event_data); i++)
1286  {
1288  event_data[i]);
1289 
1291  args->catchup_opaque,
1292  args->catchup_snapshot);
1293 
1294  /* Send function will free snapshot data vector. */
1295  pool_put (mcm->catchup_process_args, args);
1296  }
1297  }
1298 
1299  return 0; /* not likely */
1300 }
1301 
1302 static void serialize_mc_stream (serialize_main_t * m, va_list * va)
1303 {
1304  mc_stream_t * s = va_arg (*va, mc_stream_t *);
1305  mc_stream_peer_t * p;
1306 
1307  serialize_integer (m, pool_elts (s->peers), sizeof (u32));
1308  pool_foreach (p, s->peers, ({
1309  u8 * x = serialize_get (m, sizeof (p->id));
1310  clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1311  serialize_integer (m, p->last_sequence_received,
1312  sizeof (p->last_sequence_received));
1313  }));
1315 }
1316 
1317 void unserialize_mc_stream (serialize_main_t * m, va_list * va)
1318 {
1319  mc_stream_t * s = va_arg (*va, mc_stream_t *);
1320  u32 i, n_peers;
1321  mc_stream_peer_t * p;
1322 
1323  unserialize_integer (m, &n_peers, sizeof (u32));
1324  mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1325  for (i = 0; i < n_peers; i++)
1326  {
1327  u8 * x;
1328  pool_get (s->peers, p);
1329  x = unserialize_get (m, sizeof (p->id));
1330  clib_memcpy (p->id.as_u8, x, sizeof (p->id));
1332  mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ 0);
1333  }
1335 
1336  /* This is really bad. */
1337  if (!s->all_peer_bitmap)
1338  clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1339 }
1340 
1341 void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t * req, u32 catchup_opaque)
1342 {
1343  vlib_main_t * vm = mcm->vlib_main;
1344  mc_stream_t * s;
1345  mc_catchup_process_arg_t * args;
1346 
1348 
1349  s = mc_stream_by_index (mcm, req->stream_index);
1350  if (! s || s->state != MC_STREAM_STATE_ready)
1351  return;
1352 
1353  if (MC_EVENT_LOGGING > 0)
1354  {
1355  ELOG_TYPE_DECLARE (t) =
1356  {
1357  .format = "catchup-request: from %s stream %d",
1358  .format_args = "T4i4",
1359  };
1360  struct { u32 peer, stream; } * ed;
1361  ed = ELOG_DATA (mcm->elog_main, t);
1362  ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1363  ed->stream = req->stream_index;
1364  }
1365 
1366  /*
1367  * The application has to snapshoot its data structures right
1368  * here, right now. If we process any messages after
1369  * noting the last global sequence we've processed, the client
1370  * won't be able to accurately reconstruct our data structures.
1371  *
1372  * Once the data structures are e.g. vec_dup()'ed, we
1373  * send the resulting messages from a separate process, to
1374  * make sure that we don't cause a bunch of message retransmissions
1375  */
1376  pool_get (mcm->catchup_process_args, args);
1377 
1378  args->stream_index = s - mcm->stream_vector;
1379  args->catchup_opaque = catchup_opaque;
1380  args->catchup_snapshot = 0;
1381 
1382  /* Construct catchup reply and snapshot state for stream to send as
1383  catchup reply payload. */
1384  {
1385  mc_msg_catchup_reply_t * rep;
1386  serialize_main_t m;
1387 
1388  vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1389 
1390  rep = (void *) args->catchup_snapshot;
1391 
1392  rep->peer_id = req->peer_id;
1393  rep->stream_index = req->stream_index;
1394  rep->last_global_sequence_included = s->last_global_sequence_processed;
1395 
1396  /* Setup for serialize to append to catchup snapshot. */
1399 
1400  serialize (&m, serialize_mc_stream, s);
1401 
1403 
1404  /* Actually copy internal state */
1406  (mcm,
1407  args->catchup_snapshot,
1408  rep->last_global_sequence_included);
1409 
1410  rep = (void *) args->catchup_snapshot;
1411  rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1412 
1414  }
1415 
1416  /* now go send it... */
1419  args - mcm->catchup_process_args);
1420 }
1421 
1422 #define EVENT_MC_UNSERIALIZE_BUFFER 0
1423 #define EVENT_MC_UNSERIALIZE_CATCHUP 1
1424 
1425 void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, u32 catchup_opaque)
1426 {
1428  mcm->unserialize_process,
1430  pointer_to_uword (mp));
1431 }
1432 
1433 static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
1434 {
1435  mc_stream_t * s;
1436  i32 seq_cmp_result;
1437 
1439 
1440  s = mc_stream_by_index (mcm, mp->stream_index);
1441 
1442  /* Never heard of this stream or already caught up. */
1443  if (! s || s->state == MC_STREAM_STATE_ready)
1444  return;
1445 
1446  {
1447  serialize_main_t m;
1448  mc_stream_peer_t * p;
1449  u32 n_stream_bytes;
1450 
1451  /* For offline sim replay: save the entire catchup snapshot... */
1452  if (s->config.save_snapshot)
1453  s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, mp->n_data_bytes);
1454 
1455  unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1457 
1458  /* Make sure we start numbering our messages as expected */
1459  pool_foreach (p, s->peers, ({
1460  if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1461  s->our_local_sequence = p->last_sequence_received + 1;
1462  }));
1463 
1464  n_stream_bytes = m.stream.current_buffer_index;
1465 
1466  /* No need to unserialize close; nothing to free. */
1467 
1468  /* After serialized stream is user's catchup data. */
1469  s->config.catchup (mcm, mp->data + n_stream_bytes,
1470  mp->n_data_bytes - n_stream_bytes);
1471  }
1472 
1473  /* Vector could have been moved by catchup.
1474  This can only happen for mc-internal stream. */
1475  s = mc_stream_by_index (mcm, mp->stream_index);
1476 
1477  s->last_global_sequence_processed = mp->last_global_sequence_included;
1478 
1479  while (clib_fifo_elts (s->catchup_fifo))
1480  {
1481  mc_msg_user_request_t * gp;
1482  u32 bi;
1483  vlib_buffer_t * b;
1484 
1485  clib_fifo_sub1(s->catchup_fifo, bi);
1486 
1487  b = vlib_get_buffer (mcm->vlib_main, bi);
1488  gp = vlib_buffer_get_current (b);
1489 
1490  /* Make sure we're replaying "new" news */
1491  seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1492  mp->last_global_sequence_included);
1493 
1494  if (seq_cmp_result > 0)
1495  {
1496  vlib_buffer_advance (b, sizeof (gp[0]));
1497  s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1498  s->last_global_sequence_processed = gp->global_sequence;
1499 
1500  if (MC_EVENT_LOGGING)
1501  {
1502  ELOG_TYPE_DECLARE (t) = {
1503  .format = "catchup replay local sequence 0x%x",
1504  .format_args = "i4",
1505  };
1506  struct { u32 local_sequence; } * ed;
1507  ed = ELOG_DATA (mcm->elog_main, t);
1508  ed->local_sequence = gp->local_sequence;
1509  }
1510  }
1511  else
1512  {
1513  if (MC_EVENT_LOGGING)
1514  {
1515  ELOG_TYPE_DECLARE (t) = {
1516  .format = "catchup discard local sequence 0x%x",
1517  .format_args = "i4",
1518  };
1519  struct { u32 local_sequence; } * ed;
1520  ed = ELOG_DATA (mcm->elog_main, t);
1521  ed->local_sequence = gp->local_sequence;
1522  }
1523 
1524  vlib_buffer_free_one (mcm->vlib_main, bi);
1525  }
1526  }
1527 
1528  s->state = MC_STREAM_STATE_ready;
1529 
1530  /* Now that we are caught up wake up joining process. */
1531  {
1536  _vec_len (s->procs_waiting_for_join_done) = 0;
1537  }
1538 }
1539 
1541 {
1542  vlib_main_t * vm = mcm->vlib_main;
1543  mc_msg_master_assert_t * mp;
1544  uword event_type;
1545  int timeouts = 0;
1546  int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
1547  clib_error_t * error;
1548  f64 now, time_last_master_assert = -1;
1549  u32 bi;
1550 
1551  while (1)
1552  {
1553  if (! mcm->we_can_be_relay_master)
1554  {
1556  if (MC_EVENT_LOGGING)
1557  {
1558  ELOG_TYPE (e, "become slave (config)");
1559  ELOG (mcm->elog_main, e, 0);
1560  }
1561  return;
1562  }
1563 
1564  now = vlib_time_now (vm);
1565  if (now >= time_last_master_assert + 1)
1566  {
1567  time_last_master_assert = now;
1568  mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1569 
1570  mp->peer_id = mcm->transport.our_ack_peer_id;
1571  mp->global_sequence = mcm->relay_global_sequence;
1572 
1573  /*
1574  * these messages clog the event log, set MC_EVENT_LOGGING higher
1575  * if you want them
1576  */
1577  if (MC_EVENT_LOGGING > 1)
1578  {
1579  ELOG_TYPE_DECLARE (e) = {
1580  .format = "tx-massert: peer %s global seq %u",
1581  .format_args = "T4i4",
1582  };
1583  struct { u32 peer, global_sequence; } * ed;
1584  ed = ELOG_DATA (mcm->elog_main, e);
1585  ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1586  ed->global_sequence = mp->global_sequence;
1587  }
1588 
1590 
1591  error = mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_MASTERSHIP, bi);
1592  if (error)
1593  clib_error_report (error);
1594  }
1595 
1597  event_type = vlib_process_get_events (vm, /* no event data */ 0);
1598 
1599  switch (event_type)
1600  {
1601  case ~0:
1602  if (! is_master && timeouts++ > 2)
1603  {
1606  if (MC_EVENT_LOGGING)
1607  {
1608  ELOG_TYPE (e, "become master (was maybe_master)");
1609  ELOG (mcm->elog_main, e, 0);
1610  }
1611  return;
1612  }
1613  break;
1614 
1615  case MC_RELAY_STATE_SLAVE:
1618  {
1619  ELOG_TYPE (e, "become slave (was maybe_master)");
1620  ELOG (mcm->elog_main, e, 0);
1621  }
1622  return;
1623  }
1624  }
1625 }
1626 
1627 static void this_node_slave (mc_main_t * mcm)
1628 {
1629  vlib_main_t * vm = mcm->vlib_main;
1630  uword event_type;
1631  int timeouts = 0;
1632 
1633  if (MC_EVENT_LOGGING)
1634  {
1635  ELOG_TYPE (e, "become slave");
1636  ELOG (mcm->elog_main, e, 0);
1637  }
1638 
1639  while (1)
1640  {
1642  event_type = vlib_process_get_events (vm, /* no event data */ 0);
1643 
1644  switch (event_type)
1645  {
1646  case ~0:
1647  if (timeouts++ > 2)
1648  {
1650  mcm->relay_master_peer_id = ~0ULL;
1651  if (MC_EVENT_LOGGING)
1652  {
1653  ELOG_TYPE (e, "timeouts; negoitate mastership");
1654  ELOG (mcm->elog_main, e, 0);
1655  }
1656  return;
1657  }
1658  break;
1659 
1660  case MC_RELAY_STATE_SLAVE:
1662  timeouts = 0;
1663  break;
1664  }
1665  }
1666 }
1667 
1668 static uword
1670  vlib_node_runtime_t * node,
1671  vlib_frame_t * f)
1672 {
1673  mc_main_t * mcm = mc_node_get_main (node);
1674 
1675  while (1)
1676  {
1677  switch (mcm->relay_state)
1678  {
1680  case MC_RELAY_STATE_MASTER:
1682  break;
1683 
1684  case MC_RELAY_STATE_SLAVE:
1685  this_node_slave (mcm);
1686  break;
1687  }
1688  }
1689  return 0; /* not likely */
1690 }
1691 
1692 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
1693 {
1694  if (we_can_be_master != mcm->we_can_be_relay_master)
1695  {
1696  mcm->we_can_be_relay_master = we_can_be_master;
1698  mcm->mastership_process,
1700  }
1701 }
1702 
1703 void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, u32 buffer_index)
1704 {
1705  mc_peer_id_t his_peer_id, our_peer_id;
1706  i32 seq_cmp_result;
1707  u8 signal_slave = 0;
1708  u8 update_global_sequence = 0;
1709 
1711 
1712  his_peer_id = mp->peer_id;
1713  our_peer_id = mcm->transport.our_ack_peer_id;
1714 
1715  /* compare the incoming global sequence with ours */
1716  seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1717  mcm->relay_global_sequence);
1718 
1719  /* If the sender has a lower peer id and the sender's sequence >=
1720  our global sequence, we become a slave. Otherwise we are master. */
1721  if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 && seq_cmp_result >= 0)
1722  {
1724  mcm->mastership_process,
1726  signal_slave = 1;
1727  }
1728 
1729  /* Update our global sequence. */
1730  if (seq_cmp_result > 0)
1731  {
1732  mcm->relay_global_sequence = mp->global_sequence;
1733  update_global_sequence = 1;
1734  }
1735 
1736  {
1737  uword * q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1739 
1740  if (q)
1741  p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1742  else
1743  {
1744  vec_add2 (mcm->mastership_peers, p, 1);
1745  p->peer_id = his_peer_id;
1747  /* old_value */ 0);
1748  }
1750  }
1751 
1752  /*
1753  * these messages clog the event log, set MC_EVENT_LOGGING higher
1754  * if you want them.
1755  */
1756  if (MC_EVENT_LOGGING > 1)
1757  {
1758  ELOG_TYPE_DECLARE (e) = {
1759  .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1760  .format_args = "T4i4i1i1",
1761  };
1762  struct {
1763  u32 peer;
1764  u32 global_sequence;
1765  u8 update_sequence;
1766  u8 slave;
1767  } * ed;
1768  ed = ELOG_DATA (mcm->elog_main, e);
1769  ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1770  ed->global_sequence = mp->global_sequence;
1771  ed->update_sequence = update_global_sequence;
1772  ed->slave = signal_slave;
1773  }
1774 }
1775 
1776 static void
1778 {
1779  mc_serialize_msg_t * m;
1780  vlib_main_t * vm = vlib_get_main();
1781 
1783  = hash_create_string (/* elts */ 0, sizeof (uword));
1784 
1785  m = vm->mc_msg_registrations;
1786 
1787  while (m)
1788  {
1789  m->global_index = vec_len (mcm->global_msgs);
1791  m->name,
1792  m->global_index);
1793  vec_add1 (mcm->global_msgs, m);
1794  m = m->next_registration;
1795  }
1796 }
1797 
1798 clib_error_t *
1800  u32 stream_index,
1801  u32 multiple_messages_per_vlib_buffer,
1802  mc_serialize_msg_t * msg,
1803  va_list * va)
1804 {
1805  mc_stream_t * s;
1806  clib_error_t * error;
1809  u32 bi, n_before, n_after, n_total, n_this_msg;
1810  u32 si, gi;
1811 
1812  if (! sbm->vlib_main)
1813  {
1814  sbm->tx.max_n_data_bytes_per_chain = 4096;
1815  sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
1816  }
1817 
1818  if (sbm->first_buffer == 0)
1819  serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
1820 
1821  n_before = serialize_vlib_buffer_n_bytes (m);
1822 
1823  s = mc_stream_by_index (mc, stream_index);
1824  gi = msg->global_index;
1825  ASSERT (msg == vec_elt (mc->global_msgs, gi));
1826 
1827  si = ~0;
1830 
1832 
1833  /* For first time message is sent, use name to identify message. */
1834  if (si == ~0 || MSG_ID_DEBUG)
1835  serialize_cstring (m, msg->name);
1836 
1837  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
1838  {
1839  ELOG_TYPE_DECLARE (e) = {
1840  .format = "serialize-msg: %s index %d",
1841  .format_args = "T4i4",
1842  };
1843  struct { u32 c[2]; } * ed;
1844  ed = ELOG_DATA (mc->elog_main, e);
1845  ed->c[0] = elog_id_for_msg_name (mc, msg->name);
1846  ed->c[1] = si;
1847  }
1848 
1849  error = va_serialize (m, va);
1850 
1851  n_after = serialize_vlib_buffer_n_bytes (m);
1852  n_this_msg = n_after - n_before;
1853  n_total = n_after + sizeof (mc_msg_user_request_t);
1854 
1855  /* For max message size ignore first message where string name is sent. */
1856  if (si != ~0)
1857  msg->max_n_bytes_serialized = clib_max (msg->max_n_bytes_serialized, n_this_msg);
1858 
1859  if (! multiple_messages_per_vlib_buffer
1860  || si == ~0
1861  || n_total + msg->max_n_bytes_serialized > mc->transport.max_packet_size)
1862  {
1863  bi = serialize_close_vlib_buffer (m);
1864  sbm->first_buffer = 0;
1865  if (! error)
1866  mc_stream_send (mc, stream_index, bi);
1867  else if (bi != ~0)
1868  vlib_buffer_free_one (mc->vlib_main, bi);
1869  }
1870 
1871  return error;
1872 }
1873 
1874 clib_error_t *
1876  u32 stream_index,
1877  u32 multiple_messages_per_vlib_buffer,
1878  mc_serialize_msg_t * msg,
1879  ...)
1880 {
1881  vlib_main_t * vm = mc->vlib_main;
1882  va_list va;
1883  clib_error_t * error;
1884 
1885  if (stream_index == ~0)
1886  {
1887  if (vm->mc_main && vm->mc_stream_index == ~0)
1890  stream_index = vm->mc_stream_index;
1891  }
1892 
1893  va_start (va, msg);
1894  error = mc_serialize_va (mc, stream_index,
1895  multiple_messages_per_vlib_buffer,
1896  msg, &va);
1897  va_end (va);
1898  return error;
1899 }
1900 
1902  mc_stream_t * s,
1903  serialize_main_t * m)
1904 {
1906  u32 gi, si;
1907 
1909 
1910  if (! (si == ~0 || MSG_ID_DEBUG))
1911  {
1912  sm = vec_elt_at_index (s->stream_msgs, si);
1913  gi = sm->global_index;
1914  }
1915  else
1916  {
1917  char * name;
1918 
1919  unserialize_cstring (m, &name);
1920 
1921  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
1922  {
1923  ELOG_TYPE_DECLARE (e) = {
1924  .format = "unserialize-msg: %s rx index %d",
1925  .format_args = "T4i4",
1926  };
1927  struct { u32 c[2]; } * ed;
1928  ed = ELOG_DATA (mcm->elog_main, e);
1929  ed->c[0] = elog_id_for_msg_name (mcm, name);
1930  ed->c[1] = si;
1931  }
1932 
1933  {
1934  uword * p = hash_get_mem (mcm->global_msg_index_by_name, name);
1935  gi = p ? p[0] : ~0;
1936  }
1937 
1938  /* Unknown message? */
1939  if (gi == ~0)
1940  {
1941  vec_free (name);
1942  goto done;
1943  }
1944 
1947 
1948  /* Stream local index unknown? Create it. */
1949  if (si == ~0)
1950  {
1951  vec_add2 (s->stream_msgs, sm, 1);
1952 
1953  si = sm - s->stream_msgs;
1954  sm->global_index = gi;
1956 
1957  if (MC_EVENT_LOGGING > 0)
1958  {
1959  ELOG_TYPE_DECLARE (e) = {
1960  .format = "msg-bind: stream %d %s to index %d",
1961  .format_args = "i4T4i4",
1962  };
1963  struct { u32 c[3]; } * ed;
1964  ed = ELOG_DATA (mcm->elog_main, e);
1965  ed->c[0] = s->index;
1966  ed->c[1] = elog_id_for_msg_name (mcm, name);
1967  ed->c[2] = si;
1968  }
1969  }
1970  else
1971  {
1972  sm = vec_elt_at_index (s->stream_msgs, si);
1973  if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
1974  {
1975  ELOG_TYPE_DECLARE (e) = {
1976  .format = "msg-id-ERROR: %s index %d expected %d",
1977  .format_args = "T4i4i4",
1978  };
1979  struct { u32 c[3]; } * ed;
1980  ed = ELOG_DATA (mcm->elog_main, e);
1981  ed->c[0] = elog_id_for_msg_name (mcm, name);
1982  ed->c[1] = si;
1983  ed->c[2] = ~0;
1986  }
1987  }
1988 
1989  vec_free (name);
1990  }
1991 
1992  if (gi != ~0)
1993  {
1994  mc_serialize_msg_t * msg;
1995  msg = vec_elt (mcm->global_msgs, gi);
1996  unserialize (m, msg->unserialize, mcm);
1997  }
1998 
1999  done:
2000  return gi != ~0;
2001 }
2002 
2003 void
2004 mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2005 {
2006  vlib_main_t * vm = mcm->vlib_main;
2010  mc_stream_t * stream;
2011  u32 buffer_index;
2012 
2013  sb = pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, stream_and_buffer_index);
2014  buffer_index = sb->buffer_index;
2015  stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2017 
2018  if (stream->config.save_snapshot)
2019  {
2020  u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
2021  static u8 * contents;
2022  vec_reset_length (contents);
2023  vec_validate (contents, n_bytes - 1);
2024  vlib_buffer_contents (vm, buffer_index, contents);
2025  stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, n_bytes);
2026  }
2027 
2029 
2030  unserialize_open_vlib_buffer (m, vm, sbm);
2031 
2032  clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2033 
2034  while (unserialize_vlib_buffer_n_bytes (m) > 0)
2035  mc_unserialize_message (mcm, stream, m);
2036 
2037  /* Frees buffer. */
2039 }
2040 
2041 void
2042 mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2043 {
2044  vlib_main_t * vm = mcm->vlib_main;
2047  sb->stream_index = s->index;
2048  sb->buffer_index = buffer_index;
2051 }
2052 
2053 static uword
2055  vlib_node_runtime_t * node,
2056  vlib_frame_t * f)
2057 {
2058  mc_main_t * mcm = mc_node_get_main (node);
2059  uword event_type, * event_data = 0;
2060  int i;
2061 
2062  while (1)
2063  {
2064  if (event_data)
2065  _vec_len(event_data) = 0;
2066 
2068  event_type = vlib_process_get_events (vm, &event_data);
2069  switch (event_type)
2070  {
2072  for (i = 0; i < vec_len (event_data); i++)
2073  mc_unserialize_internal (mcm, event_data[i]);
2074  break;
2075 
2077  for (i = 0; i < vec_len (event_data); i++)
2078  {
2079  u8 * mp = uword_to_pointer (event_data[i], u8 *);
2080  perform_catchup (mcm, (void *) mp);
2081  vec_free (mp);
2082  }
2083  break;
2084 
2085  default:
2086  break;
2087  }
2088  }
2089 
2090  return 0; /* not likely */
2091 }
2092 
2093 void serialize_mc_main (serialize_main_t * m, va_list * va)
2094 {
2095  mc_main_t * mcm = va_arg (*va, mc_main_t *);
2096  mc_stream_t * s;
2098  mc_serialize_msg_t * msg;
2099 
2100  serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2101  vec_foreach (s, mcm->stream_vector)
2102  {
2103  /* Stream name. */
2104  serialize_cstring (m, s->config.name);
2105 
2106  /* Serialize global names for all sent messages. */
2107  serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2108  vec_foreach (sm, s->stream_msgs)
2109  {
2110  msg = vec_elt (mcm->global_msgs, sm->global_index);
2111  serialize_cstring (m, msg->name);
2112  }
2113  }
2114 }
2115 
2116 void unserialize_mc_main (serialize_main_t * m, va_list * va)
2117 {
2118  mc_main_t * mcm = va_arg (*va, mc_main_t *);
2119  u32 i, n_streams, n_stream_msgs;
2120  char * name;
2121  mc_stream_t * s;
2123 
2124  unserialize_integer (m, &n_streams, sizeof (u32));
2125  for (i = 0; i < n_streams; i++)
2126  {
2127  unserialize_cstring (m, &name);
2128  if (i != MC_STREAM_INDEX_INTERNAL
2129  && ! mc_stream_by_name (mcm, name))
2130  {
2131  vec_validate (mcm->stream_vector, i);
2132  s = vec_elt_at_index (mcm->stream_vector, i);
2133  mc_stream_init (s);
2134  s->index = s - mcm->stream_vector;
2135  s->config.name = name;
2136  s->state = MC_STREAM_STATE_name_known;
2138  }
2139  else
2140  vec_free (name);
2141 
2142  s = vec_elt_at_index (mcm->stream_vector, i);
2143 
2144  vec_free (s->stream_msgs);
2146 
2147  unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2148  vec_resize (s->stream_msgs, n_stream_msgs);
2149  vec_foreach (sm, s->stream_msgs)
2150  {
2151  uword * p;
2152  u32 si, gi;
2153 
2154  unserialize_cstring (m, &name);
2155  p = hash_get (mcm->global_msg_index_by_name, name);
2156  gi = p ? p[0] : ~0;
2157  si = sm - s->stream_msgs;
2158 
2159  if (MC_EVENT_LOGGING > 0)
2160  {
2161  ELOG_TYPE_DECLARE (e) = {
2162  .format = "catchup-bind: %s to %d global index %d stream %d",
2163  .format_args = "T4i4i4i4",
2164  };
2165  struct { u32 c[4]; } * ed;
2166  ed = ELOG_DATA (mcm->elog_main, e);
2167  ed->c[0] = elog_id_for_msg_name (mcm, name);
2168  ed->c[1] = si;
2169  ed->c[2] = gi;
2170  ed->c[3] = s->index;
2171  }
2172 
2173  vec_free (name);
2174 
2175  sm->global_index = gi;
2176  if (gi != ~0)
2177  {
2179  gi, ~0);
2181  }
2182  }
2183  }
2184 }
2185 
2186 void mc_main_init (mc_main_t * mcm, char * tag)
2187 {
2188  vlib_main_t * vm = vlib_get_main();
2189 
2190  mcm->vlib_main = vm;
2191  mcm->elog_main = &vm->elog_main;
2192 
2193  mcm->relay_master_peer_id = ~0ULL;
2195 
2197  = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
2198 
2199  {
2201 
2202  memset (&r, 0, sizeof (r));
2203 
2204  r.type = VLIB_NODE_TYPE_PROCESS;
2205 
2206  /* Point runtime data to main instance. */
2207  r.runtime_data = &mcm;
2208  r.runtime_data_bytes = sizeof (&mcm);
2209 
2210  r.name = (char *) format (0, "mc-mastership-%s", tag);
2211  r.function = mc_mastership_process;
2212  mcm->mastership_process = vlib_register_node (vm, &r);
2213 
2214  r.name = (char *) format (0, "mc-join-ager-%s", tag);
2215  r.function = mc_join_ager_process;
2216  mcm->join_ager_process = vlib_register_node (vm, &r);
2217 
2218  r.name = (char *) format (0, "mc-retry-%s", tag);
2219  r.function = mc_retry_process;
2220  mcm->retry_process = vlib_register_node (vm, &r);
2221 
2222  r.name = (char *) format (0, "mc-catchup-%s", tag);
2223  r.function = mc_catchup_process;
2224  mcm->catchup_process = vlib_register_node (vm, &r);
2225 
2226  r.name = (char *) format (0, "mc-unserialize-%s", tag);
2227  r.function = mc_unserialize_process;
2228  mcm->unserialize_process = vlib_register_node (vm, &r);
2229  }
2230 
2231  if (MC_EVENT_LOGGING > 0)
2232  mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), sizeof (mc_peer_id_t));
2233 
2234  mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
2235  mc_serialize_init (mcm);
2236 }
2237 
2238 static u8 * format_mc_relay_state (u8 * s, va_list * args)
2239 {
2240  mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
2241  char * t = 0;
2242  switch (state)
2243  {
2245  t = "negotiate";
2246  break;
2247  case MC_RELAY_STATE_MASTER:
2248  t = "master";
2249  break;
2250  case MC_RELAY_STATE_SLAVE:
2251  t = "slave";
2252  break;
2253  default:
2254  return format (s, "unknown 0x%x", state);
2255  }
2256 
2257  return format (s, "%s", t);
2258 }
2259 
2260 static u8 * format_mc_stream_state (u8 * s, va_list * args)
2261 {
2262  mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
2263  char * t = 0;
2264  switch (state)
2265  {
2266 #define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2268 #undef _
2269  default:
2270  return format (s, "unknown 0x%x", state);
2271  }
2272 
2273  return format (s, "%s", t);
2274 }
2275 
2276 static int
2277 mc_peer_comp (void * a1, void * a2)
2278 {
2279  mc_stream_peer_t * p1 = a1;
2280  mc_stream_peer_t * p2 = a2;
2281 
2282  return mc_peer_id_compare (p1->id, p2->id);
2283 }
2284 
2285 u8 * format_mc_main (u8 * s, va_list * args)
2286 {
2287  mc_main_t * mcm = va_arg (*args, mc_main_t *);
2288  mc_stream_t * t;
2289  mc_stream_peer_t * p, * ps;
2290  uword indent = format_get_indent (s);
2291 
2292  s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2294  vec_len (mcm->stream_vector),
2295  mcm->relay_global_sequence);
2296 
2297  {
2298  mc_mastership_peer_t * mp;
2299  f64 now = vlib_time_now (mcm->vlib_main);
2300  s = format (s, "\n%UMost recent mastership peers:",
2301  format_white_space, indent + 2);
2302  vec_foreach (mp, mcm->mastership_peers)
2303  {
2304  s = format (s, "\n%U%-30U%.4e",
2305  format_white_space, indent + 4,
2306  mcm->transport.format_peer_id, mp->peer_id,
2308  }
2309  }
2310 
2311  vec_foreach (t, mcm->stream_vector)
2312  {
2313  s = format (s, "\n%Ustream `%s' index %d",
2314  format_white_space, indent + 2,
2315  t->config.name, t->index);
2316 
2317  s = format (s, "\n%Ustate %U",
2318  format_white_space, indent + 4,
2320 
2321  s = format (s, "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2322  format_white_space, indent + 4, t->config.retry_interval,
2323  t->config.retry_limit,
2324  pool_elts (t->retry_pool),
2326 
2327  s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2328  format_white_space, indent + 4,
2330 
2331  s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2332  format_white_space, indent + 4,
2333  pool_elts (t->peers),
2334  t->our_local_sequence,
2336 
2337  ps = 0;
2338  pool_foreach (p, t->peers,
2339  ({
2340  if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2341  vec_add1 (ps, p[0]);
2342  }));
2344  s = format (s, "\n%U%=30s%10s%16s%16s",
2345  format_white_space, indent + 6,
2346  "Peer", "Last seq", "Retries", "Future");
2347 
2348  vec_foreach (p, ps)
2349  {
2350  s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2351  format_white_space, indent + 6,
2352  mcm->transport.format_peer_id, p->id.as_u64,
2357  ? " (self)" : ""));
2358  }
2359  vec_free (ps);
2360  }
2361 
2362  return s;
2363 }
u32 last_global_sequence_processed
Definition: mc.h:464
u32 buffer_index
Definition: mc.h:327
void mc_main_init(mc_main_t *mcm, char *tag)
Definition: mc.c:2186
void mc_stream_join_process_hold(void)
Definition: mc.c:649
mc_stream_state_t state
Definition: mc.h:410
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:394
#define EVENT_MC_SEND_CATCHUP_DATA
Definition: mc.c:1267
mhash_t mastership_peer_index_by_id
Definition: mc.h:532
Definition: mhash.h:46
static void delete_peer_with_index(mc_main_t *mcm, mc_stream_t *s, uword index, int notify_application)
Definition: mc.c:113
void(* save_snapshot)(struct mc_main_t *mc_main, u32 is_catchup, u8 *snapshot_data, u32 n_snapshot_data_bytes)
Definition: mc.h:383
#define hash_set(h, key, value)
Definition: hash.h:237
void mc_unserialize(mc_main_t *mcm, mc_stream_t *s, u32 buffer_index)
Definition: mc.c:2042
sll srl srl sll sra u16x4 i
Definition: vector_sse2.h:267
mc_relay_state_t
Definition: mc.h:504
#define clib_min(x, y)
Definition: clib.h:295
elog_main_t * elog_main
Definition: mc.h:575
uword * all_peer_bitmap
Definition: mc.h:444
always_inline u32 serialize_vlib_buffer_n_bytes(serialize_main_t *m)
Definition: buffer.h:357
u64 user_requests_received
Definition: mc.h:476
f64 sent_at
Definition: mc.h:342
u32 mc_stream_index
Definition: main.h:136
void mc_rx_buffer_unserialize(mc_main_t *mcm, mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.c:614
static void elog_stream_name(char *buf, int n_buf_bytes, char *v)
Definition: mc.c:542
always_inline uword vlib_process_get_events(vlib_main_t *vm, uword **data_vector)
Definition: node_funcs.h:410
#define hash_unset(h, key)
Definition: hash.h:243
always_inline mc_retry_t * next_retry(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:304
void serialize_bitmap(serialize_main_t *m, uword *b)
Definition: serialize.c:323
static void(BVT(clib_bihash)*h, BVT(clib_bihash_value)*v)
always_inline int mc_peer_id_compare(mc_peer_id_t a, mc_peer_id_t b)
Definition: mc.h:51
always_inline mc_stream_t * mc_stream_by_index(mc_main_t *m, u32 i)
Definition: mc.h:594
mhash_t peer_index_by_id
Definition: mc.h:447
vlib_one_time_waiting_process_t * procs_waiting_for_join_done
Definition: mc.h:452
void unserialize_close_vlib_buffer(serialize_main_t *m)
Definition: buffer.c:1416
static uword mc_retry_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:422
u64 n_msgs_from_future
Definition: mc.h:313
vlib_one_time_waiting_process_t * procs_waiting_for_mc_stream_join
Definition: main.h:138
static void maybe_send_window_open_event(vlib_main_t *vm, mc_stream_t *stream)
Definition: mc.c:182
u32 last_sequence_received
Definition: mc.h:321
u32 mc_stream_send(mc_main_t *mcm, u32 stream_index, u32 buffer_index)
Definition: mc.c:923
always_inline void mc_stream_free(mc_stream_t *s)
Definition: mc.h:480
always_inline void remove_retry_from_pool(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:312
always_inline mc_retry_t * prev_retry(mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:296
always_inline void serialize_likely_small_unsigned_integer(serialize_main_t *m, u64 x)
Definition: serialize.h:198
#define MSG_ID_DEBUG
Definition: mc.c:24
struct vlib_main_t * vlib_main
Definition: buffer.h:319
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
Definition: mc.c:1703
u8 *(* catchup_snapshot)(struct mc_main_t *mc_main, u8 *snapshot_vector, u32 last_global_sequence_included)
Definition: mc.h:373
#define ELOG(em, f, data)
Definition: elog.h:376
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
Definition: vec.h:480
always_inline void mc_byte_swap_msg_user_request(mc_msg_user_request_t *r)
Definition: mc.h:161
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
Definition: mc.c:1111
mc_serialize_msg_t ** global_msgs
Definition: mc.h:563
void * mc_get_vlib_buffer(vlib_main_t *vm, u32 n_bytes, u32 *bi_return)
Definition: mc.c:98
always_inline uword mhash_set(mhash_t *h, void *key, uword new_value, uword *old_value)
Definition: mhash.h:111
struct _vlib_node_registration vlib_node_registration_t
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
Definition: vec.h:519
mc_stream_state_t
Definition: mc.h:401
always_inline void vlib_process_signal_event(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
Definition: node_funcs.h:789
u32 * catchup_fifo
Definition: mc.h:436
#define hash_set_mem(h, key, value)
Definition: hash.h:257
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
Definition: mc.h:287
clib_error_t * mc_serialize_internal(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg,...)
Definition: mc.c:1875
#define clib_error_report(e)
Definition: error.h:126
void unserialize_mc_main(serialize_main_t *m, va_list *va)
Definition: mc.c:2116
add_epi add_epi sub_epi sub_epi adds_epu subs_epu i16x8 y
Definition: vector_sse2.h:231
always_inline uword vlib_process_suspend(vlib_main_t *vm, f64 dt)
Definition: node_funcs.h:326
u64 relay_master_peer_id
Definition: mc.h:527
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
Definition: mc.c:878
#define pool_get(P, E)
Definition: pool.h:186
u32 mc_stream_join(mc_main_t *mcm, mc_stream_config_t *config)
Definition: mc.c:793
mc_stream_stats_t stats
Definition: mc.h:438
u32 index
Definition: mc.h:413
always_inline void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
Definition: buffer.h:184
always_inline vlib_main_t * vlib_get_main(void)
Definition: global_funcs.h:23
void(* peer_died)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id)
Definition: mc.h:389
mc_main_t * mc_main
Definition: main.h:133
struct vlib_serialize_buffer_main_t::@27::@30 rx
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
f64 join_timeout
Definition: mc.h:450
u32 relay_global_sequence
Definition: mc.h:538
u32 vlib_register_node(vlib_main_t *vm, vlib_node_registration_t *r)
Definition: node.c:449
static void elog_tx_msg(mc_main_t *m, u32 stream_id, u32 local_sequence, u32 retry_count)
Definition: mc.c:68
mc_retry_t * retry_pool
Definition: mc.h:418
#define mc_serialize_stream(mc, si, msg, args...)
Definition: mc.h:648
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
Definition: mc.h:292
u32 retry_tail_index
Definition: mc.h:421
u32 n_retries
Definition: mc.h:333
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
Definition: mc.c:1425
static u8 * format_mc_relay_state(u8 *s, va_list *args)
Definition: mc.c:2238
#define pool_foreach(VAR, POOL, BODY)
Definition: pool.h:328
mc_stream_config_t config
Definition: mc.h:408
always_inline i32 mc_seq_cmp(u32 x, u32 y)
Definition: mc.c:95
void unserialize_open_data(serialize_main_t *m, u8 *data, uword n_data_bytes)
Definition: serialize.c:835
#define always_inline
Definition: clib.h:84
u8 * format_white_space(u8 *s, va_list *va)
Definition: std-formats.c:107
int i32
Definition: types.h:81
always_inline void unserialize_integer(serialize_main_t *m, void *x, u32 n_bytes)
Definition: serialize.h:181
static void perform_catchup(mc_main_t *mcm, mc_msg_catchup_reply_t *mp)
Definition: mc.c:1433
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
void * opaque
Definition: mc.h:297
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
Definition: mc.h:294
always_inline uword pool_elts(void *v)
Definition: pool.h:97
serialize_main_t serialize_mains[VLIB_N_RX_TX]
Definition: mc.h:569
#define MC_STREAM_INDEX_INTERNAL
Definition: mc.h:416
static int mc_peer_comp(void *a1, void *a2)
Definition: mc.c:2277
always_inline uword clib_fifo_elts(void *v)
Definition: fifo.h:63
#define clib_warning(format, args...)
Definition: error.h:59
unsigned long u64
Definition: types.h:89
always_inline void vlib_signal_one_time_waiting_process(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
Definition: node_funcs.h:822
mc_peer_id_t our_catchup_peer_id
Definition: mc.h:300
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
Definition: vec.h:199
u32 retry_head_index
Definition: mc.h:421
always_inline mc_stream_t * mc_stream_by_name(mc_main_t *m, char *name)
Definition: mc.h:587
always_inline void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
Definition: mc.h:202
uword * stream_index_by_name
Definition: mc.h:544
uword * procs_waiting_for_stream_name_by_name
Definition: mc.h:546
static void check_retry(mc_main_t *mcm, mc_stream_t *s)
Definition: mc.c:329
static uword pointer_to_uword(const void *p)
Definition: types.h:131
u32 serialize_close_vlib_buffer(serialize_main_t *m)
Definition: buffer.c:1393
u32 join_ager_process
Definition: mc.h:557
#define hash_create_string(elts, value_bytes)
Definition: hash.h:609
mc_mastership_peer_t * mastership_peers
Definition: mc.h:529
void(* catchup)(struct mc_main_t *mc_main, u8 *snapshot_data, u32 n_snapshot_data_bytes)
Definition: mc.h:378
always_inline void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
Definition: buffer_funcs.h:319
#define hash_get(h, key)
Definition: hash.h:231
u32 next_index
Definition: mc.h:336
#define clib_bitmap_foreach(i, ai, body)
Definition: bitmap.h:308
#define pool_elt_at_index(p, i)
Definition: pool.h:346
#define hash_unset_mem(h, key)
Definition: hash.h:263
always_inline void mc_byte_swap_msg_user_ack(mc_msg_user_ack_t *r)
Definition: mc.h:183
#define clib_fifo_sub1(f, e)
Definition: fifo.h:219
u64 as_u64
Definition: mc.h:40
u16 current_length
Nbytes between current data and the end of this buffer.
Definition: buffer.h:81
vlib_one_time_waiting_process_t * procs_waiting_for_open_window
Definition: mc.h:454
uword * unacked_by_peer_bitmap
Definition: mc.h:339
u32 prev_index
Definition: mc.h:336
always_inline uword * vlib_process_wait_for_event(vlib_main_t *vm)
Definition: node_funcs.h:484
always_inline uword vlib_buffer_contents(vlib_main_t *vm, u32 buffer_index, u8 *contents)
Copy buffer contents to memory.
Definition: buffer_funcs.h:143
#define pool_put(P, E)
Definition: pool.h:200
always_inline f64 vlib_process_wait_for_event_or_clock(vlib_main_t *vm, f64 dt)
Definition: node_funcs.h:551
f64 time_last_master_assert_received
Definition: mc.h:513
u32 mastership_process
Definition: mc.h:556
#define ELOG_DATA(em, f)
Definition: elog.h:386
mc_catchup_process_arg_t * catchup_process_args
Definition: mc.h:552
void unserialize_mc_stream(serialize_main_t *m, va_list *va)
Definition: mc.c:1317
u32 catchup_process
Definition: mc.h:559
always_inline void * unserialize_get(serialize_main_t *m, uword n_bytes)
Definition: serialize.h:157
always_inline void mc_byte_swap_msg_join_or_leave_request(mc_msg_join_or_leave_request_t *r)
Definition: mc.h:106
static uword mc_mastership_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:1669
mc_peer_id_t peer_id
Definition: mc.h:511
always_inline void mc_stream_init(mc_stream_t *s)
Definition: mc.h:492
#define EVENT_MC_UNSERIALIZE_BUFFER
Definition: mc.c:1422
#define clib_fifo_foreach(v, f, body)
Definition: fifo.h:274
u32 retry_process
Definition: mc.h:558
always_inline void mc_byte_swap_msg_join_reply(mc_msg_join_reply_t *r)
Definition: mc.h:129
void unserialize_cstring(serialize_main_t *m, char **s)
Definition: serialize.c:148
#define uword_to_pointer(u, type)
Definition: types.h:134
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.h:289
mc_peer_id_t our_ack_peer_id
Definition: mc.h:299
static format_function_t format_mc_stream_state
Definition: mc.c:26
always_inline uword clib_bitmap_get(uword *ai, uword i)
Definition: bitmap.h:158
static void mc_serialize_init(mc_main_t *mcm)
Definition: mc.c:1777
MC_SERIALIZE_MSG(mc_register_stream_name_msg, static)
static uword mc_catchup_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:1270
serialize_stream_t stream
Definition: serialize.h:138
clib_error_t * serialize(serialize_main_t *m,...)
Definition: serialize.c:627
void mhash_init(mhash_t *h, uword n_value_bytes, uword n_key_bytes)
Definition: mhash.c:169
vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]
Definition: mc.h:571
u32 current_buffer_index
Definition: serialize.h:61
u32 retry_limit
Definition: mc.h:364
uword mc_unserialize_message(mc_main_t *mcm, mc_stream_t *s, serialize_main_t *m)
Definition: mc.c:1901
static uword * delete_retry_fifo_elt(mc_main_t *mcm, mc_stream_t *stream, mc_retry_t *r, uword *dead_peer_bitmap)
Definition: mc.c:260
void serialize_open_vector(serialize_main_t *m, u8 *vector)
Definition: serialize.c:849
always_inline mc_main_t * mc_node_get_main(vlib_node_runtime_t *node)
Definition: mc.c:415
#define vec_free(V)
Free vector&#39;s memory (no header).
Definition: vec.h:298
Definition: mc.h:326
void serialize_mc_main(serialize_main_t *m, va_list *va)
Definition: mc.c:2093
u32 window_size
Definition: mc.h:358
#define clib_memcpy(a, b, c)
Definition: string.h:63
void mc_unserialize_internal(mc_main_t *mcm, u32 stream_and_buffer_index)
Definition: mc.c:2004
f64 retry_interval
Definition: mc.h:361
mhash_t elog_id_by_peer_id
Definition: mc.h:578
u8 * format_mc_main(u8 *s, va_list *args)
Definition: mc.c:2285
always_inline uword vlib_process_wait_for_event_with_type(vlib_main_t *vm, uword **data_vector, uword with_type_opaque)
Definition: node_funcs.h:525
elog_main_t elog_main
Definition: main.h:141
static uword mc_unserialize_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:2054
always_inline void mc_byte_swap_msg_master_assert(mc_msg_master_assert_t *r)
Definition: mc.h:68
always_inline void serialize_integer(serialize_main_t *m, u64 x, u32 n_bytes)
Definition: serialize.h:165
#define ELOG_TYPE(f, fmt)
Definition: elog.h:364
mc_retry_t * retired_fifo
Definition: mc.h:429
always_inline void mc_byte_swap_msg_catchup_reply(mc_msg_catchup_reply_t *r)
Definition: mc.h:228
#define ELOG_TYPE_DECLARE(f)
Definition: elog.h:344
void mc_enable_disable_mastership(mc_main_t *mcm, int we_can_be_master)
Definition: mc.c:1692
static void this_node_slave(mc_main_t *mcm)
Definition: mc.c:1627
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
Definition: mc.c:993
u32 max_packet_size
Definition: mc.h:304
u8 as_u8[8]
Definition: mc.h:39
#define VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX
Definition: buffer.h:296
always_inline uword vlib_buffer_index_length_in_chain(vlib_main_t *vm, u32 bi)
Get length in bytes of the buffer index buffer chain.
Definition: buffer_funcs.h:129
mc_stream_stats_t stats_last_clear
Definition: mc.h:438
Definition: mc.h:521
static uword mc_join_ager_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
Definition: mc.c:464
#define pool_put_index(p, i)
Definition: pool.h:214
#define ASSERT(truth)
void mc_stream_leave(mc_main_t *mcm, u32 stream_index)
Definition: mc.c:796
unsigned int u32
Definition: types.h:88
char * name
Definition: mc.h:355
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
Definition: mc.c:819
static void serialize_mc_stream(serialize_main_t *m, va_list *va)
Definition: mc.c:1302
mc_serialize_stream_msg_t * stream_msgs
Definition: mc.h:467
static u32 elog_id_for_msg_name(mc_main_t *m, char *msg_name)
Definition: mc.c:45
mc_stream_peer_stats_t stats
Definition: mc.h:323
u8 * format(u8 *s, char *fmt,...)
Definition: format.c:405
vhost_vring_state_t state
Definition: vhost-user.h:77
mc_serialize_msg_t * mc_msg_registrations
Definition: main.h:169
clib_error_t * unserialize(serialize_main_t *m,...)
Definition: serialize.c:639
#define clib_bitmap_free(v)
Definition: bitmap.h:76
u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
Definition: buffer.c:770
u32 we_can_be_relay_master
Definition: mc.h:525
uword * hash
Definition: mhash.h:68
u8 *( format_function_t)(u8 *s, va_list *args)
Definition: format.h:48
#define EVENT_MC_UNSERIALIZE_CATCHUP
Definition: mc.c:1423
vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool
Definition: mc.h:548
#define clib_max(x, y)
Definition: clib.h:288
always_inline uword format_get_indent(u8 *s)
Definition: format.h:72
u64 uword
Definition: types.h:112
void serialize_cstring(serialize_main_t *m, char *s)
Definition: serialize.c:135
always_inline uword clib_bitmap_is_zero(uword *ai)
Definition: bitmap.h:50
#define vec_elt(v, i)
Get vector value at index i.
uword * global_msg_index_by_name
Definition: mc.h:566
static void mc_internal_catchup(mc_main_t *mcm, u8 *data, u32 n_data_bytes)
Definition: mc.c:636
void mc_wait_for_stream_ready(mc_main_t *m, char *stream_name)
Definition: mc.c:901
Definition: defs.h:46
#define clib_fifo_add1(f, e)
Definition: fifo.h:187
int joins_in_progress
Definition: mc.h:550
#define vec_copy(DST, SRC)
Copy a vector, memcpy wrapper.
Definition: vec.h:349
uword * unserialize_bitmap(serialize_main_t *m)
Definition: serialize.c:340
always_inline void vlib_current_process_wait_for_one_time_event_vector(vlib_main_t *vm, vlib_one_time_waiting_process_t **wps)
Definition: node_funcs.h:850
u32 unserialize_process
Definition: mc.h:560
u32 elog_string(elog_main_t *em, char *fmt,...)
Definition: elog.c:496
mc_relay_state_t relay_state
Definition: mc.h:522
struct _mc_serialize_msg mc_serialize_msg_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
double f64
Definition: types.h:140
always_inline void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
Definition: buffer.h:197
unsigned char u8
Definition: types.h:56
mc_stream_peer_stats_t stats_last_clear
Definition: mc.h:323
uword * retry_index_by_local_sequence
Definition: mc.h:432
mc_stream_peer_t * peers
Definition: mc.h:441
u64 user_requests_sent
Definition: mc.h:475
uword * elog_id_by_msg_name
Definition: mc.h:580
mc_peer_id_t id
Definition: mc.h:318
u64 n_msgs_from_past
Definition: mc.h:312
clib_error_t * va_serialize(serialize_main_t *sm, va_list *va)
Definition: serialize.c:606
u64 n_retries
Definition: mc.h:347
#define vec_sort_with_function(vec, f)
Sort a vector using the supplied element comparison function.
Definition: vec.h:898
mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers
Definition: mc.h:583
static mc_stream_peer_t * get_or_create_peer_with_id(mc_main_t *mcm, mc_stream_t *s, mc_peer_id_t id, int *created)
Definition: mc.c:139
static u32 elog_id_for_peer_id(mc_main_t *m, u64 peer_id)
Definition: mc.c:28
format_function_t * format_peer_id
Definition: mc.h:306
static u8 * mc_internal_catchup_snapshot(mc_main_t *mcm, u8 *data_vector, u32 last_global_sequence_processed)
Definition: mc.c:621
#define hash_get_mem(h, key)
Definition: hash.h:251
u32 our_local_sequence
Definition: mc.h:457
#define clib_fifo_add2(f, p)
Definition: fifo.h:195
static void this_node_maybe_master(mc_main_t *mcm)
Definition: mc.c:1540
always_inline uword vlib_in_process_context(vlib_main_t *vm)
Definition: node_funcs.h:313
static u32 mc_stream_join_helper(mc_main_t *mcm, mc_stream_config_t *config, u32 is_internal)
Definition: mc.c:652
#define MC_EVENT_LOGGING
Definition: mc.h:27
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
Definition: mc.c:1341
u8 data[0]
Packet data.
Definition: buffer.h:150
always_inline u32 unserialize_vlib_buffer_n_bytes(serialize_main_t *m)
Definition: buffer_funcs.h:556
#define vec_foreach(var, vec)
Vector iterator.
always_inline f64 vlib_time_now(vlib_main_t *vm)
Definition: main.h:182
void * serialize_close_vector(serialize_main_t *m)
Definition: serialize.c:858
static void unserialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
Definition: mc.c:548
always_inline u64 unserialize_likely_small_unsigned_integer(serialize_main_t *m)
Definition: serialize.h:234
u32 local_sequence
Definition: mc.h:330
clib_error_t * mc_serialize_va(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg, va_list *va)
Definition: mc.c:1799
void(* rx_buffer)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
Definition: mc.h:367
#define vec_validate_init_empty(V, I, INIT)
Make sure vector is long enough for given index and initialize empty space (no header, unspecified alignment)
Definition: vec.h:443
static void mc_retry_free(mc_main_t *mcm, mc_stream_t *s, mc_retry_t *r)
Definition: mc.c:196
u32 * stream_msg_index_by_global_index
Definition: mc.h:470
struct vlib_main_t * vlib_main
Definition: mc.h:574
static void send_join_or_leave_request(mc_main_t *mcm, u32 stream_index, u32 is_join)
Definition: mc.c:441
void unserialize_open_vlib_buffer(serialize_main_t *m, vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
Definition: buffer.c:1390
uword runtime_data[(128-1 *sizeof(vlib_node_function_t *)-1 *sizeof(vlib_error_t *)-11 *sizeof(u32)-5 *sizeof(u16))/sizeof(uword)]
Definition: node.h:432
always_inline void vlib_current_process_wait_for_one_time_event(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
Definition: node_funcs.h:839
always_inline vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
Definition: buffer_funcs.h:69
void serialize_open_vlib_buffer(serialize_main_t *m, vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
Definition: buffer.c:1387
mc_transport_t transport
Definition: mc.h:535
always_inline uword * mhash_get(mhash_t *h, void *key)
Definition: mhash.h:104
static void mc_resend_retired(mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
Definition: mc.c:217
mc_stream_t * stream_vector
Definition: mc.h:541
Definition: defs.h:45
struct vlib_serialize_buffer_main_t::@27::@29 tx
static void serialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
Definition: mc.c:536