10 #include <sys/socket.h>
14 #include <sys/epoll.h>
18 #include <daq_module_api.h>
22 #define DAQ_VPP_VERSION 1
25 #define VPP_DAQ_PAUSE() __builtin_ia32_pause ()
26 #elif defined(__aarch64__) || defined(__arm__)
27 #define VPP_DAQ_PAUSE() __asm__("yield")
29 #define VPP_DAQ_PAUSE()
33 {
"debug",
"Enable debugging output to stdout",
34 DAQ_VAR_DESC_FORBIDS_ARGUMENT },
39 #define SET_ERROR(modinst, ...) daq_base_api.set_errbuf (modinst, __VA_ARGS__)
41 typedef struct _vpp_msg_pool
43 DAQ_MsgPoolInfo_t info;
46 typedef struct _vpp_desc_data
54 typedef struct _vpp_bpool
61 typedef struct _vpp_qpair
68 volatile uint32_t *enq_head;
69 volatile uint32_t *deq_head;
83 typedef struct _vpp_context
90 DAQ_ModuleInstance_h modinst;
107 struct epoll_event *epoll_events;
115 const char *socket_name;
116 volatile bool interrupted;
125 while (!__atomic_compare_exchange_n (&p->lock, &
free, 1, 0, __ATOMIC_ACQUIRE,
128 while (__atomic_load_n (&p->lock, __ATOMIC_RELAXED))
138 __atomic_store_n (&p->lock, 0, __ATOMIC_RELEASE);
144 if (base_api->api_version != DAQ_BASE_API_VERSION ||
145 base_api->api_size != sizeof (DAQ_BaseAPI_t))
172 CMSG_SPACE (
sizeof (
int) * n_fds) + CMSG_SPACE (
sizeof (
struct ucred));
174 struct msghdr mh = {};
176 struct cmsghdr *cmsg;
178 iov[0].iov_base = (
void *) msg;
182 mh.msg_control = ctl;
183 mh.msg_controllen = ctl_sz;
185 memset (ctl, 0, ctl_sz);
189 return DAQ_ERROR_NODEV;
191 cmsg = CMSG_FIRSTHDR (&mh);
194 if (cmsg->cmsg_level == SOL_SOCKET)
196 if (cmsg->cmsg_type == SCM_CREDENTIALS)
200 else if (cmsg->cmsg_type == SCM_RIGHTS)
202 memcpy (fds, CMSG_DATA (cmsg), n_fds *
sizeof (
int));
205 cmsg = CMSG_NXTHDR (&mh, cmsg);
216 if (vc->shm_base != MAP_FAILED)
217 munmap (vc->shm_base, vc->shm_size);
219 if (vc->shm_fd != -1)
224 for (
int i = 0;
i < vc->num_bpools;
i++)
229 if (bp->base && bp->base != MAP_FAILED)
230 munmap (bp->base, bp->size);
237 for (
int i = 0;
i < vc->num_qpairs;
i++)
240 if (qp->enq_fd != -1)
242 if (qp->deq_fd != -1)
245 free (qp->desc_data);
250 free (vc->epoll_events);
252 if (vc->epoll_fd != -1)
253 close (vc->epoll_fd);
257 #define ERR(rv, ...) \
259 SET_ERROR (modinst, __VA_ARGS__); \
266 DAQ_ModuleInstance_h modinst,
void **ctxt_ptr)
269 int rval = DAQ_ERROR;
271 struct sockaddr_un sun = { .sun_family = AF_UNIX };
272 int i, fd = -1, shm_fd = -1;
285 ERR (DAQ_ERROR_NOMEM,
286 "%s: Couldn't allocate memory for the new VPP context!", __func__);
288 const char *varKey, *varValue;
289 daq_base_api.config_first_variable (modcfg, &varKey, &varValue);
292 if (!strcmp (varKey,
"debug"))
294 else if (!strcmp (varKey,
"input_mode"))
296 if (!strcmp (varValue,
"interrupt"))
298 else if (!strcmp (varValue,
"polling"))
301 else if (!strcmp (varKey,
"socket_name"))
303 vc->socket_name = varValue;
305 daq_base_api.config_next_variable (modcfg, &varKey, &varValue);
310 if (!vc->socket_name)
314 if ((fd = socket (AF_UNIX, SOCK_SEQPACKET, 0)) < 0)
315 ERR (DAQ_ERROR_NODEV,
"%s: Couldn't create socket!", __func__);
317 strncpy (sun.sun_path, vc->socket_name, sizeof (sun.sun_path) - 1);
319 if (connect (fd, (
struct sockaddr *) &sun,
sizeof (
struct sockaddr_un)) != 0)
320 ERR (DAQ_ERROR_NODEV,
"%s: Couldn't connect to socket! '%s'", __func__,
328 if (send (fd, &msg,
sizeof (msg), 0) !=
sizeof (msg))
329 ERR (DAQ_ERROR_NODEV,
"%s: Couldn't send connect message!", __func__);
336 ERR (DAQ_ERROR_NODEV,
"%s: Couldn't receive config message!", __func__);
338 vc->modinst = modinst;
349 vc->epoll_events =
calloc (vc->num_qpairs, sizeof (
struct epoll_event));
351 if (vc->bpools == 0 || vc->qpairs == 0)
352 ERR (DAQ_ERROR_NOMEM,
353 "%s: Couldn't allocate memory for the new VPP context!", __func__);
355 for (
i = 0;
i < vc->num_bpools;
i++)
356 vc->bpools[
i].fd = -1;
359 mmap (0, vc->shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, vc->shm_fd, 0);
361 if (base == MAP_FAILED)
362 ERR (DAQ_ERROR_NOMEM,
363 "%s: Couldn't map shared memory for the new VPP context!", __func__);
369 printf (
"[%s]\n", input);
370 printf (
" Shared memory size: %u\n", vc->shm_size);
371 printf (
" Number of buffer pools: %u\n", vc->num_bpools);
372 printf (
" Number of queue pairs: %u\n", vc->num_qpairs);
376 for (
int i = 0;
i < vc->num_bpools;
i++)
382 ERR (DAQ_ERROR_NODEV,
383 "%s: Failed to receive buffer pool message for the new "
387 bp->base = mmap (0, bp->size, PROT_READ, MAP_SHARED, bp->fd, 0);
389 if (bp->base == MAP_FAILED)
390 ERR (DAQ_ERROR_NOMEM,
391 "%s: Couldn't map shared memory for the new VPP context!",
394 printf (
" Buffer pool %u size: %u\n",
i, bp->size);
397 if ((vc->epoll_fd = epoll_create (1)) == -1)
398 ERR (DAQ_ERROR_NODEV,
399 "%s: Couldn't create epoll fd for the new VPP context!", __func__);
402 for (
int i = 0;
i < vc->num_qpairs;
i++)
404 struct epoll_event ev = { .events = EPOLLIN };
405 int fds[2] = { -1, -1 };
410 fds[0] == -1 || fds[1] == -1)
411 ERR (DAQ_ERROR_NODEV,
412 "%s: Failed to receive queu pair message for the new "
425 if (epoll_ctl (vc->epoll_fd, EPOLL_CTL_ADD, qp->enq_fd, &ev) == -1)
426 ERR (DAQ_ERROR_NODEV,
427 "%s: Failed to dequeue fd to epoll instance for the new "
431 qsz = qp->queue_size;
435 ERR (DAQ_ERROR_NOMEM,
436 "%s: Couldn't allocate memory for the new VPP context!",
439 for (
int j = 0; j < qsz; j++)
442 DAQ_PktHdr_t *pkthdr = &dd->pkthdr;
443 DAQ_Msg_t *msg = &dd->msg;
448 pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
449 pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;
451 msg->type = DAQ_MSG_TYPE_PACKET;
452 msg->hdr_len =
sizeof (DAQ_PktHdr_t);
454 msg->owner = vc->modinst;
460 printf (
" Queue pair %u:\n",
i);
461 printf (
" Size: %u\n", qp->queue_size);
462 printf (
" Enqueue fd: %u\n", qp->enq_fd);
463 printf (
" Dequeue fd: %u\n", qp->deq_fd);
488 vc->interrupted =
true;
496 memset (
stats, 0,
sizeof (DAQ_Stats_t));
508 uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_UNPRIV_START;
518 static inline uint32_t
520 const DAQ_Msg_t *
msgs[],
unsigned max_recv)
523 uint32_t head,
next,
mask = qp->queue_size - 1;
529 next = qp->next_desc;
530 head = __atomic_load_n (qp->enq_head, __ATOMIC_ACQUIRE);
535 n_left = n_recv = max_recv;
540 uint32_t desc_index = qp->enq_ring[
next &
mask];
543 dd->pkthdr.pktlen = d->
length;
546 dd->msg.data_len = d->
length;
553 qp->next_desc =
next;
561 const DAQ_Msg_t *
msgs[], DAQ_RecvStatus *rstat)
564 uint32_t n_qpairs_left = vc->num_qpairs;
565 uint32_t n, n_recv = 0;
571 vc->interrupted =
false;
572 *rstat = DAQ_RSTAT_INTERRUPTED;
579 while (n_qpairs_left)
591 if (vc->next_qpair == vc->num_qpairs)
598 *rstat = DAQ_RSTAT_OK;
604 *rstat = DAQ_RSTAT_OK;
608 n_events = epoll_wait (vc->epoll_fd, vc->epoll_events, vc->num_qpairs, 1000);
612 *rstat = DAQ_RSTAT_TIMEOUT;
617 *rstat = errno == EINTR ? DAQ_RSTAT_TIMEOUT : DAQ_RSTAT_ERROR;
621 for (
int i = 0;
i < n_events;
i++)
624 VPPQueuePair *qp = vc->qpairs + vc->epoll_events[
i].data.u32;
632 (void) read (qp->enq_fd, &ctr, sizeof (ctr));
635 *rstat = DAQ_RSTAT_OK;
647 uint64_t counter_increment = 1;
648 int rv, retv = DAQ_SUCCESS;
651 mask = qp->queue_size - 1;
652 head = *qp->deq_head;
653 d = qp->descs + dd->index;
654 if (verdict == DAQ_VERDICT_PASS)
659 qp->deq_ring[head &
mask] = dd->index;
661 __atomic_store_n (qp->deq_head, head, __ATOMIC_RELEASE);
665 rv = write (qp->deq_fd, &counter_increment, sizeof (counter_increment));
667 if (
rv !=
sizeof (counter_increment))
680 vc->pool.info.available = 128;
681 vc->pool.info.size = 256;
683 *info = vc->pool.info;
690 DAQ_MODULE_API_VERSION,
691 sizeof (DAQ_ModuleAPI_t),
694 DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE |
695 DAQ_TYPE_MULTI_INSTANCE,