36 session->session_index = session - em->
sessions;
45 CHECK (ECHO_FAIL_TEST_ASSERT_RX_TOTAL,
47 "Invalid amount of data received");
48 CHECK (ECHO_FAIL_TEST_ASSERT_TX_TOTAL,
50 "Invalid amount of data sent");
52 CHECK (ECHO_FAIL_TEST_ASSERT_ALL_SESSIONS_CLOSED,
54 "Some sessions are still open");
67 ECHO_FAIL (ECHO_FAIL_SEND_IO_EVT,
"app_send_io_evt_to_vpp errored %d",
90 ECHO_FAIL (ECHO_FAIL_SOCKET_CONNECT,
"socket connect failed");
96 ECHO_FAIL (ECHO_FAIL_INIT_SHM_API,
"init shm api failed");
104 ECHO_FAIL (ECHO_FAIL_SHMEM_CONNECT,
"shmem connect failed");
122 f64 deltat = start_evt_missing || end_evt_missing ? 0 :
125 if (start_evt_missing)
126 ECHO_FAIL (ECHO_FAIL_MISSING_START_EVENT,
127 "Expected event %v to happen, but it did not!", start_evt);
131 "Expected event %v to happen, but it did not!", end_evt);
133 fformat (stdout,
"vpp_echo JSON stats:\n{\n");
134 fformat (stdout,
" \"role\": \"%s\",\n",
136 fformat (stdout,
" \"time\": \"%.9f\",\n", deltat);
137 fformat (stdout,
" \"start_evt\": \"%v\",\n", start_evt);
138 fformat (stdout,
" \"start_evt_missing\": \"%s\",\n",
139 start_evt_missing ?
"True" :
"False");
140 fformat (stdout,
" \"end_evt\": \"%v\",\n", end_evt);
141 fformat (stdout,
" \"end_evt_missing\": \"%s\",\n",
142 end_evt_missing ?
"True" :
"False");
145 fformat (stdout,
" \"closing\": {\n");
146 fformat (stdout,
" \"reset\": { \"q\": %d, \"s\": %d },\n",
148 fformat (stdout,
" \"close\": { \"q\": %d, \"s\": %d },\n",
150 fformat (stdout,
" \"active\": { \"q\": %d, \"s\": %d },\n",
152 fformat (stdout,
" \"clean\": { \"q\": %d, \"s\": %d }\n",
155 fformat (stdout,
" \"results\": {\n");
174 f64 deltat = start_evt_missing || end_evt_missing ? 0 :
177 if (start_evt_missing)
178 ECHO_FAIL (ECHO_FAIL_MISSING_START_EVENT,
179 "Expected event %v to happen, but it did not!", start_evt);
183 "Expected event %v to happen, but it did not!", end_evt);
185 fformat (stdout,
"Timing %v:%v\n", start_evt, end_evt);
186 if (start_evt_missing)
187 fformat (stdout,
"Missing Start Timing Event (%v)!\n", start_evt);
189 fformat (stdout,
"Missing End Timing Event (%v)!\n", end_evt);
190 fformat (stdout,
"-------- TX --------\n");
191 fformat (stdout,
"%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n",
195 fformat (stdout,
"%.4f Gbit/second\n",
197 fformat (stdout,
"-------- RX --------\n");
198 fformat (stdout,
"%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n",
202 fformat (stdout,
"%.4f Gbit/second\n",
204 fformat (stdout,
"--------------------\n");
205 fformat (stdout,
"Received close on %d streams (and %d Quic conn)\n",
207 fformat (stdout,
"Received reset on %d streams (and %d Quic conn)\n",
209 fformat (stdout,
"Sent close on %d streams (and %d Quic conn)\n",
211 fformat (stdout,
"Discarded %d streams (and %d Quic conn)\n",
224 ECHO_LOG (1,
"[%lu/%lu] -> S(%x) -> [%lu/%lu]",
241 u32 *session_indexes = 0, *session_index;
246 if (s->session_state == ECHO_SESSION_STATE_CLOSED)
247 vec_add1 (session_indexes, s->session_index);}
267 for (i = 0; i < n_read; i++)
272 ECHO_LOG (0,
"Session 0x%lx byte %lld was 0x%x expected 0x%x",
277 ECHO_LOG (0,
"Too many errors, hiding next ones");
279 ECHO_FAIL (ECHO_FAIL_TEST_BYTES_ERR,
"test-bytes errored");
296 s->bytes_received += n_read;
297 s->bytes_to_receive -= n_read;
306 if (!bytes_this_chunk)
312 s->bytes_to_send -= n_sent;
313 s->bytes_sent += n_sent;
321 while (n_sent < len && !em->time_to_stop)
348 int n_read, n_sent = 0;
383 if (n_sent || n_read)
390 ECHO_LOG (1,
"Idle FIFOs TX:%dB RX:%dB",
409 ECHO_LOG (1,
"Thread %u exiting, no sessions to care for", idx);
415 u32 n_closed_sessions = 0;
421 for (i = 0; !em->
time_to_stop; i = (i + 1) % thread_n_sessions)
423 n_closed_sessions = i == 0 ? 0 : n_closed_sessions;
428 switch (s->session_state)
446 if (n_closed_sessions == thread_n_sessions)
460 clib_net_to_host_u32 (mp->
retval));
465 clib_net_to_host_u16 (mp->
lcl_port));
491 ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
492 "accepted wait_for_segment_allocation errored");
497 rx_fifo->client_session_index = session->session_index;
499 tx_fifo->client_session_index = session->session_index;
501 session->rx_fifo = rx_fifo;
502 session->tx_fifo = tx_fifo;
506 sizeof (ip46_address_t));
507 session->transport.is_ip4 = mp->
rmt.is_ip4;
508 session->transport.rmt_port = mp->
rmt.port;
510 sizeof (ip46_address_t));
547 clib_net_to_host_u32 (mp->
retval));
554 ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
555 "connected wait_for_segment_allocation errored");
560 rx_fifo->client_session_index = session->session_index;
562 tx_fifo->client_session_index = session->session_index;
564 session->rx_fifo = rx_fifo;
565 session->tx_fifo = tx_fifo;
573 sizeof (ip46_address_t));
574 session->transport.is_ip4 = mp->
lcl.is_ip4;
575 session->transport.lcl_port = mp->
lcl.port;
577 sizeof (ip46_address_t));
582 session->session_index, 0 );
635 switch (e->event_type)
655 ECHO_LOG (0,
"unhandled event %u", e->event_type);
691 for (i = 0; i < n_msgs; i++)
716 ECHO_FAIL (ECHO_FAIL_APP_ATTACH,
"Application failed to attach");
731 for (i = 0; i <
vec_len (msg_vec); i++)
767 ECHO_FAIL (ECHO_FAIL_SERVER_DISCONNECT_TIMEOUT,
768 "Timeout waiting for state disconnected");
779 "Usage: vpp_echo [socket-name SOCKET] [client|server] [uri URI] [OPTIONS]\n" 780 "Generates traffic and assert correct teardown of the QUIC hoststack\n" 782 " socket-name PATH Specify the binary socket path to connect to VPP\n" 783 " use-svm-api Use SVM API to connect to VPP\n" 784 " test-bytes[:assert] Check data correctness when receiving (assert fails on first error)\n" 785 " fifo-size N Use N Kb fifos\n" 786 " mq-size N Use N event slots for vpp_echo <-> vpp events\n" 787 " rx-buf N[Kb|Mb|GB] Use N[Kb|Mb|GB] RX buffer\n" 788 " tx-buf N[Kb|Mb|GB] Use N[Kb|Mb|GB] TX test buffer\n" 789 " appns NAMESPACE Use the namespace NAMESPACE\n" 790 " all-scope all-scope option\n" 791 " local-scope local-scope option\n" 792 " global-scope global-scope option\n" 793 " secret SECRET set namespace secret\n" 794 " chroot prefix PATH Use PATH as memory root path\n" 795 " sclose=[Y|N|W] When a stream is done, pass[N] send[Y] or wait[W] for close\n" 797 " time START:END Time between evts START & END, events being :\n" 798 " start - Start of the app\n" 799 " qconnect - first Connection connect sent\n" 800 " qconnected - last Connection connected\n" 801 " sconnect - first Stream connect sent\n" 802 " sconnected - last Stream got connected\n" 803 " lastbyte - Last expected byte received\n" 804 " exit - Exiting of the app\n" 805 " json Output global stats in json\n" 806 " log=N Set the log level to [0: no output, 1:errors, 2:log]\n" 808 " nclients N Open N clients sending data\n" 809 " nthreads N Use N busy loop threads for data [in addition to main & msg queue]\n" 810 " TX=1337[Kb|Mb|GB] Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n" 811 " RX=1337[Kb|Mb|GB] Expect 1337 [K|M|G]bytes\n" "\n");
818 fprintf (stderr,
"\nDefault configuration is :\n" 819 " server nclients 1/1 RX=64Kb TX=RX\n" 820 " client nclients 1/1 RX=64Kb TX=64Kb\n");
866 else if (
unformat (a,
"chroot prefix %s", &chroot_prefix))
868 else if (
unformat (a,
"uri %s", &uri))
874 else if (
unformat (a,
"test-bytes:assert"))
876 else if (
unformat (a,
"test-bytes"))
880 else if (
unformat (a,
"use-svm-api"))
882 else if (
unformat (a,
"fifo-size %d", &tmp))
902 em->
appns_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE
903 | APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
904 else if (
unformat (a,
"local-scope"))
905 em->
appns_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
906 else if (
unformat (a,
"global-scope"))
907 em->
appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
978 ECHO_FAIL (ECHO_FAIL_INVALID_URI,
"Unable to process uri");
983 static void __clib_constructor
1000 u32 rpc_queue_size = 64 << 10;
1026 em->
uri =
format (0,
"%s%c",
"tcp://0.0.0.0/1234", 0);
1033 ECHO_FAIL (ECHO_FAIL_PROTOCOL_NOT_SUPPORTED,
1034 "Protocol %U is not supported",
1057 cfg->consumer_pid = getpid ();
1059 cfg->q_nitems = rpc_queue_size;
1060 cfg->ring_cfgs = rc;
1068 app_name = em->i_am_master ?
"echo_server" :
"echo_client";
1072 ECHO_FAIL (ECHO_FAIL_CONNECT_TO_VPP,
"Couldn't connect to vpp");
1083 "Couldn't attach to vpp, did you run <session enable> ?");
1086 if (pthread_create (&em->mq_thread_handle,
1089 ECHO_FAIL (ECHO_FAIL_PTHREAD_CREATE,
"pthread create errored");
1092 for (
i = 0;
i < em->n_rx_threads;
i++)
1093 if (pthread_create (&em->data_thread_handles[
i],
1097 "pthread create errored (index %d)",
i);
1100 if (em->i_am_master)
1110 ECHO_FAIL (ECHO_FAIL_DETACH,
"Couldn't detach from vpp");
1114 pthread_join (em->mq_thread_handle, (
void **) &rv);
1117 ECHO_FAIL (ECHO_FAIL_MQ_PTHREAD,
"mq pthread errored %d", rv);
1120 if (em->use_sock_api)
1126 if (em->output_json)
1131 exit (em->has_failed);
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
uword unformat_data(unformat_input_t *input, va_list *args)
static void echo_session_prealloc(echo_main_t *em)
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
teardown_stat_t clean_count
static_always_inline void clib_spinlock_unlock(clib_spinlock_t *p)
int vl_socket_client_init_shm(vl_api_shm_elem_config_t *config, int want_pthread)
static_always_inline void clib_spinlock_lock(clib_spinlock_t *p)
u8 * echo_format_timing_event(u8 *s, va_list *args)
struct echo_main_t::@483 uri_elts
int vl_client_connect_to_vlib(const char *svm_name, const char *client_name, int rx_queue_size)
echo_session_t * sessions
static void * echo_mq_thread_fn(void *arg)
clib_spinlock_t sid_vpp_handles_lock
static int echo_process_each_proto_opts(unformat_input_t *a)
u8 use_sock_api
Flag that decides if socket, instead of svm, api is used to connect to vpp.
svm_queue_t * vl_input_queue
uword vpp_event_queue_address
int my_client_index
All VLIB-side message handlers use my_client_index to identify the queue / client.
#define clib_memcpy_fast(a, b, c)
static void echo_session_dequeue_notify(echo_session_t *s)
static void test_recv_bytes(echo_main_t *em, echo_session_t *s, u8 *rx_buf, u32 n_read)
static u8 * format_api_error(u8 *s, va_list *args)
clib_memset(h->entries, 0, sizeof(h->entries[0]) *entries)
void echo_process_uri(echo_main_t *em)
static f64 clib_time_now(clib_time_t *c)
static int mirror_data_chunk(echo_main_t *em, echo_session_t *s, u8 *tx_buf, u64 len)
uword echo_unformat_timing_event(unformat_input_t *input, va_list *args)
static u8 svm_msg_q_is_empty(svm_msg_q_t *mq)
Check if message queue is empty.
#define vec_add2(V, P, N)
Add N elements to end of vector V, return pointer to new elements in P.
static void print_global_json_stats(echo_main_t *em)
int vl_socket_client_connect(char *socket_path, char *client_name, u32 socket_buffer_size)
volatile connection_state_t state
void(* reset_cb)(session_reset_msg_t *mp, echo_session_t *s)
#define pool_get(P, E)
Allocate an object E from a pool P (unspecified alignment).
static void echo_free_sessions(echo_main_t *em)
int wait_for_state_change(echo_main_t *em, connection_state_t state, f64 timeout)
void echo_send_disconnect_session(u64 handle, u32 opaque)
static void session_accepted_handler(session_accepted_msg_t *mp)
foreach_app_session_field u64 vpp_session_handle
#define vec_reset_length(v)
Reset vector length to zero NULL-pointer tolerant.
static void handle_mq_event(session_event_t *e)
struct _svm_fifo svm_fifo_t
void(* connected_cb)(session_connected_bundled_msg_t *mp, u32 session_index, u8 is_failed)
static void echo_check_closed_listener(echo_main_t *em, echo_session_t *s)
static void session_reset_handler(session_reset_msg_t *mp)
static void echo_process_rpcs(echo_main_t *em)
void svm_region_exit(void)
static void app_alloc_ctrl_evt_to_vpp(svm_msg_q_t *mq, app_session_evt_t *app_evt, u8 evt_type)
static int app_send(app_session_t *s, u8 *data, u32 len, u8 noblock)
void echo_send_attach(echo_main_t *em)
echo_session_t * echo_session_new(echo_main_t *em)
static void clib_mem_set_thread_index(void)
#define pool_foreach(VAR, POOL, BODY)
Iterate through pool.
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
volatile u64 bytes_received
void echo_session_handle_add_del(echo_main_t *em, u64 handle, u32 sid)
static u32 svm_fifo_max_dequeue(svm_fifo_t *f)
Fifo max bytes to dequeue.
int main(int argc, char **argv)
static void clients_run(echo_main_t *em)
static int app_send_io_evt_to_vpp(svm_msg_q_t *mq, u32 session_index, u8 evt_type, u8 noblock)
Send fifo io event to vpp worker thread.
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
clib_spinlock_t segment_handles_lock
struct vl_shmem_hdr_ * shmem_hdr
Binary API shared-memory segment header pointer.
#define ECHO_FAIL(fail, _fmt, _args...)
static void clib_spinlock_init(clib_spinlock_t *p)
void vl_set_memory_root_path(const char *name)
static int echo_mq_dequeue_batch(svm_msg_q_t *mq, svm_msg_q_msg_t *msg_vec, u32 n_max_msg)
static int app_recv(app_session_t *s, u8 *data, u32 len)
#define pool_elt_at_index(p, i)
Returns pointer to element at given index.
void(* accepted_cb)(session_accepted_msg_t *mp, echo_session_t *session)
void(* cleanup_cb)(echo_session_t *s, u8 parent_died)
#define pool_put(P, E)
Free an object E in pool P.
void echo_api_hookup(echo_main_t *em)
#define SESSION_INVALID_INDEX
uword vpp_event_queue_address
fifo_segment_main_t segment_main
void clib_time_init(clib_time_t *c)
void(* bound_uri_cb)(session_bound_msg_t *mp, echo_session_t *session)
data_source_t data_source
static void server_run(echo_main_t *em)
API main structure, used by both vpp and binary API clients.
static void __clib_constructor vpp_echo_init()
static void app_send_ctrl_evt_to_vpp(svm_msg_q_t *mq, app_session_evt_t *app_evt)
static u8 svm_fifo_set_event(svm_fifo_t *f)
Set fifo event flag.
static void print_global_stats(echo_main_t *em)
int connect_to_vpp(char *name)
void echo_send_connect(u8 *uri, u32 opaque)
static u8 svm_fifo_needs_deq_ntf(svm_fifo_t *f, u32 n_last_deq)
Check if fifo needs dequeue notification.
void vl_socket_client_disconnect(void)
echo_proto_cb_vft_t * available_proto_cb_vft[TRANSPORT_N_PROTO]
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
static int svm_msg_q_timedwait(svm_msg_q_t *mq, double timeout)
Timed wait for message queue event.
svm_msg_q_t * our_event_queue
#define vec_free(V)
Free vector's memory (no header).
void fifo_segment_main_init(fifo_segment_main_t *sm, u64 baseva, u32 timeout_in_seconds)
uword * shared_segment_handles
#define ECHO_LOG(lvl, _fmt, _args...)
void echo_notify_event(echo_main_t *em, echo_test_evt_t e)
u32 *volatile data_thread_args
#define HIGH_SEGMENT_BASEVA
blocking call - best used in combination with condvars, for eventfds we don't yield the cpu ...
static void echo_handle_data(echo_main_t *em, echo_session_t *s, u8 *rx_buf)
svm_queue_t * vl_input_queue
teardown_stat_t reset_count
struct echo_main_t::@482 timing
#define hash_create(elts, value_bytes)
#define uword_to_pointer(u, type)
static void print_usage_and_exit(void)
static uword hash_elts(void *v)
uword unformat_transport_proto(unformat_input_t *input, va_list *args)
static void session_bound_handler(session_bound_msg_t *mp)
void echo_send_unbind(echo_main_t *em)
static void svm_fifo_clear_deq_ntf(svm_fifo_t *f)
Clear the want notification flag and set has notification.
echo_session_t * echo_get_session_from_handle(echo_main_t *em, u64 handle)
static void init_error_string_table(vat_main_t *vam)
void echo_update_count_on_session_close(echo_main_t *em, echo_session_t *s)
static void * echo_data_thread_fn(void *arg)
void(* set_defaults_before_opts_cb)(void)
#define clib_atomic_fetch_add(a, b)
static void echo_set_each_proto_defaults_before_opts(echo_main_t *em)
void echo_send_detach(echo_main_t *em)
template key/value backing page structure
teardown_stat_t close_count
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
static int recv_data_chunk(echo_main_t *em, echo_session_t *s, u8 *rx_buf)
volatile u64 bytes_to_receive
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
void echo_process_opts(int argc, char **argv)
void echo_send_listen(echo_main_t *em)
void(* print_usage_cb)(void)
void(* set_defaults_after_opts_cb)(void)
teardown_stat_t active_count
uword * session_index_by_vpp_handles
int wait_for_segment_allocation(u64 segment_handle)
static void echo_assert_test_suceeded(echo_main_t *em)
svm_msg_q_t * rpc_msq_queue
static void session_disconnected_handler(session_disconnected_msg_t *mp)
void vl_client_disconnect_from_vlib(void)
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
echo_proto_cb_vft_t * proto_cb_vft
#define vec_foreach(var, vec)
Vector iterator.
int echo_send_rpc(echo_main_t *em, void *fp, void *arg, u32 opaque)
pthread_t * data_thread_handles
int(* process_opts_cb)(unformat_input_t *a)
static int send_data_chunk(echo_session_t *s, u8 *tx_buf, int offset, int len)
void(* echo_rpc_t)(void *arg, u32 opaque)
void * clib_mem_init_thread_safe(void *memory, uword memory_size)
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static void session_connected_handler(session_connected_msg_t *mp)
u8 * format_transport_proto(u8 *s, va_list *args)
u8 send_stream_disconnects
struct echo_main_t::@481 stats
uword echo_unformat_close(unformat_input_t *input, va_list *args)
void(* disconnected_cb)(session_disconnected_msg_t *mp, echo_session_t *s)
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
static void stop_signal(int signum)