21 #include <sys/ioctl.h> 22 #include <netinet/tcp.h> 28 u64 peer_id_as_u64 = va_arg (*args,
u64);
30 peer_id.
as_u64 = peer_id_as_u64;
44 u32 buffer_index,
u32 handler_frees_buffer,
void *_h)
51 h (mcm, the_msg, buffer_index);
52 if (!handler_frees_buffer)
58 u32 buffer_index,
struct iovec **iovs_return)
62 u32 bi = buffer_index;
82 int socket,
struct sockaddr_in *tx_addr,
u32 buffer_index)
86 word n_bytes, n_bytes_tx, n_retries;
88 memset (&h, 0,
sizeof (h));
90 h.msg_namelen =
sizeof (tx_addr[0]);
93 _vec_len (msm->
iovecs) = 0;
96 ASSERT (n_bytes <= msm->mc_main.transport.max_packet_size);
98 clib_error (
"sending packet larger than interace MTU %d bytes", n_bytes);
104 while ((n_bytes_tx = sendmsg (socket, &h, 0)) != n_bytes
107 if (n_bytes_tx != n_bytes)
116 .format =
"sendmsg-helper: %d retries",.format_args =
"i4",};
123 ed->retries = n_retries;
144 struct sockaddr_in tx_addr;
149 memset (&tx_addr, 0,
sizeof (tx_addr));
150 tx_addr.sin_family = AF_INET;
154 error =
sendmsg_helper (msm, msm->ack_socket, &tx_addr, buffer_index);
162 struct sockaddr_in *rx_addr,
163 u32 * buffer_index,
u32 drop_message)
167 uword n_left, n_alloc, n_mtu,
i, i_rx;
176 uword max_alloc = 8 * n_mtu;
180 _vec_len (msm->
rx_buffers) = n_left + n_alloc;
189 for (i = 0; i < n_mtu; i++)
193 msm->
iovecs[
i].iov_len = buffer_size;
195 _vec_len (msm->
iovecs) = n_mtu;
200 memset (&h, 0,
sizeof (h));
203 h.msg_name = rx_addr;
204 h.msg_namelen =
sizeof (rx_addr[0]);
209 n_bytes_left = recvmsg (socket, &h, 0);
210 if (n_bytes_left < 0)
228 n_bytes_left < buffer_size ? n_bytes_left : buffer_size;
230 n_bytes_left -= buffer_size;
232 if (n_bytes_left <= 0)
284 if (!error && is_master)
288 mp->global_sequence = clib_host_to_net_u32 (mcm->relay_global_sequence);
289 mcm->relay_global_sequence++;
291 sendmsg_helper (msm, ms_from_relay->socket, &ms_from_relay->tx_addr,
336 switch (clib_host_to_net_u32 (mp->type))
338 case MC_MSG_TYPE_join_or_leave_request:
343 case MC_MSG_TYPE_join_reply:
420 _vec_len (
c->input_vector) = l + n;
422 if (is_eof &&
vec_len (
c->input_vector) > 0)
428 _vec_len (
c->input_vector) = 0;
456 ELOG_TYPE (e,
"catchup_client_read_ready");
457 ELOG (&vm->elog_main, e, 0);
472 if (
c->connect_in_progress)
476 c->connect_in_progress = 0;
477 len =
sizeof (value);
478 if (getsockopt (
c->socket, SOL_SOCKET, SO_ERROR, &value, &len) < 0)
501 if (n_this_write <= 0)
507 c->output_vector +
c->output_vector_n_written,
510 while (n < 0 && errno == EAGAIN);
517 c->output_vector_n_written += n;
520 if (
c->output_vector_n_written >=
vec_len (
c->output_vector))
566 struct sockaddr_in client_addr;
572 memset (
c, 0,
sizeof (
c[0]));
574 client_len =
sizeof (client_addr);
577 c->socket = accept (uf->file_descriptor,
578 (
struct sockaddr *) &client_addr,
579 (socklen_t *) & client_len);
594 .format =
"catchup accepted from 0x%lx",.format_args =
"i4",};
601 ed->addr = ntohl (client_addr.sin_addr.s_addr);
607 if ((setsockopt (
c->socket, IPPROTO_TCP,
608 TCP_NODELAY, (
void *) &one, sizeof (one))) < 0)
617 template.file_descriptor =
c->socket;
620 hash_set (msm->catchup_index_by_file_descriptor,
c->socket,
630 for (; port < 1 << 16; port++)
632 struct sockaddr_in
a;
634 memset (&a, 0,
sizeof (a));
636 a.sin_family = PF_INET;
637 a.sin_addr.s_addr = INADDR_ANY;
638 a.sin_port = htons (port);
640 if (bind (sock, (
struct sockaddr *) &a,
sizeof (a)) >= 0)
644 return port < 1 << 16 ? port : -1;
650 char *type,
uword udp_port)
653 struct ip_mreq mcast_req;
659 if ((ms->
socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
665 if ((setsockopt (ms->
socket, IPPROTO_IP,
666 IP_MULTICAST_TTL, (
void *) &ttl, sizeof (ttl))) < 0)
670 if (setsockopt (ms->
socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)) <
675 ms->
tx_addr.sin_family = AF_INET;
678 ms->
tx_addr.sin_port = htons (udp_port);
684 memset (&mcast_req, 0,
sizeof (mcast_req));
685 mcast_req.imr_multiaddr.s_addr =
689 if ((setsockopt (ms->
socket, IPPROTO_IP,
690 IP_ADD_MEMBERSHIP, (
void *) &mcast_req,
691 sizeof (mcast_req))) < 0)
695 if (ioctl (ms->
socket, FIONBIO, &one) < 0)
701 socklen_t sl =
sizeof (len);
702 if (setsockopt (ms->
socket, SOL_SOCKET, SO_SNDBUF, &len, sl) < 0)
746 "from relay", port++);
751 msm->
ack_socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP);
757 if (ioctl (msm->
ack_socket, FIONBIO, &one) < 0)
778 template.file_descriptor =
780 template.private_data = (
uword) msm;
785 template.file_descriptor =
787 template.private_data = (
uword) msm;
792 template.file_descriptor =
794 template.private_data = (
uword) msm;
798 template.file_descriptor =
800 template.private_data = (
uword) msm;
806 template.private_data = (
uword) msm;
812 template.private_data = (
uword) msm;
821 u8 * set_output_vector)
827 if (set_output_vector)
849 struct sockaddr_in
addr;
854 memset (c, 0,
sizeof (*c));
856 c->
socket = socket (AF_INET, SOCK_STREAM, 0);
863 if (ioctl (c->
socket, FIONBIO, &one) < 0)
869 memset (&addr, 0,
sizeof (addr));
870 addr.sin_family = AF_INET;
880 .format =
"connecting to peer 0x%Lx",.format_args =
"i8",};
886 ed->peer = catchup_peer_id.
as_u64;
889 if (connect (c->
socket, (
const void *) &addr, sizeof (addr))
890 < 0 && errno != EINPROGRESS)
903 template.file_descriptor = c->
socket;
904 template.private_data = (
uword) msm;
912 mc_msg_catchup_request_t *mp;
916 mp->stream_index = stream_index;
936 struct sockaddr_in *sa;
939 fd = socket (PF_INET, AF_INET, 0);
946 ifr.ifr_addr.sa_family = AF_INET;
947 strncpy (ifr.ifr_name, if_name, sizeof (ifr.ifr_name) - 1);
948 if (ioctl (fd, SIOCGIFADDR, &ifr) < 0)
955 sa = (
void *) &ifr.ifr_addr;
956 clib_memcpy (ip4_address, &sa->sin_addr.s_addr, sizeof (ip4_address[0]));
958 if (ioctl (fd, SIOCGIFMTU, &ifr) < 0)
964 *mtu = ifr.ifr_mtu - ( 20 + 8);
973 int n_intfcs_to_probe)
997 for (i = 0; i < n_intfcs_to_probe; i++)
void mc_main_init(mc_main_t *mcm, char *tag)
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
static void msg_handler(mc_main_t *mcm, u32 buffer_index, u32 handler_frees_buffer, void *_h)
#define hash_set(h, key, value)
sll srl srl sll sra u16x4 i
int catchup_server_socket
unix_file_function_t * read_function
#define hash_unset(h, key)
#define clib_error(format, args...)
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
static_always_inline mc_peer_id_t mc_socket_set_peer_id(u32 address_net_byte_order, u32 port_host_byte_order)
#define ELOG(em, f, data)
static clib_error_t * ack_socket_read_ready(unix_file_t *uf)
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
static clib_error_t * catchup_server_read_ready(unix_file_t *uf)
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
#define clib_unix_error(format, args...)
mc_multicast_socket_t multicast_sockets[MC_N_TRANSPORT_TYPE]
static clib_error_t * join_socket_read_ready(unix_file_t *uf)
static clib_error_t * catchup_socket_error_ready(unix_file_t *uf)
static clib_error_t * catchup_socket_read_ready(unix_file_t *uf, int is_server)
#define VLIB_BUFFER_NEXT_PRESENT
i16 current_data
signed offset in data[], pre_data[] that we are currently processing.
static clib_error_t * tx_buffer(void *transport, mc_transport_type_t type, u32 buffer_index)
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
#define clib_error_return(e, args...)
#define VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES
static clib_error_t * setup_mutlicast_socket(mc_socket_main_t *msm, mc_multicast_socket_t *ms, char *type, uword udp_port)
mc_peer_id_t our_catchup_peer_id
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
static clib_error_t * sendmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *tx_addr, u32 buffer_index)
static uword pointer_to_uword(const void *p)
mc_socket_catchup_t * catchups
static uword unix_file_add(unix_main_t *um, unix_file_t *template)
static u8 * format_socket_peer_id(u8 *s, va_list *args)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
u32 base_multicast_udp_port_host_byte_order
static word find_and_bind_to_free_port(word sock, word port)
u16 current_length
Nbytes between current data and the end of this buffer.
static clib_error_t * catchup_client_write_ready(unix_file_t *uf)
#define clib_error_return_unix(e, args...)
static void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
#define pool_put(P, E)
Free an object E in pool P.
static clib_error_t * from_relay_socket_read_ready(unix_file_t *uf)
static u32 mc_socket_peer_id_get_port(mc_peer_id_t i)
static int find_interface_ip4_address(char *if_name, u32 *ip4_address, u32 *mtu)
static uword append_buffer_index_to_iovec(vlib_main_t *vm, u32 buffer_index, struct iovec **iovs_return)
uword * catchup_index_by_file_descriptor
static void * catchup_add_pending_output(mc_socket_catchup_t *c, uword n_bytes, u8 *set_output_vector)
mc_peer_id_t our_ack_peer_id
static void catchup_send_fun(void *transport_main, uword opaque, u8 *data)
clib_error_t * mc_socket_main_init(mc_socket_main_t *msm, char **intfc_probe_list, int n_intfcs_to_probe)
static void catchup_cleanup(mc_socket_main_t *msm, mc_socket_catchup_t *c, unix_main_t *um, unix_file_t *uf)
static clib_error_t * to_relay_socket_read_ready(unix_file_t *uf)
#define vec_free(V)
Free vector's memory (no header).
static clib_error_t * tx_ack(void *transport, mc_peer_id_t dest_peer_id, u32 buffer_index)
#define clib_memcpy(a, b, c)
#define ELOG_TYPE(f, fmt)
#define ELOG_TYPE_DECLARE(f)
#define UNIX_FILE_DATA_AVAILABLE_TO_WRITE
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
static clib_error_t * catchup_socket_write_ready(unix_file_t *uf, int is_server)
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
static clib_error_t * catchup_client_read_ready(unix_file_t *uf)
static clib_error_t * catchup_server_write_ready(unix_file_t *uf)
u32 next_buffer
Next buffer for this linked-list of buffers.
void(* file_update)(unix_file_t *file, unix_file_update_type_t update_type)
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
static clib_error_t * catchup_listen_read_ready(unix_file_t *uf)
struct sockaddr_in tx_addr
#define clib_unix_warning(format, args...)
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
format_function_t * format_peer_id
static clib_error_t * recvmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *rx_addr, u32 *buffer_index, u32 drop_message)
u32 if_ip4_address_net_byte_order
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
static clib_error_t * mastership_socket_read_ready(unix_file_t *uf)
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
#define clib_error_return_code(e, code, flags, args...)
u32 multicast_tx_ip4_address_host_byte_order
static void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
char * multicast_interface_name
u32 flags
buffer flags: VLIB_BUFFER_IS_TRACED: trace this buffer.
static u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
struct vlib_main_t * vlib_main
static uword catchup_request_fun(void *transport_main, u32 stream_index, mc_peer_id_t catchup_peer_id)
static vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
void( mc_msg_handler_t)(mc_main_t *mcm, void *msg, u32 buffer_index)
static mc_socket_catchup_t * find_catchup_from_file_descriptor(mc_socket_main_t *msm, int file_descriptor)
static u32 mc_socket_peer_id_get_address(mc_peer_id_t i)
static void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
static void unix_file_del(unix_main_t *um, unix_file_t *f)
static clib_error_t * socket_setup(mc_socket_main_t *msm)