21 #include <sys/ioctl.h> 22 #include <netinet/tcp.h> 27 u64 peer_id_as_u64 = va_arg (*args,
u64);
29 peer_id.
as_u64 = peer_id_as_u64;
43 u32 handler_frees_buffer,
51 h (mcm, the_msg, buffer_index);
52 if (! handler_frees_buffer)
59 struct iovec ** iovs_return)
63 u32 bi = buffer_index;
84 struct sockaddr_in * tx_addr,
89 word n_bytes, n_bytes_tx, n_retries;
91 memset (&h, 0,
sizeof (h));
93 h.msg_namelen =
sizeof (tx_addr[0]);
96 _vec_len (msm->
iovecs) = 0;
99 ASSERT (n_bytes <= msm->mc_main.transport.max_packet_size);
101 clib_error (
"sending packet larger than interace MTU %d bytes", n_bytes);
107 while ((n_bytes_tx = sendmsg (socket, &h, 0)) != n_bytes
110 if (n_bytes_tx != n_bytes)
118 .format =
"sendmsg-helper: %d retries",
121 struct {
u32 retries; } * ed = 0;
124 ed->retries = n_retries;
145 struct sockaddr_in tx_addr;
150 memset (&tx_addr, 0,
sizeof (tx_addr));
151 tx_addr.sin_family = AF_INET;
155 error =
sendmsg_helper (msm, msm->ack_socket, &tx_addr, buffer_index);
163 struct sockaddr_in * rx_addr,
169 uword n_left, n_alloc, n_mtu,
i, i_rx;
178 uword max_alloc = 8 * n_mtu;
181 _vec_len (msm->
rx_buffers) = n_left + n_alloc;
190 for (i = 0; i < n_mtu; i++)
194 msm->
iovecs[
i].iov_len = buffer_size;
196 _vec_len (msm->
iovecs) = n_mtu;
201 memset (&h, 0,
sizeof (h));
204 h.msg_name = rx_addr;
205 h.msg_namelen =
sizeof (rx_addr[0]);
210 n_bytes_left = recvmsg (socket, &h, 0);
211 if (n_bytes_left < 0)
228 b->
current_length = n_bytes_left < buffer_size ? n_bytes_left : buffer_size;
230 n_bytes_left -= buffer_size;
232 if (n_bytes_left <= 0)
278 if (! error && is_master)
282 mp->global_sequence = clib_host_to_net_u32 (mcm->relay_global_sequence);
283 mcm->relay_global_sequence++;
284 error =
sendmsg_helper (msm, ms_from_relay->socket, &ms_from_relay->tx_addr, bi);
323 switch (clib_host_to_net_u32 (mp->type))
325 case MC_MSG_TYPE_join_or_leave_request:
330 case MC_MSG_TYPE_join_reply:
399 _vec_len (c->input_vector) = l + n;
401 if (is_eof &&
vec_len (c->input_vector) > 0)
406 _vec_len (c->input_vector) = 0;
429 ELOG_TYPE (e,
"catchup_client_read_ready");
430 ELOG (&vm->elog_main, e, 0);
444 if (c->connect_in_progress)
448 c->connect_in_progress = 0;
449 len =
sizeof (value);
450 if (getsockopt (c->socket, SOL_SOCKET,
451 SO_ERROR, &value, &len) < 0)
471 if (n_this_write <= 0)
476 c->output_vector + c->output_vector_n_written,
478 }
while (n < 0 && errno == EAGAIN);
485 c->output_vector_n_written += n;
488 if (c->output_vector_n_written >=
vec_len (c->output_vector))
527 struct sockaddr_in client_addr;
533 memset(c, 0,
sizeof (c[0]));
535 client_len =
sizeof(client_addr);
538 c->socket = accept (uf->file_descriptor,
539 (
struct sockaddr *)&client_addr,
540 (socklen_t *)&client_len);
554 .format =
"catchup accepted from 0x%lx",
560 ed->addr = ntohl(client_addr.sin_addr.s_addr);
566 if ((setsockopt(c->socket, IPPROTO_TCP,
567 TCP_NODELAY, (
void *)&one,
sizeof(one))) < 0) {
575 template.file_descriptor = c->socket;
578 hash_set (msm->catchup_index_by_file_descriptor, c->socket, c - msm->catchups);
586 for (; port < 1 << 16; port++)
588 struct sockaddr_in
a;
590 memset (&a, 0,
sizeof(a));
592 a.sin_family = PF_INET;
593 a.sin_addr.s_addr = INADDR_ANY;
594 a.sin_port = htons (port);
596 if (bind (sock, (
struct sockaddr *) &a,
sizeof (a)) >= 0)
600 return port < 1 << 16 ? port : -1;
610 struct ip_mreq mcast_req;
616 if ((ms->
socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
622 if ((setsockopt(ms->
socket, IPPROTO_IP,
623 IP_MULTICAST_TTL, (
void *)&ttl,
sizeof(ttl))) < 0)
627 if (setsockopt(ms->
socket, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one)) < 0)
631 ms->
tx_addr.sin_family = AF_INET;
633 ms->
tx_addr.sin_port = htons (udp_port);
639 memset (&mcast_req, 0,
sizeof (mcast_req));
643 if ((setsockopt(ms->
socket, IPPROTO_IP,
644 IP_ADD_MEMBERSHIP, (
void *)&mcast_req,
645 sizeof (mcast_req))) < 0)
648 if (ioctl (ms->
socket, FIONBIO, &one) < 0)
654 socklen_t sl =
sizeof (len);
655 if (setsockopt(ms->
socket, SOL_SOCKET, SO_SNDBUF, &len, sl) < 0)
705 msm->
ack_socket = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP);
711 if (ioctl (msm->
ack_socket, FIONBIO, &one) < 0)
732 template.private_data = (
uword) msm;
738 template.private_data = (
uword) msm;
744 template.private_data = (
uword) msm;
749 template.private_data = (
uword) msm;
755 template.private_data = (
uword) msm;
761 template.private_data = (
uword) msm;
775 if (set_output_vector)
797 struct sockaddr_in
addr;
802 memset (c, 0,
sizeof (*c));
804 c->
socket = socket(AF_INET, SOCK_STREAM, 0);
811 if (ioctl (c->
socket, FIONBIO, &one) < 0)
817 memset(&addr, 0,
sizeof(addr));
818 addr.sin_family = AF_INET;
827 .format =
"connecting to peer 0x%Lx",
830 struct {
u64 peer; } * ed;
832 ed->peer = catchup_peer_id.
as_u64;
835 if (connect(c->
socket, (
const void *)&addr,
sizeof(addr))
836 < 0 && errno != EINPROGRESS)
849 template.file_descriptor = c->
socket;
850 template.private_data = (
uword) msm;
857 mc_msg_catchup_request_t * mp;
860 mp->stream_index = stream_index;
879 struct sockaddr_in * sa;
882 fd = socket (PF_INET, AF_INET, 0);
888 ifr.ifr_addr.sa_family = AF_INET;
889 strncpy (ifr.ifr_name, if_name,
sizeof(ifr.ifr_name)-1);
890 if (ioctl (fd, SIOCGIFADDR, &ifr) < 0) {
895 sa = (
void *) &ifr.ifr_addr;
896 clib_memcpy (ip4_address, &sa->sin_addr.s_addr, sizeof (ip4_address[0]));
898 if (ioctl (fd, SIOCGIFMTU, &ifr) < 0)
901 *mtu = ifr.ifr_mtu - ( 20 + 8);
910 int n_intfcs_to_probe)
932 for (i = 0; i < n_intfcs_to_probe; i++)
void mc_main_init(mc_main_t *mcm, char *tag)
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
#define clib_error_return_code(e, code, flags, args...)
#define hash_set(h, key, value)
sll srl srl sll sra u16x4 i
int catchup_server_socket
unix_file_function_t * read_function
#define hash_unset(h, key)
static void(BVT(clib_bihash)*h, BVT(clib_bihash_value)*v)
bad routing header type(not 4)") sr_error (NO_MORE_SEGMENTS
void mc_msg_master_assert_handler(mc_main_t *mcm, mc_msg_master_assert_t *mp, u32 buffer_index)
static_always_inline mc_peer_id_t mc_socket_set_peer_id(u32 address_net_byte_order, u32 port_host_byte_order)
always_inline uword unix_file_add(unix_main_t *um, unix_file_t *template)
#define clib_error(format, args...)
#define ELOG(em, f, data)
static clib_error_t * ack_socket_read_ready(unix_file_t *uf)
void mc_msg_user_ack_handler(mc_main_t *mcm, mc_msg_user_ack_t *mp, u32 buffer_index)
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
static clib_error_t * catchup_server_read_ready(unix_file_t *uf)
clib_error_t *(* tx_buffer)(void *opaque, mc_transport_type_t type, u32 buffer_index)
void mc_msg_join_reply_handler(mc_main_t *mcm, mc_msg_join_reply_t *mp, u32 buffer_index)
always_inline void * vlib_buffer_get_current(vlib_buffer_t *b)
Get pointer to current data to process.
mc_multicast_socket_t multicast_sockets[MC_N_TRANSPORT_TYPE]
static clib_error_t * join_socket_read_ready(unix_file_t *uf)
static clib_error_t * catchup_socket_error_ready(unix_file_t *uf)
static clib_error_t * catchup_socket_read_ready(unix_file_t *uf, int is_server)
uword(* catchup_request_fun)(void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id)
i16 current_data
signed offset in data[], pre_data[] that we are currently processing.
static clib_error_t * tx_buffer(void *transport, mc_transport_type_t type, u32 buffer_index)
void mc_msg_catchup_reply_handler(mc_main_t *mcm, mc_msg_catchup_reply_t *mp, u32 catchup_opaque)
always_inline u32 mc_socket_peer_id_get_address(mc_peer_id_t i)
void(* catchup_send_fun)(void *opaque, uword catchup_opaque, u8 *data_vector)
static clib_error_t * setup_mutlicast_socket(mc_socket_main_t *msm, mc_multicast_socket_t *ms, char *type, uword udp_port)
mc_peer_id_t our_catchup_peer_id
#define vec_resize(V, N)
Resize a vector (no header, unspecified alignment) Add N elements to end of given vector V...
static clib_error_t * sendmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *tx_addr, u32 buffer_index)
always_inline void mc_byte_swap_msg_catchup_request(mc_msg_catchup_request_t *r)
static uword pointer_to_uword(const void *p)
#define VLIB_BUFFER_NEXT_PRESENT
mc_socket_catchup_t * catchups
always_inline void vlib_buffer_free_one(vlib_main_t *vm, u32 buffer_index)
Free one buffer Shorthand to free a single buffer chain.
static u8 * format_socket_peer_id(u8 *s, va_list *args)
#define pool_elt_at_index(p, i)
u32 base_multicast_udp_port_host_byte_order
static word find_and_bind_to_free_port(word sock, word port)
#define VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES
u16 current_length
Nbytes between current data and the end of this buffer.
static clib_error_t * catchup_client_write_ready(unix_file_t *uf)
#define clib_error_return_unix(e, args...)
static clib_error_t * from_relay_socket_read_ready(unix_file_t *uf)
static int find_interface_ip4_address(char *if_name, u32 *ip4_address, u32 *mtu)
static uword append_buffer_index_to_iovec(vlib_main_t *vm, u32 buffer_index, struct iovec **iovs_return)
uword * catchup_index_by_file_descriptor
static void * catchup_add_pending_output(mc_socket_catchup_t *c, uword n_bytes, u8 *set_output_vector)
clib_error_t *(* tx_ack)(void *opaque, mc_peer_id_t peer_id, u32 buffer_index)
mc_peer_id_t our_ack_peer_id
static void catchup_send_fun(void *transport_main, uword opaque, u8 *data)
clib_error_t * mc_socket_main_init(mc_socket_main_t *msm, char **intfc_probe_list, int n_intfcs_to_probe)
static void catchup_cleanup(mc_socket_main_t *msm, mc_socket_catchup_t *c, unix_main_t *um, unix_file_t *uf)
static clib_error_t * to_relay_socket_read_ready(unix_file_t *uf)
#define vec_free(V)
Free vector's memory (no header).
static clib_error_t * tx_ack(void *transport, mc_peer_id_t dest_peer_id, u32 buffer_index)
#define clib_memcpy(a, b, c)
#define clib_unix_warning(format, args...)
#define ELOG_TYPE(f, fmt)
#define ELOG_TYPE_DECLARE(f)
void mc_msg_user_request_handler(mc_main_t *mcm, mc_msg_user_request_t *mp, u32 buffer_index)
static clib_error_t * catchup_socket_write_ready(unix_file_t *uf, int is_server)
always_inline void unix_file_del(unix_main_t *um, unix_file_t *f)
void mc_msg_join_or_leave_request_handler(mc_main_t *mcm, mc_msg_join_or_leave_request_t *req, u32 buffer_index)
always_inline void msg_handler(mc_main_t *mcm, u32 buffer_index, u32 handler_frees_buffer, void *_h)
static clib_error_t * catchup_client_read_ready(unix_file_t *uf)
static clib_error_t * catchup_server_write_ready(unix_file_t *uf)
u32 next_buffer
Next buffer for this linked-list of buffers.
u32 vlib_buffer_alloc(vlib_main_t *vm, u32 *buffers, u32 n_buffers)
Allocate buffers into supplied array.
#define clib_unix_error(format, args...)
always_inline u32 mc_socket_peer_id_get_port(mc_peer_id_t i)
#define UNIX_FILE_DATA_AVAILABLE_TO_WRITE
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
static clib_error_t * catchup_listen_read_ready(unix_file_t *uf)
struct sockaddr_in tx_addr
format_function_t * format_peer_id
void(* file_update)(unix_file_t *file, unix_file_update_type_t update_type)
static clib_error_t * recvmsg_helper(mc_socket_main_t *msm, int socket, struct sockaddr_in *rx_addr, u32 *buffer_index, u32 drop_message)
u32 if_ip4_address_net_byte_order
static clib_error_t * mastership_socket_read_ready(unix_file_t *uf)
void mc_msg_catchup_request_handler(mc_main_t *mcm, mc_msg_catchup_request_t *req, u32 catchup_opaque)
u32 multicast_tx_ip4_address_host_byte_order
#define clib_error_return(e, args...)
char * multicast_interface_name
u32 flags
buffer flags: VLIB_BUFFER_IS_TRACED: trace this buffer.
struct vlib_main_t * vlib_main
always_inline vlib_buffer_t * vlib_get_buffer(vlib_main_t *vm, u32 buffer_index)
Translate buffer index into buffer pointer.
static uword catchup_request_fun(void *transport_main, u32 stream_index, mc_peer_id_t catchup_peer_id)
void( mc_msg_handler_t)(mc_main_t *mcm, void *msg, u32 buffer_index)
static mc_socket_catchup_t * find_catchup_from_file_descriptor(mc_socket_main_t *msm, int file_descriptor)
static clib_error_t * socket_setup(mc_socket_main_t *msm)