FD.io VPP  v18.07.1-19-g511ce25
Vector Packet Processing
mc.h
Go to the documentation of this file.
1 /*
2  * mc.h: vlib reliable sequenced multicast distributed applications
3  *
4  * Copyright (c) 2010 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef included_vlib_mc_h
19 #define included_vlib_mc_h
20 
21 #include <vppinfra/elog.h>
22 #include <vppinfra/fifo.h>
23 #include <vppinfra/mhash.h>
24 #include <vlib/node.h>
25 
26 #ifndef MC_EVENT_LOGGING
27 #define MC_EVENT_LOGGING 1
28 #endif
29 
32 {
34 }
35 
36 /*
37  * Used to uniquely identify hosts.
38  * For IP4 this would be ip4_address plus tcp/udp port.
39  */
40 typedef union
41 {
42  u8 as_u8[8];
44 } mc_peer_id_t;
45 
48 {
49  /* Peer id is already in network byte order. */
50  return i;
51 }
52 
53 always_inline int
55 {
56  return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
57 }
58 
59 /* Assert mastership. Lowest peer_id amount all peers wins mastership.
60  Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
61  So, we don't need a message opcode. */
62 typedef CLIB_PACKED (struct
63  {
64  /* Peer id asserting mastership. */
65  mc_peer_id_t peer_id;
66  /* Global sequence number asserted. */
67  u32 global_sequence;}) mc_msg_master_assert_t;
68 
69 always_inline void
70 mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
71 {
72  if (mc_need_byte_swap ())
73  {
74  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
75  r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
76  }
77 }
78 
/*
 * List of MC message types, one _ (name) entry per type.  Define _ before
 * expanding this list and #undef it afterwards; this header defines _ both
 * as MC_MSG_TYPE_##f (enum constants) and as a prototype for
 * mc_msg_##f##_handler () taking a mc_msg_##f##_t pointer.
 */
#define foreach_mc_msg_type                     \
  _ (master_assert)                             \
  _ (join_or_leave_request)                     \
  _ (join_reply)                                \
  _ (user_request)                              \
  _ (user_ack)                                  \
  _ (catchup_request)                           \
  _ (catchup_reply)
