21 #include <sys/ioctl.h> 22 #include <netinet/tcp.h> 28 u64 peer_id_as_u64 = va_arg (*args,
u64);
30 peer_id.
as_u64 = peer_id_as_u64;
44 u32 buffer_index,
u32 handler_frees_buffer,
void *_h)
51 h (mcm, the_msg, buffer_index);
52 if (!handler_frees_buffer)
58 u32 buffer_index,
struct iovec **iovs_return)
62 u32 bi = buffer_index;
82 int socket,
struct sockaddr_in *tx_addr,
u32 buffer_index)
86 word n_bytes, n_bytes_tx, n_retries;
88 memset (&h, 0,
sizeof (h));
90 h.msg_namelen =
sizeof (tx_addr[0]);
93 _vec_len (msm->
iovecs) = 0;
96 ASSERT (n_bytes <= msm->mc_main.transport.max_packet_size);
98 clib_error (
"sending packet larger than interace MTU %d bytes", n_bytes);
104 while ((n_bytes_tx = sendmsg (socket, &h, 0)) != n_bytes
107 if (n_bytes_tx != n_bytes)
116 .format =
"sendmsg-helper: %d retries",.format_args =
"i4",};
123 ed->retries = n_retries;
144 struct sockaddr_in tx_addr;
149 memset (&tx_addr, 0,
sizeof (tx_addr));
150 tx_addr.sin_family = AF_INET;
154 error =
sendmsg_helper (msm, msm->ack_socket, &tx_addr, buffer_index);
162 struct sockaddr_in *rx_addr,
163 u32 * buffer_index,
u32 drop_message)
167 uword n_left, n_alloc, n_mtu,
i, i_rx;
176 uword max_alloc = 8 * n_mtu;
180 _vec_len (msm->
rx_buffers) = n_left + n_alloc;
189 for (i = 0; i < n_mtu; i++)
193 msm->
iovecs[
i].iov_len = buffer_size;
195 _vec_len (msm->
iovecs) = n_mtu;
200 memset (&h, 0,
sizeof (h));
203 h.msg_name = rx_addr;
204 h.msg_namelen =
sizeof (rx_addr[0]);
209 n_bytes_left = recvmsg (socket, &h, 0);
210 if (n_bytes_left < 0)
228 n_bytes_left < buffer_size ? n_bytes_left : buffer_size;
230 n_bytes_left -= buffer_size;
232 if (n_bytes_left <= 0)
284 if (!error && is_master)
288 mp->global_sequence = clib_host_to_net_u32 (mcm->relay_global_sequence);
289 mcm->relay_global_sequence++;
291 sendmsg_helper (msm, ms_from_relay->socket, &ms_from_relay->tx_addr,
336 switch (clib_host_to_net_u32 (mp->type))
338 case MC_MSG_TYPE_join_or_leave_request:
343 case MC_MSG_TYPE_join_reply:
421 _vec_len (
c->input_vector) = l + n;
423 if (is_eof &&
vec_len (
c->input_vector) > 0)
429 _vec_len (
c->input_vector) = 0;
457 ELOG_TYPE (e,
"catchup_client_read_ready");
473 if (
c->connect_in_progress)
477 c->connect_in_progress = 0;
478 len =
sizeof (value);
479 if (getsockopt (
c->socket, SOL_SOCKET, SO_ERROR, &value, &len) < 0)
502 if (n_this_write <= 0)
508 c->output_vector +
c->output_vector_n_written,
511 while (n < 0 && errno == EAGAIN);
518 c->output_vector_n_written += n;
521 if (
c->output_vector_n_written >=
vec_len (
c->output_vector))
567 struct sockaddr_in client_addr;
573 memset (
c, 0,
sizeof (
c[0]));
575 client_len =
sizeof (client_addr);
578 c->socket = accept (uf->file_descriptor,
579 (
struct sockaddr *) &client_addr,
580 (socklen_t *) & client_len);
595 .format =
"catchup accepted from 0x%lx",.format_args =
"i4",};
602 ed->addr = ntohl (client_addr.sin_addr.s_addr);
608 if ((setsockopt (
c->socket, IPPROTO_TCP,
609 TCP_NODELAY, (
void *) &one, sizeof (one))) < 0)
618 template.file_descriptor =
c->socket;
621 hash_set (msm->catchup_index_by_file_descriptor,
c->socket,
631 for (; port < 1 << 16; port++)
633 struct sockaddr_in
a;
635 memset (&a, 0,
sizeof (a));
637 a.sin_family = PF_INET;
638 a.sin_addr.s_addr = INADDR_ANY;
639 a.sin_port = htons (port);
641 if (bind (sock, (
struct sockaddr *) &a,
sizeof (a)) >= 0)
645 return port < 1 << 16 ? port : -1;
651 char *type,
uword udp_port)
654 struct ip_mreq mcast_req;
660 if ((ms->
socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
666 if ((setsockopt (ms->
socket, IPPROTO_IP,
667 IP_MULTICAST_TTL, (
void *) &ttl, sizeof (ttl))) < 0)
671 if (setsockopt (ms->
socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)) <
676 ms->
tx_addr.sin_family = AF_INET;
679 ms->
tx_addr.sin_port = htons (udp_port);
685 memset (&mcast_req, 0,
sizeof (mcast_req));
686 mcast_req.imr_multiaddr.s_addr =
690 if ((setsockopt (ms->
socket, IPPROTO_IP,
691 IP_ADD_MEMBERSHIP, (
void *) &mcast_req,
692 sizeof (mcast_req))) < 0)
696 if (ioctl (ms->
socket, FIONBIO, &one) < 0)
702 socklen_t sl =
sizeof (len);
703 if (setsockopt (ms->
socket, SOL_SOCKET, SO_SNDBUF, &len, sl) < 0)
747 "from relay", port++);
752 msm->
ack_socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP);
758 if (ioctl (msm->
ack_socket, FIONBIO, &one) < 0)
779 template.file_descriptor =
781 template.private_data = (
uword) msm;
786 template.file_descriptor =
788 template.private_data = (
uword) msm;
793 template.file_descriptor =
795 template.private_data = (
uword) msm;
799 template.file_descriptor =
801 template.private_data = (
uword) msm;
807 template.private_data = (
uword) msm;
813 template.private_data = (
uword) msm;
822 u8 * set_output_vector)
828 if (set_output_vector)
850 struct sockaddr_in
addr;
855 memset (c, 0,
sizeof (*c));
857 c->
socket = socket (AF_INET, SOCK_STREAM, 0);
864 if (ioctl (c->
socket, FIONBIO, &one) < 0)
870 memset (&addr, 0,
sizeof (addr));
871 addr.sin_family = AF_INET;
881 .format =
"connecting to peer 0x%Lx",.format_args =
"i8",};
887 ed->peer = catchup_peer_id.
as_u64;
890 if (connect (c->
socket, (
const void *) &addr, sizeof (addr))
891 < 0 && errno != EINPROGRESS)
904 template.file_descriptor = c->
socket;
905 template.private_data = (
uword) msm;
913 mc_msg_catchup_request_t *mp;
917 mp->stream_index = stream_index;
937 struct sockaddr_in *sa;
940 fd = socket (PF_INET, AF_INET, 0);
947 ifr.ifr_addr.sa_family = AF_INET;
948 strncpy (ifr.ifr_name, if_name, sizeof (ifr.ifr_name) - 1);
949 if (ioctl (fd, SIOCGIFADDR, &ifr) < 0)
956 sa = (
void *) &ifr.ifr_addr;
957 clib_memcpy (ip4_address, &sa->sin_addr.s_addr, sizeof (ip4_address[0]));
959 if (ioctl (fd, SIOCGIFMTU, &ifr) < 0)
965 *mtu = ifr.ifr_mtu - ( 20 + 8);
974 int n_intfcs_to_probe)
998 for (i = 0; i < n_intfcs_to_probe; i++)
void mc_main_init(mc_main_t *mcm, char *tag)
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
static void msg_handler(mc_main_t *mcm, u32 buffer_index, u32 handler_frees_buffer, void *_h)
static void clib_file_del(clib_file_main_t *um, clib_file_t *f)
static clib_error_t * catchup_listen_read_ready(clib_file_t *uf)
#define hash_set(h, key, value)
sll srl srl sll sra u16x4 i
int catchup_server_socket
#define hash_unset(h, key)
#define clib_error(format, args...)
static void catchup_cleanup(mc_socket_main_t *msm, mc_socket_catchup_t *c, clib_file_main_t *um, clib_file_t *uf)
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
static_always_inline mc_peer_id_t mc_socket_set_peer_id(u32 address_net_byte_order, u32 port_host_byte_order)
#define ELOG(em, f, data)
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
static clib_error_t * catchup_client_read_ready(clib_file_t *uf)
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
#define clib_unix_error(format, args...)
mc_multicast_socket_t multicast_sockets[MC_N_TRANSPORT_TYPE]
clib_file_function_t * read_function
#define VLIB_BUFFER_NEXT_PRESENT
i16 current_data
signed offset in data[], pre_data[] that we are currently processing.
static clib_error_t * tx_buffer(void *transport, mc_transport_type_t type, u32 buffer_index)
static clib_error_t * catchup_socket_error_ready(clib_file_t *uf)
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
static clib_error_t * ack_socket_read_ready(clib_file_t *uf)
#define clib_error_return(e, args...)
clib_file_main_t file_main
#define VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES
static clib_error_t * setup_mutlicast_socket(mc_socket_main_t *msm, mc_multicast_socket_t *ms, char *type, uword udp_port)
mc_peer_id_t our_catchup_peer_id
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
static clib_error_t * sendmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *tx_addr, u32 buffer_index)
static uword pointer_to_uword(const void *p)
mc_socket_catchup_t * catchups
static u8 * format_socket_peer_id(u8 *s, va_list *args)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
u32 base_multicast_udp_port_host_byte_order
static word find_and_bind_to_free_port(word sock, word port)
u16 current_length
Nbytes between current data and the end of this buffer.
static clib_error_t * catchup_server_write_ready(clib_file_t *uf)
#define clib_error_return_unix(e, args...)
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
#define pool_put(P, E)
Free an object E in pool P.
static u32 mc_socket_peer_id_get_port(mc_peer_id_t i)
static int find_interface_ip4_address(char *if_name, u32 *ip4_address, u32 *mtu)
static clib_error_t * mastership_socket_read_ready(clib_file_t *uf)
static uword append_buffer_index_to_iovec(vlib_main_t *vm, u32 buffer_index, struct iovec **iovs_return)
uword * catchup_index_by_file_descriptor
void(* file_update)(clib_file_t *file, clib_file_update_type_t update_type)
static void * catchup_add_pending_output(mc_socket_catchup_t *c, uword n_bytes, u8 *set_output_vector)
mc_peer_id_t our_ack_peer_id
static void catchup_send_fun(void *transport_main, uword opaque, u8 *data)
clib_error_t * mc_socket_main_init(mc_socket_main_t *msm, char **intfc_probe_list, int n_intfcs_to_probe)
static clib_error_t * from_relay_socket_read_ready(clib_file_t *uf)
static clib_error_t * catchup_socket_read_ready(clib_file_t *uf, int is_server)
#define vec_free(V)
Free vector's memory (no header).
static clib_error_t * tx_ack(void *transport, mc_peer_id_t dest_peer_id, u32 buffer_index)
static clib_error_t * to_relay_socket_read_ready(clib_file_t *uf)
#define clib_memcpy(a, b, c)
#define ELOG_TYPE(f, fmt)
#define ELOG_TYPE_DECLARE(f)
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
static clib_error_t * join_socket_read_ready(clib_file_t *uf)
static clib_error_t * catchup_server_read_ready(clib_file_t *uf)
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
static clib_error_t * catchup_socket_write_ready(clib_file_t *uf, int is_server)
static uword clib_file_add(clib_file_main_t *um, clib_file_t *template)
u32 next_buffer
Next buffer for this linked-list of buffers.
#define UNIX_FILE_DATA_AVAILABLE_TO_WRITE
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
struct sockaddr_in tx_addr
#define clib_unix_warning(format, args...)
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
format_function_t * format_peer_id
static clib_error_t * recvmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *rx_addr, u32 *buffer_index, u32 drop_message)
u32 if_ip4_address_net_byte_order
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
#define clib_error_return_code(e, code, flags, args...)
u32 multicast_tx_ip4_address_host_byte_order
static clib_error_t * catchup_client_write_ready(clib_file_t *uf)
static void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
char * multicast_interface_name
u32 flags
buffer flags: VLIB_BUFFER_FREE_LIST_INDEX_MASK: bits used to store free list index, VLIB_BUFFER_IS_TRACED: trace this buffer.
static u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
struct vlib_main_t * vlib_main
static uword catchup_request_fun(void *transport_main, u32 stream_index, mc_peer_id_t catchup_peer_id)
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
void( mc_msg_handler_t)(mc_main_t *mcm, void *msg, u32 buffer_index)
static mc_socket_catchup_t * find_catchup_from_file_descriptor(mc_socket_main_t *msm, int file_descriptor)
static u32 mc_socket_peer_id_get_address(mc_peer_id_t i)
static void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
static clib_error_t * socket_setup(mc_socket_main_t *msm)