24 #define MSG_ID_DEBUG 0 60 name_copy =
format (0,
"%s%c", msg_name, 0);
73 .format =
"tx-msg: stream %d local seq %d attempt %d",
74 .format_args =
"i4i4i4",
76 struct {
u32 stream_id, local_sequence, retry_count; } * ed;
78 ed->stream_id = stream_id;
79 ed->local_sequence = local_sequence;
80 ed->retry_count = retry_count;
96 {
return (
i32) x - (
i32) y;}
109 return (
void *) b->
data;
115 int notify_application)
127 .format =
"delete peer %s from all_peer_bitmap",
130 struct {
u32 peer; } * ed = 0;
153 memset (p, 0,
sizeof (p[0]));
164 .format =
"get_or_create %s peer %s stream %d seq %d",
165 .format_args =
"t4T4i4i4",
167 .enum_strings = {
"old",
"new", },
169 struct {
u32 is_new, peer, stream_index, rx_sequence; } * ed = 0;
172 ed->is_new = q ? 0 : 1;
174 ed->stream_index = s->
index;
224 .format =
"resend-retired: search for local seq %d",
227 struct {
u32 local_sequence; } * ed;
229 ed->local_sequence = local_sequence;
235 if (retry->local_sequence == local_sequence)
237 elog_tx_msg (mcm, s->index, retry->local_sequence, -13);
239 mcm->transport.tx_buffer
240 (mcm->transport.opaque,
241 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
242 retry->buffer_index);
250 .format =
"resend-retired: FAILED search for local seq %d",
253 struct {
u32 local_sequence; } * ed;
255 ed->local_sequence = local_sequence;
263 uword * dead_peer_bitmap)
268 uword pi = p - stream->peers;
269 uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
272 dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
274 if (MC_EVENT_LOGGING > 0)
276 ELOG_TYPE_DECLARE (e) = {
277 .format =
"delete_retry_fifo_elt: peer %s is %s",
278 .format_args =
"T4t4",
280 .enum_strings = {
"alive",
"dead", },
282 struct { u32 peer, is_alive; } * ed;
283 ed = ELOG_DATA (mcm->elog_main, e);
284 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
285 ed->is_alive = is_alive;
289 hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
292 return dead_peer_bitmap;
334 uword * dead_peer_bitmap = 0;
358 .format =
"resend local seq %d attempt %d",
359 .format_args =
"i4i4",
363 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
365 ELOG_TYPE_DECLARE (ev) = {
366 .format =
"resend: needed by peer %s local seq %d",
367 .format_args =
"T4i4",
369 struct { u32 peer, rx_sequence; } * ed;
370 ed = ELOG_DATA (mcm->elog_main, ev);
371 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
372 ed->rx_sequence = r->local_sequence;
376 struct {
u32 sequence;
u32 trail; } * ed;
406 r->unacked_by_peer_bitmap =
407 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
434 if (s->
state != MC_STREAM_STATE_invalid)
444 mc_msg_join_or_leave_request_t * mp;
448 memset(mp, 0,
sizeof (*mp));
449 mp->type = MC_MSG_TYPE_join_or_leave_request;
451 mp->stream_index = stream_index;
452 mp->is_join = is_join;
480 if (s->
state != MC_STREAM_STATE_join_in_progress)
485 s->
state = MC_STREAM_STATE_ready;
490 .format =
"stream %d join timeout",
522 .format =
"stream %d resend join request",
538 char * name = va_arg (*va,
char *);
545 buf[n_buf_bytes - 1] = 0;
562 .format =
"stream index %d already named %s",
563 .format_args =
"i4s16",
565 struct {
u32 stream_index;
char name[16]; } * ed;
567 ed->stream_index = p[0];
577 s->
state = MC_STREAM_STATE_name_known;
584 .format =
"stream index %d named %s",
585 .format_args =
"i4s16",
587 struct {
u32 stream_index;
char name[16]; } * ed;
589 ed->stream_index = s->
index;
608 .name =
"mc_register_stream_name",
623 u32 last_global_sequence_processed)
668 if (s->
state == MC_STREAM_STATE_ready)
675 == MC_STREAM_STATE_invalid))
678 .
name =
"mc-internal",
700 &mc_register_stream_name_msg,
759 s->
state = MC_STREAM_STATE_join_in_progress;
770 .format =
"stream index %d join request %s",
771 .format_args =
"i4s16",
773 struct {
u32 stream_index;
char name[16]; } * ed;
775 ed->stream_index = s->
index;
786 ELOG_TYPE (e,
"join complete stream %d");
806 .format =
"leave-stream: %d",
809 struct {
u32 index; } * ed;
811 ed->index = stream_index;
816 s->
state = MC_STREAM_STATE_name_known;
820 mc_msg_join_or_leave_request_t * req,
824 mc_msg_join_reply_t * rep;
830 if (! s || s->
state != MC_STREAM_STATE_ready)
845 if (this_s->
state != MC_STREAM_STATE_ready
846 && this_s->
state != MC_STREAM_STATE_name_known)
860 memset (rep, 0,
sizeof (rep[0]));
861 rep->type = MC_MSG_TYPE_join_reply;
862 rep->stream_index = req->stream_index;
879 mc_msg_join_reply_t * mp,
888 if (! s || s->
state != MC_STREAM_STATE_join_in_progress)
893 s->
state = MC_STREAM_STATE_catchup;
898 mp->catchup_peer_id);
914 if (s->
state == MC_STREAM_STATE_catchup
915 || s->
state == MC_STREAM_STATE_ready)
928 mc_msg_user_request_t * mp;
935 if (s->
state != MC_STREAM_STATE_ready)
965 mp->global_sequence = 0xdeadbeef;
966 mp->stream_index = s->
index;
1006 if (! s || s->
state != MC_STREAM_STATE_ready)
1017 seq_cmp_result =
mc_seq_cmp (mp->local_sequence,
1023 .format =
"rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1024 .format_args =
"T4i4i4i4",
1026 struct {
u32 peer, stream_index, rx_sequence;
i32 seq_cmp_result; } * ed;
1029 ed->stream_index = mp->stream_index;
1030 ed->rx_sequence = mp->local_sequence;
1031 ed->seq_cmp_result = seq_cmp_result;
1034 if (0 && mp->stream_index == 1 && once == 0)
1037 ELOG_TYPE (e,
"FAKE lost msg on stream 1");
1045 if (seq_cmp_result > 0)
1051 mc_msg_user_ack_t * rp;
1056 rp->stream_index = s->
index;
1057 rp->local_sequence = mp->local_sequence;
1058 rp->seq_cmp_result = seq_cmp_result;
1063 .format =
"tx-ack: stream %d local seq %d",
1064 .format_args =
"i4i4",
1066 struct {
u32 stream_index;
u32 local_sequence; } * ed;
1068 ed->stream_index = rp->stream_index;
1069 ed->local_sequence = rp->local_sequence;
1076 if (seq_cmp_result < 0)
1083 if (seq_cmp_result == 0)
1088 case MC_STREAM_STATE_ready:
1099 case MC_STREAM_STATE_catchup:
1118 int peer_created = 0;
1127 .format =
"rx-ack: local seq %d peer %s seq_cmp_result %d",
1128 .format_args =
"i4T4i4",
1130 struct {
u32 local_sequence;
u32 peer;
i32 seq_cmp_result;} * ed;
1132 ed->local_sequence = mp->local_sequence;
1134 ed->seq_cmp_result = mp->seq_cmp_result;
1149 if (mp->seq_cmp_result > 0)
1152 mp->seq_cmp_result);
1177 .format =
"ack: for seq %d from peer %s no fifo elt",
1178 .format_args =
"i4T4",
1180 struct {
u32 seq;
u32 peer; } * ed;
1182 ed->seq = mp->local_sequence;
1208 if (!peer_created &&
1215 .format =
"dup-ack: for seq %d from peer %s",
1216 .format_args =
"i4T4",
1218 struct {
u32 seq;
u32 peer; } * ed;
1231 .format =
"ack: for seq %d from peer %s",
1232 .format_args =
"i4T4",
1234 struct {
u32 seq;
u32 peer; } * ed;
1236 ed->seq = mp->local_sequence;
1252 .format =
"ack: retire fifo elt loc seq %d after %d acks",
1253 .format_args =
"i4i4",
1255 struct {
u32 seq;
u32 npeers; } * ed;
1267 #define EVENT_MC_SEND_CATCHUP_DATA 0 1275 uword *event_data = 0;
1282 _vec_len(event_data) = 0;
1285 for (i = 0; i <
vec_len(event_data); i++)
1309 u8 * x = serialize_get (m, sizeof (p->id));
1310 clib_memcpy (x, p->id.as_u8, sizeof (p->id));
1311 serialize_integer (m, p->last_sequence_received,
1312 sizeof (p->last_sequence_received));
1325 for (i = 0; i < n_peers; i++)
1350 if (! s || s->
state != MC_STREAM_STATE_ready)
1357 .format =
"catchup-request: from %s stream %d",
1358 .format_args =
"T4i4",
1360 struct {
u32 peer, stream; } * ed;
1363 ed->stream = req->stream_index;
1385 mc_msg_catchup_reply_t * rep;
1392 rep->peer_id = req->peer_id;
1393 rep->stream_index = req->stream_index;
1408 rep->last_global_sequence_included);
1422 #define EVENT_MC_UNSERIALIZE_BUFFER 0 1423 #define EVENT_MC_UNSERIALIZE_CATCHUP 1 1443 if (! s || s->
state == MC_STREAM_STATE_ready)
1460 if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1461 s->our_local_sequence = p->last_sequence_received + 1;
1470 mp->n_data_bytes - n_stream_bytes);
1481 mc_msg_user_request_t * gp;
1491 seq_cmp_result =
mc_seq_cmp (gp->global_sequence,
1492 mp->last_global_sequence_included);
1494 if (seq_cmp_result > 0)
1503 .format =
"catchup replay local sequence 0x%x",
1504 .format_args =
"i4",
1506 struct {
u32 local_sequence; } * ed;
1508 ed->local_sequence = gp->local_sequence;
1516 .format =
"catchup discard local sequence 0x%x",
1517 .format_args =
"i4",
1519 struct {
u32 local_sequence; } * ed;
1521 ed->local_sequence = gp->local_sequence;
1528 s->
state = MC_STREAM_STATE_ready;
1543 mc_msg_master_assert_t * mp;
1548 f64 now, time_last_master_assert = -1;
1565 if (now >= time_last_master_assert + 1)
1567 time_last_master_assert = now;
1580 .format =
"tx-massert: peer %s global seq %u",
1581 .format_args =
"T4i4",
1583 struct {
u32 peer, global_sequence; } * ed;
1586 ed->global_sequence = mp->global_sequence;
1602 if (! is_master && timeouts++ > 2)
1608 ELOG_TYPE (e,
"become master (was maybe_master)");
1619 ELOG_TYPE (e,
"become slave (was maybe_master)");
1653 ELOG_TYPE (e,
"timeouts; negoitate mastership");
1707 u8 signal_slave = 0;
1708 u8 update_global_sequence = 0;
1712 his_peer_id = mp->peer_id;
1716 seq_cmp_result =
mc_seq_cmp (mp->global_sequence,
1730 if (seq_cmp_result > 0)
1733 update_global_sequence = 1;
1759 .format =
"rx-massert: peer %s global seq %u upd %d slave %d",
1760 .format_args =
"T4i4i1i1",
1764 u32 global_sequence;
1770 ed->global_sequence = mp->global_sequence;
1771 ed->update_sequence = update_global_sequence;
1772 ed->slave = signal_slave;
1794 m = m->next_registration;
1801 u32 multiple_messages_per_vlib_buffer,
1809 u32 bi, n_before, n_after, n_total, n_this_msg;
1814 sbm->
tx.max_n_data_bytes_per_chain = 4096;
1824 gi = msg->global_index;
1840 .format =
"serialize-msg: %s index %d",
1841 .format_args =
"T4i4",
1843 struct {
u32 c[2]; } * ed;
1852 n_this_msg = n_after - n_before;
1853 n_total = n_after +
sizeof (mc_msg_user_request_t);
1857 msg->max_n_bytes_serialized =
clib_max (msg->max_n_bytes_serialized, n_this_msg);
1859 if (! multiple_messages_per_vlib_buffer
1877 u32 multiple_messages_per_vlib_buffer,
1885 if (stream_index == ~0)
1895 multiple_messages_per_vlib_buffer,
1924 .format =
"unserialize-msg: %s rx index %d",
1925 .format_args =
"T4i4",
1927 struct {
u32 c[2]; } * ed;
1960 .format =
"msg-bind: stream %d %s to index %d",
1961 .format_args =
"i4T4i4",
1963 struct {
u32 c[3]; } * ed;
1965 ed->c[0] = s->
index;
1976 .format =
"msg-id-ERROR: %s index %d expected %d",
1977 .format_args =
"T4i4i4",
1979 struct {
u32 c[3]; } * ed;
2021 static u8 * contents;
2059 uword event_type, * event_data = 0;
2065 _vec_len(event_data) = 0;
2072 for (i = 0; i <
vec_len (event_data); i++)
2077 for (i = 0; i <
vec_len (event_data); i++)
2119 u32 i, n_streams, n_stream_msgs;
2125 for (i = 0; i < n_streams; i++)
2136 s->
state = MC_STREAM_STATE_name_known;
2162 .format =
"catchup-bind: %s to %d global index %d stream %d",
2163 .format_args =
"T4i4i4i4",
2165 struct {
u32 c[4]; } * ed;
2170 ed->c[3] = s->
index;
2202 memset (&r, 0,
sizeof (r));
2207 r.runtime_data = &mcm;
2208 r.runtime_data_bytes =
sizeof (&mcm);
2210 r.name = (
char *)
format (0,
"mc-mastership-%s", tag);
2214 r.name = (
char *)
format (0,
"mc-join-ager-%s", tag);
2218 r.name = (
char *)
format (0,
"mc-retry-%s", tag);
2222 r.name = (
char *)
format (0,
"mc-catchup-%s", tag);
2226 r.name = (
char *)
format (0,
"mc-unserialize-%s", tag);
2254 return format (s,
"unknown 0x%x", state);
2257 return format (s,
"%s", t);
2266 #define _(f) case MC_STREAM_STATE_##f: t = #f; break; 2270 return format (s,
"unknown 0x%x", state);
2273 return format (s,
"%s", t);
2292 s =
format (s,
"MC state %U, %d streams joined, global sequence 0x%x",
2300 s =
format (s,
"\n%UMost recent mastership peers:",
2304 s =
format (s,
"\n%U%-30U%.4e",
2313 s =
format (s,
"\n%Ustream `%s' index %d",
2317 s =
format (s,
"\n%Ustate %U",
2321 s =
format (s,
"\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2327 s =
format (s,
"\n%U%Ld/%Ld user requests sent/received",
2331 s =
format (s,
"\n%U%d peers, local/global sequence 0x%x/0x%x",
2340 if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2341 vec_add1 (ps, p[0]);
2344 s =
format (s,
"\n%U%=30s%10s%16s%16s",
2346 "Peer",
"Last seq",
"Retries",
"Future");
2350 s =
format (s,
"\n%U%-30U0x%08x%16Ld%16Ld%s",
u32 last_global_sequence_processed
void mc_main_init(mc_main_t *mcm, char *tag)
void mc_stream_join_process_hold(void)
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
#define EVENT_MC_SEND_CATCHUP_DATA
mhash_t mastership_peer_index_by_id
static void delete_peer_with_index(mc_main_t *mcm, mc_stream_t *s, uword index, int notify_application)
void(* save_snapshot)(struct mc_main_t *mc_main, u32 is_catchup, u8 *snapshot_data, u32 n_snapshot_data_bytes)
#define hash_set(h, key, value)
void mc_unserialize(mc_main_t *mcm, mc_stream_t *s, u32 buffer_index)
sll srl srl sll sra u16x4 i
always_inline u32 serialize_vlib_buffer_n_bytes(serialize_main_t *m)
u64 user_requests_received
void mc_rx_buffer_unserialize(mc_main_t *mcm, mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
static void elog_stream_name(char *buf, int n_buf_bytes, char *v)
always_inline uword vlib_process_get_events(vlib_main_t *vm, uword **data_vector)
#define hash_unset(h, key)
always_inline mc_retry_t * next_retry(mc_stream_t *s, mc_retry_t *r)
void serialize_bitmap(serialize_main_t *m, uword *b)
static void(BVT(clib_bihash)*h, BVT(clib_bihash_value)*v)
always_inline int mc_peer_id_compare(mc_peer_id_t a, mc_peer_id_t b)
always_inline mc_stream_t * mc_stream_by_index(mc_main_t *m, u32 i)
vlib_one_time_waiting_process_t * procs_waiting_for_join_done
void unserialize_close_vlib_buffer(serialize_main_t *m)
static uword mc_retry_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
vlib_one_time_waiting_process_t * procs_waiting_for_mc_stream_join
static void maybe_send_window_open_event(vlib_main_t *vm, mc_stream_t *stream)
u32 last_sequence_received
u32 mc_stream_send(mc_main_t *mcm, u32 stream_index, u32 buffer_index)
always_inline void mc_stream_free(mc_stream_t *s)
always_inline void remove_retry_from_pool(mc_stream_t *s, mc_retry_t *r)
always_inline mc_retry_t * prev_retry(mc_stream_t *s, mc_retry_t *r)
always_inline void serialize_likely_small_unsigned_integer(serialize_main_t *m, u64 x)
struct vlib_main_t * vlib_main
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
u8 *(* catchup_snapshot)(struct mc_main_t *mc_main, u8 *snapshot_vector, u32 last_global_sequence_included)
#define ELOG(em, f, data)
#define vec_add1(V, E)
Add 1 element to end of vector (unspecified alignment).
always_inline void mc_byte_swap_msg_user_request(mc_msg_user_request_t *r)
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
mc_serialize_msg_t ** global_msgs
void * mc_get_vlib_buffer(vlib_main_t *vm, u32 n_bytes, u32 *bi_return)
always_inline uword mhash_set(mhash_t *h, void *key, uword new_value, uword *old_value)
struct _vlib_node_registration vlib_node_registration_t
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
always_inline void vlib_process_signal_event(vlib_main_t *vm, uword node_index, uword type_opaque, uword data)
#define hash_set_mem(h, key, value)
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
clib_error_t * mc_serialize_internal(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg,...)
#define clib_error_report(e)
void unserialize_mc_main(serialize_main_t *m, va_list *va)
add_epi add_epi sub_epi sub_epi adds_epu subs_epu i16x8 y
always_inline uword vlib_process_suspend(vlib_main_t *vm, f64 dt)
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
u32 mc_stream_join(mc_main_t *mcm, mc_stream_config_t *config)
always_inline void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
always_inline vlib_main_t * vlib_get_main(void)
void(* peer_died)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id)
struct vlib_serialize_buffer_main_t::@27::@30 rx
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
u32 relay_global_sequence
u32 vlib_register_node(vlib_main_t *vm, vlib_node_registration_t *r)
static void elog_tx_msg(mc_main_t *m, u32 stream_id, u32 local_sequence, u32 retry_count)
#define mc_serialize_stream(mc, si, msg, args...)
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
static u8 * format_mc_relay_state(u8 *s, va_list *args)
#define pool_foreach(VAR, POOL, BODY)
mc_stream_config_t config
always_inline i32 mc_seq_cmp(u32 x, u32 y)
void unserialize_open_data(serialize_main_t *m, u8 *data, uword n_data_bytes)
always_inline void unserialize_integer(serialize_main_t *m, void *x, u32 n_bytes)
static void perform_catchup(mc_main_t *mcm, mc_msg_catchup_reply_t *mp)
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
always_inline uword pool_elts(void *v)
serialize_main_t serialize_mains[VLIB_N_RX_TX]
#define MC_STREAM_INDEX_INTERNAL
static int mc_peer_comp(void *a1, void *a2)
always_inline uword clib_fifo_elts(void *v)
#define clib_warning(format, args...)
always_inline void vlib_signal_one_time_waiting_process(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
mc_peer_id_t our_catchup_peer_id
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
always_inline mc_stream_t * mc_stream_by_name(mc_main_t *m, char *name)
always_inline void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
uword * stream_index_by_name
uword * procs_waiting_for_stream_name_by_name
static void check_retry(mc_main_t *mcm, mc_stream_t *s)
static uword pointer_to_uword(const void *p)
u32 serialize_close_vlib_buffer(serialize_main_t *m)
#define hash_create_string(elts, value_bytes)
mc_mastership_peer_t * mastership_peers
void(* catchup)(struct mc_main_t *mc_main, u8 *snapshot_data, u32 n_snapshot_data_bytes)
always_inline void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
#define clib_bitmap_foreach(i, ai, body)
#define pool_elt_at_index(p, i)
#define hash_unset_mem(h, key)
always_inline void mc_byte_swap_msg_user_ack(mc_msg_user_ack_t *r)
#define clib_fifo_sub1(f, e)
u16 current_length
Nbytes between current data and the end of this buffer.
vlib_one_time_waiting_process_t * procs_waiting_for_open_window
uword * unacked_by_peer_bitmap
always_inline uword * vlib_process_wait_for_event(vlib_main_t *vm)
always_inline uword vlib_buffer_contents(vlib_main_t *vm, u32 buffer_index, u8 *contents)
Copy buffer contents to memory.
always_inline f64 vlib_process_wait_for_event_or_clock(vlib_main_t *vm, f64 dt)
f64 time_last_master_assert_received
mc_catchup_process_arg_t * catchup_process_args
void unserialize_mc_stream(serialize_main_t *m, va_list *va)
always_inline void * unserialize_get(serialize_main_t *m, uword n_bytes)
always_inline void mc_byte_swap_msg_join_or_leave_request(mc_msg_join_or_leave_request_t *r)
static uword mc_mastership_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
always_inline void mc_stream_init(mc_stream_t *s)
#define EVENT_MC_UNSERIALIZE_BUFFER
#define clib_fifo_foreach(v, f, body)
always_inline void mc_byte_swap_msg_join_reply(mc_msg_join_reply_t *r)
void unserialize_cstring(serialize_main_t *m, char **s)
#define uword_to_pointer(u, type)
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
mc_peer_id_t our_ack_peer_id
static format_function_t format_mc_stream_state
always_inline uword clib_bitmap_get(uword *ai, uword i)
static void mc_serialize_init(mc_main_t *mcm)
MC_SERIALIZE_MSG(mc_register_stream_name_msg, static)
static uword mc_catchup_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
serialize_stream_t stream
clib_error_t * serialize(serialize_main_t *m,...)
void mhash_init(mhash_t *h, uword n_value_bytes, uword n_key_bytes)
vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]
uword mc_unserialize_message(mc_main_t *mcm, mc_stream_t *s, serialize_main_t *m)
static uword * delete_retry_fifo_elt(mc_main_t *mcm, mc_stream_t *stream, mc_retry_t *r, uword *dead_peer_bitmap)
void serialize_open_vector(serialize_main_t *m, u8 *vector)
always_inline mc_main_t * mc_node_get_main(vlib_node_runtime_t *node)
#define vec_free(V)
Free vector's memory (no header).
void serialize_mc_main(serialize_main_t *m, va_list *va)
#define clib_memcpy(a, b, c)
void mc_unserialize_internal(mc_main_t *mcm, u32 stream_and_buffer_index)
mhash_t elog_id_by_peer_id
u8 * format_mc_main(u8 *s, va_list *args)
always_inline uword vlib_process_wait_for_event_with_type(vlib_main_t *vm, uword **data_vector, uword with_type_opaque)
static uword mc_unserialize_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
always_inline void mc_byte_swap_msg_master_assert(mc_msg_master_assert_t *r)
always_inline void serialize_integer(serialize_main_t *m, u64 x, u32 n_bytes)
#define ELOG_TYPE(f, fmt)
mc_retry_t * retired_fifo
always_inline void mc_byte_swap_msg_catchup_reply(mc_msg_catchup_reply_t *r)
#define ELOG_TYPE_DECLARE(f)
void mc_enable_disable_mastership(mc_main_t *mcm, int we_can_be_master)
static void this_node_slave(mc_main_t *mcm)
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
#define VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX
always_inline uword vlib_buffer_index_length_in_chain(vlib_main_t *vm, u32 bi)
Get length in bytes of the buffer index buffer chain.
mc_stream_stats_t stats_last_clear
static uword mc_join_ager_process(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *f)
#define pool_put_index(p, i)
void mc_stream_leave(mc_main_t *mcm, u32 stream_index)
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
static void serialize_mc_stream(serialize_main_t *m, va_list *va)
mc_serialize_stream_msg_t * stream_msgs
static u32 elog_id_for_msg_name(mc_main_t *m, char *msg_name)
mc_stream_peer_stats_t stats
vhost_vring_state_t state
mc_serialize_msg_t * mc_msg_registrations
clib_error_t * unserialize(serialize_main_t *m,...)
#define clib_bitmap_free(v)
u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
u32 we_can_be_relay_master
#define EVENT_MC_UNSERIALIZE_CATCHUP
vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool
void serialize_cstring(serialize_main_t *m, char *s)
always_inline uword clib_bitmap_is_zero(uword *ai)
#define vec_elt(v, i)
Get vector value at index i.
uword * global_msg_index_by_name
static void mc_internal_catchup(mc_main_t *mcm, u8 *data, u32 n_data_bytes)
void mc_wait_for_stream_ready(mc_main_t *m, char *stream_name)
#define clib_fifo_add1(f, e)
#define vec_copy(DST, SRC)
Copy a vector, memcpy wrapper.
uword * unserialize_bitmap(serialize_main_t *m)
always_inline void vlib_current_process_wait_for_one_time_event_vector(vlib_main_t *vm, vlib_one_time_waiting_process_t **wps)
u32 elog_string(elog_main_t *em, char *fmt,...)
mc_relay_state_t relay_state
struct _mc_serialize_msg mc_serialize_msg_t
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
always_inline void vlib_buffer_advance(vlib_buffer_t *b, word l)
Advance current data pointer by the supplied (signed!) amount.
mc_stream_peer_stats_t stats_last_clear
uword * retry_index_by_local_sequence
uword * elog_id_by_msg_name
clib_error_t * va_serialize(serialize_main_t *sm, va_list *va)
#define vec_sort_with_function(vec, f)
Sort a vector using the supplied element comparison function.
mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers
static mc_stream_peer_t * get_or_create_peer_with_id(mc_main_t *mcm, mc_stream_t *s, mc_peer_id_t id, int *created)
static u32 elog_id_for_peer_id(mc_main_t *m, u64 peer_id)
format_function_t * format_peer_id
static u8 * mc_internal_catchup_snapshot(mc_main_t *mcm, u8 *data_vector, u32 last_global_sequence_processed)
#define hash_get_mem(h, key)
#define clib_fifo_add2(f, p)
static void this_node_maybe_master(mc_main_t *mcm)
always_inline uword vlib_in_process_context(vlib_main_t *vm)
static u32 mc_stream_join_helper(mc_main_t *mcm, mc_stream_config_t *config, u32 is_internal)
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
always_inline u32 unserialize_vlib_buffer_n_bytes(serialize_main_t *m)
#define vec_foreach(var, vec)
Vector iterator.
always_inline f64 vlib_time_now(vlib_main_t *vm)
void * serialize_close_vector(serialize_main_t *m)
static void unserialize_mc_register_stream_name(serialize_main_t *m, va_list *va)
always_inline u64 unserialize_likely_small_unsigned_integer(serialize_main_t *m)
clib_error_t * mc_serialize_va(mc_main_t *mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t *msg, va_list *va)
void(* rx_buffer)(struct mc_main_t *mc_main, struct mc_stream_t *stream, mc_peer_id_t peer_id, u32 buffer_index)
#define vec_validate_init_empty(V, I, INIT)
Make sure vector is long enough for given index and initialize empty space (no header, unspecified alignment)
static void mc_retry_free(mc_main_t *mcm, mc_stream_t *s, mc_retry_t *r)
u32 * stream_msg_index_by_global_index
struct vlib_main_t * vlib_main
static void send_join_or_leave_request(mc_main_t *mcm, u32 stream_index, u32 is_join)
void unserialize_open_vlib_buffer(serialize_main_t *m, vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
uword runtime_data[(128-1 *sizeof(vlib_node_function_t *)-1 *sizeof(vlib_error_t *)-11 *sizeof(u32)-5 *sizeof(u16))/sizeof(uword)]
always_inline void vlib_current_process_wait_for_one_time_event(vlib_main_t *vm, vlib_one_time_waiting_process_t *p)
always_inline vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
void serialize_open_vlib_buffer(serialize_main_t *m, vlib_main_t *vm, vlib_serialize_buffer_main_t *sm)
always_inline uword * mhash_get(mhash_t *h, void *key)
static void mc_resend_retired(mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
mc_stream_t * stream_vector
struct vlib_serialize_buffer_main_t::@27::@29 tx
static void serialize_mc_register_stream_name(serialize_main_t *m, va_list *va)