87 
88 typedef enum
89 {
90 #define _(f) MC_MSG_TYPE_##f,
92 #undef _
94 
95 /* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */
96 typedef CLIB_PACKED (struct
97  {
98 mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
99  /* MC_MSG_TYPE_join_or_leave_request */
100  /* Stream to join or leave. */
101  u32 stream_index;
102  /* join = 1, leave = 0 */
103  u8 is_join;}) mc_msg_join_or_leave_request_t;
104 
105 always_inline void
106 mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
107 {
108  if (mc_need_byte_swap ())
109  {
110  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
111  r->type = clib_byte_swap_u32 (r->type);
112  r->stream_index = clib_byte_swap_u32 (r->stream_index);
113  }
114 }
115 
116 /* Join reply. Multicast over MC_TRANSPORT_JOIN. */
117 typedef CLIB_PACKED (struct
118  {
119 mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
120  /* MC_MSG_TYPE_join_reply */
121  u32 stream_index;
122  /* Peer ID to contact to catchup with this stream. */
123  mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
124 
125 always_inline void
126 mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
127 {
128  if (mc_need_byte_swap ())
129  {
130  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
131  r->type = clib_byte_swap_u32 (r->type);
132  r->stream_index = clib_byte_swap_u32 (r->stream_index);
133  r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
134  }
135 }
136 
137 /* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
138  relayed by relay master after filling in global sequence number. */
139 typedef CLIB_PACKED (struct
140  {
141  mc_peer_id_t peer_id; u32 stream_index;
142  /* Global sequence number as filled in by relay master. */
143  u32 global_sequence;
144  /* Local sequence number as filled in by peer sending message. */
145  u32 local_sequence;
146  /* Size of request data. */
147  u32 n_data_bytes;
148  /* Opaque request data. */
149  u8 data[0];}) mc_msg_user_request_t;
150 
151 always_inline void
152 mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
153 {
154  if (mc_need_byte_swap ())
155  {
156  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
157  r->stream_index = clib_byte_swap_u32 (r->stream_index);
158  r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
159  r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
160  r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
161  }
162 }
163 
164 /* Sent unicast over ACK channel. */
165 typedef CLIB_PACKED (struct
166  {
167  mc_peer_id_t peer_id;
168  u32 global_sequence; u32 stream_index;
169  u32 local_sequence;
170  i32 seq_cmp_result;}) mc_msg_user_ack_t;
171 
172 always_inline void
173 mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
174 {
175  if (mc_need_byte_swap ())
176  {
177  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
178  r->stream_index = clib_byte_swap_u32 (r->stream_index);
179  r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
180  r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
181  r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
182  }
183 }
184 
185 /* Sent/received unicast over catchup channel (e.g. using TCP). */
186 typedef CLIB_PACKED (struct
187  {
188  mc_peer_id_t peer_id;
189  u32 stream_index;}) mc_msg_catchup_request_t;
190 
191 always_inline void
192 mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
193 {
194  if (mc_need_byte_swap ())
195  {
196  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
197  r->stream_index = clib_byte_swap_u32 (r->stream_index);
198  }
199 }
200 
201 /* Sent/received unicast over catchup channel. */
202 typedef CLIB_PACKED (struct
203  {
204  mc_peer_id_t peer_id; u32 stream_index;
205  /* Last global sequence number included in catchup data. */
206  u32 last_global_sequence_included;
207  /* Size of catchup data. */
208  u32 n_data_bytes;
209  /* Catchup data. */
210  u8 data[0];}) mc_msg_catchup_reply_t;
211 
212 always_inline void
213 mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
214 {
215  if (mc_need_byte_swap ())
216  {
217  r->peer_id = mc_byte_swap_peer_id (r->peer_id);
218  r->stream_index = clib_byte_swap_u32 (r->stream_index);
219  r->last_global_sequence_included =
220  clib_byte_swap_u32 (r->last_global_sequence_included);
221  r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
222  }
223 }
224 
225 typedef struct _mc_serialize_msg
226 {
227  /* Name for this type. */
228  char *name;
229 
230  /* Functions to serialize/unserialize data. */
233 
234  /* Maximum message size in bytes when serialized.
235  If zero then this will be set to the largest sent message. */
236  u32 max_n_bytes_serialized;
237 
238  /* Opaque to use for first argument to serialize/unserialize function. */
239  u32 opaque;
240 
241  /* Index in global message vector. */
242  u32 global_index;
243 
244  /* Registration list */
245  struct _mc_serialize_msg *next_registration;
247 
248 typedef struct
249 {
250  /* Index into global message vector. */
253 
/*
 * Declare the mc_serialize_msg_t object X and arrange for it to be linked
 * into, and removed from, the registration list of the vlib main:
 *
 *  - a constructor function pushes &X onto the head of
 *    vm->mc_msg_registrations through X.next_registration;
 *  - a destructor function takes it off again with
 *    VLIB_REMOVE_FROM_LINKED_LIST.
 *
 * Any tokens passed after X are placed in front of both declarations of X
 * (presumably storage-class specifiers such as static -- confirm against
 * callers).  The expansion ends with an unterminated declaration of X so
 * the caller supplies the initializer and/or the closing semicolon.
 */
#define MC_SERIALIZE_MSG(x,...)                                 \
  __VA_ARGS__ mc_serialize_msg_t x;                             \
static void __mc_serialize_msg_registration_##x (void)          \
  __attribute__((__constructor__)) ;                            \
static void __mc_serialize_msg_registration_##x (void)          \
{                                                               \
  vlib_main_t * vm = vlib_get_main();                           \
  x.next_registration = vm->mc_msg_registrations;               \
  vm->mc_msg_registrations = &x;                                \
}                                                               \
static void __mc_serialize_msg_unregistration_##x (void)        \
  __attribute__((__destructor__)) ;                             \
static void __mc_serialize_msg_unregistration_##x (void)        \
{                                                               \
  vlib_main_t * vm = vlib_get_main();                           \
  VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x,   \
                                next_registration);             \
}                                                               \
__VA_ARGS__ mc_serialize_msg_t x
273 
274 typedef enum
275 {
282 
283 typedef struct
284 {
285  clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
286  u32 buffer_index);
287 
288  clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
289  u32 buffer_index);
290 
291  /* Returns catchup opaque. */
292  uword (*catchup_request_fun) (void *opaque, u32 stream_index,
293  mc_peer_id_t catchup_peer_id);
294 
295  void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
296  u8 * data_vector);
297 
298  /* Opaque passed to callbacks. */
299  void *opaque;
300 
303 
304  /* Max packet size (MTU) for this transport.
305  For IP this is interface MTU less IP + UDP header size. */
307 
310 
311 typedef struct
312 {
313  /* Count of messages received from this peer from the past/future
314  (with seq_cmp != 0). */
318 
319 typedef struct
320 {
321  /* ID of this peer. */
323 
324  /* The last sequence we received from this peer. */
326 
329 
330 typedef struct
331 {
333 
334  /* Cached copy of local sequence number from buffer. */
336 
337  /* Number of times this buffer has been sent (retried). */
339 
340  /* Previous/next retries in doubly-linked list. */
341  u32 prev_index, next_index;
342 
343  /* Bitmap of all peers which have acked this msg */
345 
346  /* Message send or resend time */
348 } mc_retry_t;
349 
350 typedef struct
351 {
352  /* Number of retries sent for this stream. */
355 
356 struct mc_main_t;
357 struct mc_stream_t;
358 
359 typedef struct
360 {
361  /* Stream name. */
362  char *name;
363 
364  /* Number of outstanding messages. */
366 
367  /* Retry interval, in seconds */
369 
370  /* Retry limit */
372 
373  /* User rx buffer callback */
374  void (*rx_buffer) (struct mc_main_t * mc_main,
375  struct mc_stream_t * stream,
376  mc_peer_id_t peer_id, u32 buffer_index);
377 
378  /* User callback to create a snapshot */
379  u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
380  u8 * snapshot_vector,
381  u32 last_global_sequence_included);
382 
383  /* User callback to replay a snapshot */
384  void (*catchup) (struct mc_main_t * mc_main,
385  u8 * snapshot_data, u32 n_snapshot_data_bytes);
386 
387  /* Callback to save a snapshot for offline replay */
388  void (*save_snapshot) (struct mc_main_t * mc_main,
389  u32 is_catchup,
390  u8 * snapshot_data, u32 n_snapshot_data_bytes);
391 
392  /* Called when a peer dies */
393  void (*peer_died) (struct mc_main_t * mc_main,
394  struct mc_stream_t * stream, mc_peer_id_t peer_id);
396 
/*
 * List of stream states, one _ (name) entry per state.  Define _ before
 * expanding this list and #undef it afterwards; this header defines _ as
 * MC_STREAM_STATE_##f to produce the enum constants.
 */
#define foreach_mc_stream_state                 \
  _ (invalid)                                   \
  _ (name_known)                                \
  _ (join_in_progress)                          \
  _ (catchup)                                   \
  _ (ready)
403 
404 typedef enum
405 {
406 #define _(f) MC_STREAM_STATE_##f,
408 #undef _
410 
411 typedef struct mc_stream_t
412 {
414 
415  mc_stream_state_t state;
416 
417  /* Index in stream pool. */
419 
420  /* Stream index 0 is always for MC internal use. */
421 #define MC_STREAM_INDEX_INTERNAL 0
422 
424 
425  /* Head and tail index of retry pool. */
426  u32 retry_head_index, retry_tail_index;
427 
428  /*
429  * Country club for recently retired messages
430  * If the set of peers is expanding and a new peer
431  * misses a message, we can easily retire the FIFO
432  * element before we even know about the new peer
433  */
435 
436  /* Hash mapping local sequence to retry pool index. */
438 
439  /* catch-up fifo of VLIB buffer indices.
440  start recording when catching up. */
442 
444 
445  /* Peer pool. */
447 
448  /* Bitmap with ones for all peers in peer pool. */
450 
451  /* Map of 64 bit id to index in stream pool. */
453 
454  /* Timeout, in case we're alone in the world */
456 
458 
460 
461  /* Next sequence number to use */
463 
464  /*
465  * Last global sequence we processed.
466  * When supplying catchup data, we need to tell
467  * the client precisely where to start replaying
468  */
470 
471  /* Vector of unique messages we've sent on this stream. */
473 
474  /* Vector global message index into per stream message index. */
476 
477  /* Hashed by message name. */
479 
482 } mc_stream_t;
483 
484 always_inline void
486 {
487  pool_free (s->retry_pool);
490  pool_free (s->peers);
494 }
495 
496 always_inline void
498 {
499  memset (s, 0, sizeof (s[0]));
500  s->retry_head_index = s->retry_tail_index = ~0;
501 }
502 
503 typedef struct
504 {
509 
510 typedef enum
511 {
516 
517 typedef struct
518 {
520 
523 
524 typedef struct
525 {
529 
530 typedef struct mc_main_t
531 {
532  mc_relay_state_t relay_state;
533 
534  /* Mastership */
536 
538 
540 
541  /* Map of 64 bit id to index in stream pool. */
543 
544  /* The transport we're using. */
546 
547  /* Last-used global sequence number. */
549 
550  /* Vector of streams. */
552 
553  /* Hash table mapping stream name to pool index. */
555 
557 
559 
561 
563 
564  /* Node indices for mastership, join ager,
565  retry and catchup processes. */
571 
572  /* Global vector of messages. */
574 
575  /* Hash table mapping message name to index. */
577 
578  /* Shared serialize/unserialize main. */
579  serialize_main_t serialize_mains[VLIB_N_RX_TX];
580 
582 
583  /* Convenience variables */
586 
587  /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
589 
591 
592  /* For mc_unserialize. */
594 } mc_main_t;
595 
598 {
599  uword *p = hash_get (m->stream_index_by_name, name);
600  return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
601 }
602 
605 {
606  return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
607 }
608 
609 always_inline void
611 {
612  mc_stream_t *s;
613  mc_stream_peer_t *p;
614  vec_foreach (s, m->stream_vector)
615  {
616  s->stats_last_clear = s->stats;
617  /* *INDENT-OFF* */
618  pool_foreach (p, s->peers, ({
619  p->stats_last_clear = p->stats;
620  }));
621  /* *INDENT-ON* */
622  }
623 }
624 
625 /* Declare all message handlers. */
626 #define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
628 #undef _
630 
631 void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
632 
633 void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
634 
635 u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
636 
637 void mc_main_init (mc_main_t * mcm, char *tag);
638 
639 void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
640 
641 void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
642  u32 * bi_return);
643 
645 
647  u32 stream_index,
648  u32 multiple_messages_per_vlib_buffer,
649  mc_serialize_msg_t * msg, ...);
650 
652  u32 stream_index,
653  u32 multiple_messages_per_vlib_buffer,
654  mc_serialize_msg_t * msg, va_list * va);
655 
/*
 * Convenience wrappers around mc_serialize_internal ().  Each one passes
 * the message's own callback, (msg)->serialize, as the first variadic
 * argument, followed by whatever extra arguments the caller supplies.
 *
 * ", ##args" (GNU extension, like the named "args..." parameter already
 * used here) deletes the preceding comma when the caller gives no extra
 * arguments; without it such a call expands to a dangling comma and does
 * not compile.  Calls that do pass extra arguments expand exactly as
 * before.
 *
 * Note that msg is evaluated twice by every wrapper.
 */

/* Serialize msg onto stream si, with multiple_messages_per_vlib_buffer 0. */
#define mc_serialize_stream(mc,si,msg,args...) \
  mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize, ##args)

/* As mc_serialize_stream, but with stream index ~0. */
#define mc_serialize(mc,msg,args...) \
  mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize, ##args)

/* As mc_serialize, but passes add as multiple_messages_per_vlib_buffer. */
#define mc_serialize2(mc,add,msg,args...) \
  mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize, ##args)
664 
665 void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
667  serialize_main_t * m);
668 
670 
673 {
674  return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
675 }
676 
679 {
680  return mc_max_message_size_in_bytes (mcm) -
682 }
683 
684 void unserialize_mc_stream (serialize_main_t * m, va_list * va);
685 void mc_stream_join_process_hold (void);
686 
687 #endif /* included_vlib_mc_h */
688 
689 /*
690  * fd.io coding-style-patch-verification: ON
691  *
692  * Local Variables:
693  * eval: (c-set-style "gnu")
694  * End:
695  */
static mc_stream_t * mc_stream_by_index(mc_main_t *m, u32 i)
Definition: mc.h:604
u32 last_global_sequence_processed
Definition: mc.h:469
u32 buffer_index
Definition: mc.h:332
mc_stream_state_t state
Definition: mc.h:415
mhash_t mastership_peer_index_by_id
Definition: mc.h:542
Definition: mhash.h:46
static void mc_stream_init(mc_stream_t *s)
Definition: mc.h:497
mc_relay_state_t
Definition: mc.h:510
void mc_main_init(mc_main_t *mcm, char *tag)
Definition: mc.c:2417
static uword mc_need_byte_swap(void)
Definition: mc.h:31
static void mc_stream_free(mc_stream_t *s)
Definition: mc.h:485
elog_main_t * elog_main
Definition: mc.h:585
uword * all_peer_bitmap
Definition: mc.h:449
u64 user_requests_received
Definition: mc.h:481
f64 sent_at
Definition: mc.h:347
a
Definition: bitmap.h:538
static void mc_byte_swap_msg_join_reply(mc_msg_join_reply_t *r)
Definition: mc.h:126
mhash_t peer_index_by_id
Definition: mc.h:452
vlib_one_time_waiting_process_t * procs_waiting_for_join_done
Definition: mc.h:457
u64 n_msgs_from_future
Definition: mc.h:316
u32 last_sequence_received
Definition: mc.h:325
unsigned long u64
Definition: types.h:89
void mc_stream_join_process_hold(void)
Definition: mc.c:722
static u32 clib_byte_swap_u32(u32 x)
Definition: byte_order.h:76
static void mc_byte_swap_msg_user_request(mc_msg_user_request_t *r)
Definition: mc.h:152
#define CLIB_ARCH_IS_LITTLE_ENDIAN
Definition: byte_order.h:45
mc_serialize_msg_t ** global_msgs
Definition: mc.h:573
#define foreach_mc_stream_state
Definition: mc.h:397
int i
struct mc_main_t mc_main_t
mc_stream_state_t
Definition: mc.h:404
u32 * catchup_fifo
Definition: mc.h:441
static uword mc_max_message_size_in_bytes(mc_main_t *mcm)
Definition: mc.h:672
u8 *( format_function_t)(u8 *s, va_list *args)
Definition: format.h:48
u64 relay_master_peer_id
Definition: mc.h:537
static void mc_byte_swap_msg_catchup_reply(mc_msg_catchup_reply_t *r)
Definition: mc.h:213
mc_stream_stats_t stats
Definition: mc.h:443
u32 index
Definition: mc.h:418
unsigned char u8
Definition: types.h:56
typedef CLIB_PACKED(struct{mc_peer_id_t peer_id;u32 global_sequence;}) mc_msg_master_assert_t
double f64
Definition: types.h:142
f64 join_timeout
Definition: mc.h:455
u32 relay_global_sequence
Definition: mc.h:548
format_function_t format_mc_main
Definition: mc.h:644
mc_retry_t * retry_pool
Definition: mc.h:423
u32 retry_tail_index
Definition: mc.h:426
u32 n_retries
Definition: mc.h:338
static word mc_serialize_n_bytes_left(mc_main_t *mcm, serialize_main_t *m)
Definition: mc.h:678
uword * stream_msg_index_by_name
Definition: mc.h:478
i64 word
Definition: types.h:111
#define pool_foreach(VAR, POOL, BODY)
Iterate through pool.
Definition: pool.h:443
struct mc_stream_t mc_stream_t
mc_stream_config_t config
Definition: mc.h:413
#define always_inline
Definition: clib.h:92
static i32 clib_byte_swap_i32(i32 x)
Definition: byte_order.h:95
clib_error_t * mc_serialize_internal(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg,...)
Definition: mc.c:2076
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
void * opaque
Definition: mc.h:299
void unserialize_mc_stream(serialize_main_t *m, va_list *va)
Definition: mc.c:1467
mc_peer_id_t our_catchup_peer_id
Definition: mc.h:302
u32 retry_head_index
Definition: mc.h:426
unsigned int u32
Definition: types.h:88
uword * stream_index_by_name
Definition: mc.h:554
uword * procs_waiting_for_stream_name_by_name
Definition: mc.h:556
void mc_wait_for_stream_ready(mc_main_t *m, char *stream_name)
Definition: mc.c:993
u32 join_ager_process
Definition: mc.h:567
mc_mastership_peer_t * mastership_peers
Definition: mc.h:539
static u32 serialize_vlib_buffer_n_bytes(serialize_main_t *m)
Definition: buffer.h:519
serialize_function_t unserialize_mc_main
Definition: mc.h:669
char * name
Definition: main.h:104
#define hash_get(h, key)
Definition: hash.h:249
foreach_mc_msg_type u32 mc_stream_join(mc_main_t *mcm, mc_stream_config_t *)
Definition: mc.c:879
u64 as_u64
Definition: mc.h:43
vlib_one_time_waiting_process_t * procs_waiting_for_open_window
Definition: mc.h:459
uword mc_unserialize_message(mc_main_t *mcm, mc_stream_t *s, serialize_main_t *m)
Definition: mc.c:2101
uword * unacked_by_peer_bitmap
Definition: mc.h:344
u32 prev_index
Definition: mc.h:341
static void mc_byte_swap_msg_master_assert(mc_msg_master_assert_t *r)
Definition: mc.h:70
void mc_stream_leave(mc_main_t *mcm, u32 stream_index)
Definition: mc.c:885
#define hash_free(h)
Definition: hash.h:310
u32 mastership_process
Definition: mc.h:566
f64 time_last_master_assert_received
Definition: mc.h:521
mc_catchup_process_arg_t * catchup_process_args
Definition: mc.h:562
static void mc_clear_stream_stats(mc_main_t *m)
Definition: mc.h:610
u32 catchup_process
Definition: mc.h:569
mc_peer_id_t peer_id
Definition: mc.h:519
void mc_enable_disable_mastership(mc_main_t *mcm, int we_can_be_master)
Definition: mc.c:1879
u32 retry_process
Definition: mc.h:568
u32 mc_stream_send(mc_main_t *mcm, u32 stream_index, u32 buffer_index)
Definition: mc.c:1016
mc_peer_id_t our_ack_peer_id
Definition: mc.h:301
static void catchup_send_fun(void *transport_main, uword opaque, u8 *data)
Definition: mc_socket.c:925
#define pool_free(p)
Free a pool.
Definition: pool.h:357
The fine-grained event logger allows lightweight, thread-safe event logging at minimum cost...
void * mc_get_vlib_buffer(struct vlib_main_t *vm, u32 n_bytes, u32 *bi_return)
Definition: mc.c:110
mc_relay_msg_type_t
Definition: mc.h:88
clib_error_t * serialize(serialize_main_t *m,...)
Definition: serialize.c:672
u32 retry_limit
Definition: mc.h:371
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:339
Definition: mc.h:330
u32 window_size
Definition: mc.h:365
f64 retry_interval
Definition: mc.h:368
mhash_t elog_id_by_peer_id
Definition: mc.h:588
static int mc_peer_id_compare(mc_peer_id_t a, mc_peer_id_t b)
Definition: mc.h:54
mc_retry_t * retired_fifo
Definition: mc.h:434
void mc_unserialize(mc_main_t *mcm, mc_stream_t *s, u32 buffer_index)
Definition: mc.c:2264
u32 max_packet_size
Definition: mc.h:306
u8 as_u8[8]
Definition: mc.h:42
mc_stream_stats_t stats_last_clear
Definition: mc.h:443
signed int i32
Definition: types.h:81
Definition: mc.h:530
static void mc_byte_swap_msg_user_ack(mc_msg_user_ack_t *r)
Definition: mc.h:173
char * name
Definition: mc.h:362
static mc_stream_t * mc_stream_by_name(mc_main_t *m, char *name)
Definition: mc.h:597
static void mhash_free(mhash_t *h)
Definition: mhash.h:149
mc_serialize_stream_msg_t * stream_msgs
Definition: mc.h:472
clib_error_t * unserialize(serialize_main_t *m,...)
Definition: serialize.c:684
u32 we_can_be_relay_master
Definition: mc.h:535
#define clib_fifo_free(f)
Definition: fifo.h:257
vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool
Definition: mc.h:558
uword * global_msg_index_by_name
Definition: mc.h:576
mc_transport_type_t
Definition: mc.h:274
int joins_in_progress
Definition: mc.h:560
u32 unserialize_process
Definition: mc.h:570
mc_relay_state_t relay_state
Definition: mc.h:532
struct _mc_serialize_msg mc_serialize_msg_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
void( serialize_function_t)(serialize_main_t *m, va_list *va)
Definition: serialize.h:168
mc_stream_peer_stats_t stats_last_clear
Definition: mc.h:327
uword * retry_index_by_local_sequence
Definition: mc.h:437
mc_stream_peer_t * peers
Definition: mc.h:446
static void mc_byte_swap_msg_join_or_leave_request(mc_msg_join_or_leave_request_t *r)
Definition: mc.h:106
u64 user_requests_sent
Definition: mc.h:480
uword * elog_id_by_msg_name
Definition: mc.h:590
mc_peer_id_t id
Definition: mc.h:322
u64 n_msgs_from_past
Definition: mc.h:315
u64 uword
Definition: types.h:112
u64 n_retries
Definition: mc.h:353
mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers
Definition: mc.h:593
serialize_function_t serialize_mc_main
Definition: mc.h:669
format_function_t * format_peer_id
Definition: mc.h:308
u32 our_local_sequence
Definition: mc.h:462
clib_error_t * mc_serialize_va(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg, va_list *va)
Definition: mc.c:1993
#define vec_foreach(var, vec)
Vector iterator.
#define foreach_mc_msg_type
Definition: mc.h:79
u32 local_sequence
Definition: mc.h:335
u32 * stream_msg_index_by_global_index
Definition: mc.h:475
struct vlib_main_t * vlib_main
Definition: mc.h:584
static mc_peer_id_t mc_byte_swap_peer_id(mc_peer_id_t i)
Definition: mc.h:47
static uword catchup_request_fun(void *transport_main, u32 stream_index, mc_peer_id_t catchup_peer_id)
Definition: mc_socket.c:843
mc_transport_t transport
Definition: mc.h:545
mc_stream_t * stream_vector
Definition: mc.h:551
static void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
Definition: mc.h:192