diff --git a/CMakeLists.txt b/CMakeLists.txt index e0fe60f..7777703 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,7 +81,9 @@ add_library(libef STATIC src/ef-payload.c src/ef-profinet.c src/ef-ptp.c + src/ef-rate.c src/ef-sv.c + src/ef-txring.c src/ef-udp.c src/ef-vlan.c ${version_file} @@ -123,6 +125,7 @@ add_executable(ef-tests test/test-padding.cxx test/test-alloc-free.cxx test/test-mld-zero-width.cxx + test/test-rate-limit.cxx ) target_compile_options(ef-tests PRIVATE ${EF_SANITIZE_FLAGS}) diff --git a/src/ef-args.c b/src/ef-args.c index f579e0d..02c5894 100644 --- a/src/ef-args.c +++ b/src/ef-args.c @@ -4,6 +4,7 @@ #include #include #include +#include int argc_frame(int argc, const char *argv[], frame_t *f) { int i, j, res, offset; @@ -79,6 +80,11 @@ void cmd_destruct(cmd_t *c) { if (c->frame_mask_buf) bfree(c->frame_mask_buf); + if (c->mmsg) + free(c->mmsg); + if (c->miov) + free(c->miov); + memset(c, 0, sizeof(*c)); } @@ -97,11 +103,29 @@ void print_help() { po(" -h Top level help message.\n"); po(" -p No pad. Skip padding frames to 60 bytes,\n"); po(" allowing runt frames to be sent or matched as-is.\n"); - po(" -t When listening on an interface (rx),\n"); - po(" When listening on an interface (rx), the tool will always\n"); - po(" listen during the entire timeout period. This is needed,\n"); - po(" as we must also check that no frames are received during\n"); - po(" the test. Default is 100ms.\n"); + po(" -Q Set PACKET_QDISC_BYPASS on all sockets.\n"); + po(" Skips the Linux qdisc layer entirely, reducing TX CPU cost.\n"); + po(" -r Use PACKET_TX_RING (TPACKET_V2) for TX.\n"); + po(" Per-cmd mmap ring; one atomic store per frame plus a periodic\n"); + po(" send() kick. Off by default; the env var EF_TX_RING=1 has the\n"); + po(" same effect as -r.\n"); + po(" -m Batch TX with sendmmsg in the rate path.\n"); + po(" Sends up to 'burst' frames per syscall using a pre-built\n"); + po(" mmsghdr vector. Requires a 'rate ...' on the tx command\n"); + po(" (use 'rate G' as a near-unlimited rate to opt in).\n"); + po(" Same effect via env: EF_USE_SENDMMSG=1.\n"); + po(" --ignore-link-down On the PACKET_TX_RING path (-r), retry\n"); + po(" silently when send() returns ENETDOWN instead of treating it\n"); + po(" as fatal. Useful for tests that toggle the link mid-run.\n"); + po(" -t Wall-clock deadline. Default 100ms.\n"); + po(" RX: the tool always listens for the full timeout period\n"); + po(" so we can verify that no unexpected frames arrive.\n"); + po(" TX:\n"); + po(" 'rep N' (with or without 'rate'): runs to completion,\n"); + po(" ignoring -t. Explicit rep is the user contract.\n"); + po(" 'rate ...' with no rep: stops at -t.\n"); + po(" no rep, no rate: a single frame is sent and the loop\n"); + po(" exits as soon as RX (if any) is satisfied.\n"); po("\n"); po(" -c ,[],[],[],[cnt]\n"); po(" Use tcpdump to capture traffic on an interface while the\n"); @@ -113,7 +137,7 @@ void print_help() { po("\n"); po("Valid commands:\n"); po(" tx: Transmit a frame on a interface. Syntax:\n"); - po(" tx FRAME | help\n"); + po(" tx [rep ] [rate ] [burst ] FRAME | help\n"); po("\n"); po(" rx: Specify a frame which is expected to be received. If no \n"); po(" frame is specified, then the expectation is that no\n"); @@ -166,6 +190,21 @@ void print_help() { po(" Note that the repeat flag must follow the tx key-word\n"); po(" Results must be viewed through the PC or DUT interface counters, i.e. outside of 'ef'\n"); po("\n"); + po("TX rate limiting:\n"); + po(" 'rate ' limits TX to the given packets per second.\n"); + po(" 'rate K|M|G' limits TX to the given wire rate in Kbps/Mbps/Gbps.\n"); + po(" Wire rate includes preamble, SFD, FCS and IFG (24 bytes overhead).\n"); + po(" 'rate' without 'rep' implies infinite repeat, bounded by -t timeout.\n"); + po(" 'rep', 'rate' and 'burst' can appear in any order.\n"); + po(" 'burst ' overrides the token-bucket burst size (default: 10%%\n"); + po(" of pps, clamped to [1, 1024]). Useful at low rates where the\n"); + po(" default burst would send an unwanted packet storm.\n"); + po("Examples:\n"); + po(" ef -t 5000 tx eth0 rate 1000 eth dmac ::1 smac ::2\n"); + po(" ef tx eth0 rep 500 rate 100 eth dmac ::1 smac ::2\n"); + po(" ef -t 5000 tx eth0 rate 1G eth dmac ::1 smac ::2\n"); + po(" ef -t 5000 tx eth0 rate 100M eth dmac ::1 smac ::2\n"); + po("\n"); } int argc_cmd(int argc, const char *argv[], cmd_t *c) { @@ -223,11 +262,54 @@ int argc_cmd(int argc, const char *argv[], cmd_t *c) { } if (c->type == CMD_TYPE_TX) { + int rep_given = 0, kw; + c->repeat = 1; - if (strcmp(argv[i], "rep") == 0 || strcmp(argv[i], "repeat") == 0) { - c->repeat = atoi(argv[i+1]); - i += 2; + c->rate_pps = 0; + c->rate_bps = 0; + c->rate_burst = 0; + + for (kw = 0; kw < 3 && i < argc; kw++) { + if (strcmp(argv[i], "rep") == 0 || + strcmp(argv[i], "repeat") == 0) { + if (i + 1 >= argc) + break; + c->repeat = atoi(argv[i + 1]); + rep_given = 1; + i += 2; + } else if (strcmp(argv[i], "rate") == 0) { + if (i + 1 >= argc) + break; + + const char *val = argv[i + 1]; + char *end; + unsigned long long num = strtoull(val, &end, 10); + + if (end != val && (*end == 'K' || *end == 'k')) { + c->rate_bps = num * 1000ULL; + } else if (end != val && (*end == 'M' || *end == 'm')) { + c->rate_bps = num * 1000000ULL; + } else if (end != val && (*end == 'G' || *end == 'g')) { + c->rate_bps = num * 1000000000ULL; + } else { + c->rate_pps = (uint32_t)num; + } + + i += 2; + } else if (strcmp(argv[i], "burst") == 0) { + if (i + 1 >= argc) + break; + c->rate_burst = atoi(argv[i + 1]); + i += 2; + } else { + break; + } } + + // rate without rep implies infinite repeat + if ((c->rate_pps > 0 || c->rate_bps > 0) && !rep_given) + c->repeat = UINT32_MAX; + c->rep_explicit = rep_given; } //po("%d, i=%d/%d %s\n", __LINE__, i, argc, argv[i]); @@ -411,13 +493,39 @@ int argc_cmds(int argc, const char *argv[]) { int NO_PAD = 0; int TIME_OUT_MS = 100; +int QDISC_BYPASS = 0; +int TX_RING = 0; +int MMSG_TX = 0; +int IGNORE_LINK_DOWN = 0; parse_err_ctx_t PARSE_ERR_CTX; int main_(int argc, const char *argv[]) { + static const struct option long_opts[] = { + { "ignore-link-down", no_argument, NULL, 1 }, + { NULL, 0, NULL, 0 }, + }; int opt; - while ((opt = getopt(argc, (char * const*)argv, "pvht:c:")) != -1) { + while ((opt = getopt_long(argc, (char * const*)argv, "pQvhrmt:c:", + long_opts, NULL)) != -1) { switch (opt) { + case 1: // --ignore-link-down + IGNORE_LINK_DOWN = 1; + break; + + case 'Q': + QDISC_BYPASS = 1; + break; + + + case 'r': + TX_RING = 1; + break; + + case 'm': + MMSG_TX = 1; + break; + case 'p': NO_PAD = 1; break; diff --git a/src/ef-exec.c b/src/ef-exec.c index 23518f2..94eb9bd 100644 --- a/src/ef-exec.c +++ b/src/ef-exec.c @@ -1,7 +1,10 @@ +#define _GNU_SOURCE #include "ef.h" #include #include +#include +#include #include #include #include @@ -63,6 +66,15 @@ int raw_socket(const char *name) { return -1; } + if (QDISC_BYPASS) { + val = 1; + if (setsockopt(s, SOL_PACKET, PACKET_QDISC_BYPASS, + &val, sizeof(val)) < 0) { + po("%s:%d PACKET_QDISC_BYPASS not supported on %s: %m\n", + __FILE__, __LINE__, name); + } + } + // Make sure that the socket is empty before started. // // Warning: I have no idea why this is needed, but otherwise I see that the @@ -119,57 +131,269 @@ int add_cmd_to_resource(cmd_t *c, int res_max, int res_valid, return 1; } -int rfds_wfds_fill(cmd_socket_t *resources, int res_valid, fd_set *rfds, - fd_set *wfds) { +// Returns the number of pfd entries with a non-zero events mask, or +// -1 if no resource has anything to wait for. +int pfds_fill(cmd_socket_t *resources, int res_valid, struct pollfd *pfds, + int has_rate) { cmd_t *cmd_ptr; - int i, fd_set_cnt, fd_max; - - fd_max = 0; - fd_set_cnt = 0; - - FD_ZERO(rfds); - FD_ZERO(wfds); + int i, active = 0; for (i = 0; i < res_valid; i++) { + short events = 0; + resources[i].has_rx = 0; resources[i].has_tx = 0; cmd_ptr = resources[i].cmd; while (cmd_ptr) { - // We must listen even if done, as we need to confirm that no other - // frames are receiwed if (cmd_ptr->type == CMD_TYPE_RX) resources[i].has_rx = 1; - - // Only TX is not done if (cmd_ptr->type == CMD_TYPE_TX && cmd_ptr->done == 0) - resources[i].has_tx = 1; - + resources[i].has_tx = resources[i].has_tx || !has_rate || rate_can_send(cmd_ptr); cmd_ptr = cmd_ptr->next; } - if (resources[i].has_rx) { - FD_SET(resources[i].fd, rfds); - fd_max = MAX(resources[i].fd, fd_max); - fd_set_cnt++; + if (resources[i].has_rx) + events |= POLLIN; + if (resources[i].has_tx) + events |= POLLOUT; + + pfds[i].fd = resources[i].fd; + pfds[i].events = events; + pfds[i].revents = 0; + + if (events) + active++; + } + + return active ? active : -1; +} + +int send_ready_pfds(cmd_socket_t *resources, int res_valid, + struct pollfd *pfds) { + int i, res, tx_done; + cmd_t *cmd_ptr; + buf_t *b; + + while (1) { + tx_done = 1; + for (i = 0; i < res_valid; i++) { + if (!(pfds[i].revents & POLLOUT)) + continue; + + // TX the first not "done" frame. + for (cmd_ptr = resources[i].cmd; cmd_ptr; cmd_ptr = cmd_ptr->next) { + if (cmd_ptr->type != CMD_TYPE_TX) + continue; + + if (cmd_ptr->done) + continue; + + b = cmd_ptr->frame_buf; + res = send(resources[i].fd, b->data, b->size, 0); + cmd_ptr->repeat--; + + if (cmd_ptr->repeat > 0) { + tx_done = 0; + } + + if ((size_t)res == b->size && cmd_ptr->repeat == 0) { + po("TX %16s: ", cmd_ptr->arg0); + if (cmd_ptr->name) { + po("name %s", cmd_ptr->name); + } else { + print_hex_str(1, b->data, b->size); + } + po("\n"); + cmd_ptr->done = 1; + } + break; + } } + if (tx_done > 0) + break; + } - if (resources[i].has_tx) { - FD_SET(resources[i].fd, wfds); - fd_max = MAX(resources[i].fd, fd_max); - fd_set_cnt++; + return 0; +} + + +int send_rate_ready_pfds(cmd_socket_t *resources, int res_valid, + struct pollfd *pfds) { + int i, res; + cmd_t *cmd_ptr; + buf_t *b; + + + for (i = 0; i < res_valid; i++) { + if (!(pfds[i].revents & POLLOUT)) + continue; + + for (cmd_ptr = resources[i].cmd; cmd_ptr; cmd_ptr = cmd_ptr->next) { + if (cmd_ptr->type != CMD_TYPE_TX) + continue; + + if (cmd_ptr->done) + continue; + + if (!rate_can_send(cmd_ptr)) + continue; + + if (MMSG_TX && cmd_ptr->mmsg) { + // Batch up to rate_burst_available (or full burst when + // rate is unlimited) into a single sendmmsg syscall. + int avail = rate_burst_available(cmd_ptr); + int sent; + + if (avail <= 0) + continue; + if ((uint32_t)avail > cmd_ptr->repeat) + avail = cmd_ptr->repeat; + + // MSG_DONTWAIT: sendmmsg returns the number of fully-sent + // messages (0..avail). -1 + EAGAIN/EWOULDBLOCK means none + // could be sent without blocking; we'll come back next + // ppoll wake. Same non-blocking property as the txring + // kick. + sent = sendmmsg(resources[i].fd, cmd_ptr->mmsg, avail, + MSG_DONTWAIT); + if (sent < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR || errno == ENOBUFS) + break; + pe("TX-ERR %16s: sendmmsg: %m\n", cmd_ptr->arg0); + cmd_ptr->done = 1; + resources[i].tx_err_cnt++; + break; + } + if (sent == 0) + break; + + rate_consume_n(cmd_ptr, sent); + cmd_ptr->repeat -= sent; + + if (cmd_ptr->repeat == 0) { + b = cmd_ptr->frame_buf; + po("TX %16s: ", cmd_ptr->arg0); + if (cmd_ptr->name) { + po("name %s", cmd_ptr->name); + } else { + print_hex_str(1, b->data, b->size); + } + po("\n"); + cmd_ptr->done = 1; + } + break; + } + + b = cmd_ptr->frame_buf; + res = send(resources[i].fd, b->data, b->size, 0); + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR || errno == ENOBUFS) + break; + pe("TX-ERR %16s: send: %m\n", cmd_ptr->arg0); + cmd_ptr->done = 1; + resources[i].tx_err_cnt++; + break; + } + if ((size_t)res != b->size) + break; + + rate_consume(cmd_ptr); + cmd_ptr->repeat--; + + if (cmd_ptr->repeat == 0) { + po("TX %16s: ", cmd_ptr->arg0); + if (cmd_ptr->name) { + po("name %s", cmd_ptr->name); + } else { + print_hex_str(1, b->data, b->size); + } + po("\n"); + cmd_ptr->done = 1; + } + break; } } - if (fd_set_cnt) - return fd_max; + return 0; +} + +int send_txring_ready_pfds(cmd_socket_t *resources, int res_valid, + struct pollfd *pfds) { + cmd_t *cmd_ptr; + buf_t *b; + int i; - return -1; + for (i = 0; i < res_valid; i++) { + if (!(pfds[i].revents & POLLOUT)) + continue; + + for (cmd_ptr = resources[i].cmd; cmd_ptr; cmd_ptr = cmd_ptr->next) { + int submitted; + int budget; + size_t in_flight; + + if (cmd_ptr->type != CMD_TYPE_TX) + continue; + if (cmd_ptr->done) + continue; + if (!cmd_ptr->txring_map) + continue; + + // Budget uses unsigned math so rate-without-rep + // (repeat = UINT32_MAX) does not wrap to -1. + if (cmd_ptr->repeat > 0) { + if (cmd_ptr->rate_pps > 0) + budget = rate_burst_available(cmd_ptr); + else + budget = (int)cmd_ptr->txring_frame_nr; + if (budget > 0) { + if ((uint32_t)budget > cmd_ptr->repeat) + budget = (int)cmd_ptr->repeat; + submitted = txring_send(cmd_ptr, resources[i].fd, budget); + if (submitted < 0) { + cmd_ptr->done = 1; + resources[i].tx_err_cnt++; + break; + } + if (submitted > 0) { + if (cmd_ptr->rate_pps > 0) + rate_consume_n(cmd_ptr, submitted); + cmd_ptr->repeat -= (uint32_t)submitted; + } + } + } + + // Done only when the kernel has drained every slot we ever + // flipped, not just when repeat hits zero. Kick periodically + // while waiting; ppoll wakes us when something drains. + in_flight = txring_unsent(cmd_ptr); + if (cmd_ptr->repeat == 0) { + if (in_flight == 0) { + b = cmd_ptr->frame_buf; + po("TX %16s: ", cmd_ptr->arg0); + if (cmd_ptr->name) + po("name %s", cmd_ptr->name); + else + print_hex_str(1, b->data, b->size); + po("\n"); + cmd_ptr->done = 1; + } else { + txring_kick(resources[i].fd); + } + } + break; + } + } + + return 0; } -int rfds_wfds_process(cmd_socket_t *resources, int res_valid, fd_set *rfds, - fd_set *wfds) { - int i, res, match, old_size, tx_done; +int pfds_process(cmd_socket_t *resources, int res_valid, struct pollfd *pfds, + int has_rate) { + int i, res, match, old_size; buf_t *b; cmd_t *cmd_ptr; @@ -180,7 +404,7 @@ int rfds_wfds_process(cmd_socket_t *resources, int res_valid, fd_set *rfds, struct iovec iov = {}; struct msghdr msg = {}; - if (!FD_ISSET(resources[i].fd, rfds)) + if (!(pfds[i].revents & POLLIN)) continue; // read the frame, and try to match it @@ -274,46 +498,11 @@ int rfds_wfds_process(cmd_socket_t *resources, int res_valid, fd_set *rfds, bfree(b); } - while(1) { - tx_done = 1; - for (i = 0; i < res_valid; i++) { - if (!FD_ISSET(resources[i].fd, wfds)) - continue; - - // TX the first not "done" frame. - for (cmd_ptr = resources[i].cmd; cmd_ptr; cmd_ptr = cmd_ptr->next) { - if (cmd_ptr->type != CMD_TYPE_TX) - continue; - - if (cmd_ptr->done) - continue; - - b = cmd_ptr->frame_buf; - res = send(resources[i].fd, b->data, b->size, 0); - cmd_ptr->repeat--; - - if (cmd_ptr->repeat > 0) { - tx_done = 0; - } - - if ((size_t)res == b->size && cmd_ptr->repeat == 0) { - po("TX %16s: ", cmd_ptr->arg0); - if (cmd_ptr->name) { - po("name %s", cmd_ptr->name); - } else { - print_hex_str(1, b->data, b->size); - } - po("\n"); - cmd_ptr->done = 1; - } - break; - } - } - if (tx_done > 0) - break; - } - - return 0; + if (TX_RING) + return send_txring_ready_pfds(resources, res_valid, pfds); + if (has_rate) + return send_rate_ready_pfds(resources, res_valid, pfds); + return send_ready_pfds(resources, res_valid, pfds); } static int copy_cmd_by_name(const char *name, int cnt, cmd_t *cmds, cmd_t *dst) { @@ -375,12 +564,58 @@ int pcap_append(cmd_t *c) { } #endif +// Returns 1 if we are past ts_end, 0 otherwise. On return ts_left is +// ts_end - ts_now, or zero if past. +static int update_timeleft(struct timespec *ts_now, struct timespec *ts_end, + struct timespec *ts_left) +{ + clock_gettime(CLOCK_MONOTONIC, ts_now); + if (ts_less(ts_end, ts_now)) { + ts_clear(ts_left); + return 1; + } + ts_sub(ts_end, ts_now, ts_left); + return 0; +} + +// Sleep until the shorter of ts_pace and ts_left elapses, using +// CLOCK_MONOTONIC + TIMER_ABSTIME so we don't drift across iterations. +static void wait_for_tokens(const struct timespec *ts_pace, + const struct timespec *ts_left) { + const struct timespec *ts = ts_pace; + struct timespec ts_dl; + + if (ts_isset(ts_left) && ts_less(ts_left, ts)) + ts = ts_left; + + clock_gettime(CLOCK_MONOTONIC, &ts_dl); + ts_add(&ts_dl, ts, &ts_dl); + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts_dl, NULL); +} + +// True if any TX cmd still owes an explicit-rep contract. +static int explicit_rep_pending(int cnt, const cmd_t *cmds) +{ + int i; + for (i = 0; i < cnt; i++) { + if (cmds[i].type != CMD_TYPE_TX) + continue; + if (cmds[i].done) + continue; + if (!cmds[i].rep_explicit) + continue; + return 1; + } + return 0; +} + int exec_cmds(int cnt, cmd_t *cmds) { - struct timeval tv_now, tv_left, tv_begin, tv_end; - int i, res, fd_max, err = 0; + struct timespec ts_now, ts_left, ts_begin, ts_end; + int i, res, npfds, err = 0; int res_valid = 0; + int has_rate = 0; cmd_socket_t resources[100] = {}; - fd_set rfds, wfds; + struct pollfd pfds[100]; cmd_t *cmd_ptr; // Print inventory of named frames @@ -422,6 +657,13 @@ int exec_cmds(int cnt, cmd_t *cmds) { if (err) return err; + // Convert wire-rate bps to pps now that frame_buf is resolved. + for (i = 0; i < cnt; i++) { + if (cmds[i].rate_bps > 0 && cmds[i].frame_buf) + cmds[i].rate_pps = rate_bps_to_pps(cmds[i].rate_bps, + cmds[i].frame_buf->size); + } + #ifdef HAS_LIBPCAP for (i = 0; i < cnt; i++) { if (cmds[i].type != CMD_TYPE_PCAP) @@ -467,39 +709,119 @@ int exec_cmds(int cnt, cmd_t *cmds) { return -1; } - timerclear(&tv_now); - timerclear(&tv_end); - timerclear(&tv_left); - timerclear(&tv_begin); + // EF_USE_SENDMMSG=1 enables the sendmmsg batched TX path, same as + // -m. Must be evaluated before rate_init so the mmsg/miov vectors + // get allocated for rate-limited cmds. + if (!MMSG_TX) { + const char *env = getenv("EF_USE_SENDMMSG"); + if (env && *env && strcmp(env, "0") != 0) + MMSG_TX = 1; + } - tv_left.tv_sec = TIME_OUT_MS / 1000; - tv_left.tv_usec = (TIME_OUT_MS - (tv_left.tv_sec * 1000)) * 1000; + // Initialize the rate path for any TX cmd that has explicit rate, or + // any TX cmd at all when -m is set. + for (i = 0; i < cnt; i++) { + if (cmds[i].type != CMD_TYPE_TX) + continue; + if (cmds[i].rate_pps > 0 || MMSG_TX) { + has_rate = 1; + rate_init(&cmds[i]); + } + } + + // EF_TX_RING=1 enables PACKET_TX_RING, same as -r. Off by default; + // there is no auto-pick based on rep count. + if (!TX_RING) { + const char *env = getenv("EF_TX_RING"); + if (env && *env && strcmp(env, "0") != 0) + TX_RING = 1; + } + + if (TX_RING) { + for (i = 0; i < res_valid; i++) { + cmd_t *cp; + for (cp = resources[i].cmd; cp; cp = cp->next) { + if (cp->type != CMD_TYPE_TX) + continue; + if (txring_init(cp, resources[i].fd) < 0) + return -1; + } + } + } + + ts_clear(&ts_now); + ts_clear(&ts_end); + ts_clear(&ts_left); + ts_clear(&ts_begin); - gettimeofday(&tv_begin, 0); - timeradd(&tv_begin, &tv_left, &tv_end); + ts_left.tv_sec = TIME_OUT_MS / 1000; + ts_left.tv_nsec = (long)(TIME_OUT_MS % 1000) * 1000000L; + + clock_gettime(CLOCK_MONOTONIC, &ts_begin); + ts_add(&ts_begin, &ts_left, &ts_end); + int tx_pending = 0; while (1) { - fd_max = rfds_wfds_fill(resources, res_valid, &rfds, &wfds); - if (fd_max < 0) { - break; + struct timespec ts_pace; + ts_clear(&ts_pace); + + if (has_rate) { + tx_pending = rate_refill_cmds(cnt, cmds, &ts_pace); + if (!tx_pending) + has_rate = 0; // fall back to non-ratelimited logic } - res = select(fd_max + 1, &rfds, &wfds, 0, &tv_left); - gettimeofday(&tv_now, 0); - if (timercmp(&tv_now, &tv_end, >)) { - break; + npfds = pfds_fill(resources, res_valid, pfds, has_rate); + if (npfds < 0) { + if (!tx_pending) + break; + wait_for_tokens(&ts_pace, &ts_left); + if (update_timeleft(&ts_now, &ts_end, &ts_left)) { + // -t is a hard cap unless we still owe explicit-rep frames. + if (!explicit_rep_pending(cnt, cmds)) + break; + } + continue; + } + + struct timespec ts_to; + if (ts_isset(&ts_pace)) { + ts_to = ts_pace; + if (ts_isset(&ts_left) && ts_less(&ts_left, &ts_to)) + ts_to = ts_left; + } else if (ts_isset(&ts_left)) { + ts_to = ts_left; + } else { + ts_to.tv_sec = 0; + ts_to.tv_nsec = 250000; + } + + res = ppoll(pfds, res_valid, &ts_to, NULL); + if (update_timeleft(&ts_now, &ts_end, &ts_left)) { + if (!explicit_rep_pending(cnt, cmds)) + break; } - timersub(&tv_end, &tv_now, &tv_left); if (res == 0) { + if (tx_pending || explicit_rep_pending(cnt, cmds)) + continue; break; } else if (res < 0) { break; } - rfds_wfds_process(resources, res_valid, &rfds, &wfds); + pfds_process(resources, res_valid, pfds, has_rate); } - // close resources + // close resources. munmap any TX rings before closing the socket + // since the mapping is owned by the socket lifetime. + if (TX_RING) { + for (i = 0; i < res_valid; i++) { + cmd_t *cp; + for (cp = resources[i].cmd; cp; cp = cp->next) + if (cp->type == CMD_TYPE_TX) + txring_close(cp); + } + } for (i = 0; i < res_valid; i++) { if (resources[i].fd >= 0) { close(resources[i].fd); @@ -510,6 +832,7 @@ int exec_cmds(int cnt, cmd_t *cmds) { // check results for (i = 0; i < res_valid; i++) { err += resources[i].rx_err_cnt; + err += resources[i].tx_err_cnt; for (cmd_ptr = resources[i].cmd; cmd_ptr; cmd_ptr = cmd_ptr->next) { if (cmd_ptr->type != CMD_TYPE_RX) diff --git a/src/ef-rate.c b/src/ef-rate.c new file mode 100644 index 0000000..731738d --- /dev/null +++ b/src/ef-rate.c @@ -0,0 +1,176 @@ +#define _GNU_SOURCE +#include "ef.h" + +#include +#include + +#define RATE_MILLIPKT 1000LL +#define NSEC_PER_SEC 1000000000LL + +/* Ethernet wire overhead: preamble(7) + SFD(1) + FCS(4) + IFG(12) */ +#define ETH_WIRE_OVERHEAD 24 + +static int clamp(int v, int lo, int hi) +{ + return v < lo ? lo : (v > hi ? hi : v); +} + +void rate_init(cmd_t *c) +{ + int i, burst; + + // Auto-compute burst: 10% of pps, clamped to [1, RATE_BURST]. + if (c->rate_burst > 0) + burst = clamp(c->rate_burst, 1, RATE_BURST); + else if (c->rate_pps > 0) + burst = clamp((int)(c->rate_pps / 10), 1, RATE_BURST); + else + burst = RATE_BURST; + + c->rate_burst = burst; + c->tb_rate = (int64_t)c->rate_pps * RATE_MILLIPKT; + c->tb_max = (int64_t)burst * RATE_MILLIPKT; + c->tb_tokens = c->tb_max; + clock_gettime(CLOCK_MONOTONIC, &c->tb_last); + + // Pre-build the sendmmsg vector when -m is set. All entries point to + // the same frame buffer; the caller varies vlen at send time. Skipped + // when -m is off so plain rate-limited cmds pay no extra memory. + if (MMSG_TX && c->frame_buf) { + c->mmsg = calloc(burst, sizeof(*c->mmsg)); + c->miov = calloc(burst, sizeof(*c->miov)); + if (!c->mmsg || !c->miov) { + pe("rate_init: out of memory (burst=%d)\n", burst); + free(c->mmsg); c->mmsg = NULL; + free(c->miov); c->miov = NULL; + return; + } + for (i = 0; i < burst; i++) { + c->miov[i].iov_base = c->frame_buf->data; + c->miov[i].iov_len = c->frame_buf->size; + c->mmsg[i].msg_hdr.msg_iov = &c->miov[i]; + c->mmsg[i].msg_hdr.msg_iovlen = 1; + } + } +} + +void rate_refill(cmd_t *c, struct timespec *now) +{ + int64_t sec, nsec, add; + + if (c->rate_pps == 0) + return; + + sec = now->tv_sec - c->tb_last.tv_sec; + nsec = now->tv_nsec - c->tb_last.tv_nsec; + if (nsec < 0) { + sec -= 1; + nsec += NSEC_PER_SEC; + } + + if (sec < 0) + return; + + /* sec * tb_rate + (nsec * rate_pps) / 1e6 - split to avoid overflow */ + add = sec * c->tb_rate + + (nsec * (int64_t)c->rate_pps) / (NSEC_PER_SEC / RATE_MILLIPKT); + + if (add <= 0) + return; + + c->tb_tokens += add; + if (c->tb_tokens > c->tb_max) + c->tb_tokens = c->tb_max; + c->tb_last = *now; +} + +int rate_can_send(cmd_t *c) +{ + if (c->rate_pps == 0) + return 1; + return c->tb_tokens >= RATE_MILLIPKT; +} + +void rate_consume(cmd_t *c) +{ + if (c->rate_pps == 0) + return; + c->tb_tokens -= RATE_MILLIPKT; +} + +// Spend N tokens at once. Used by the TX_RING path which submits a +// burst per kick rather than one frame at a time. +void rate_consume_n(cmd_t *c, int n) +{ + if (c->rate_pps == 0 || n <= 0) + return; + c->tb_tokens -= (int64_t)n * RATE_MILLIPKT; +} + +// Whole tokens currently available, clamped to the configured burst. +int rate_burst_available(cmd_t *c) +{ + int64_t pkts; + if (c->rate_pps == 0) + return c->rate_burst > 0 ? c->rate_burst : RATE_BURST; + pkts = c->tb_tokens / RATE_MILLIPKT; + if (pkts < 0) + pkts = 0; + if (c->rate_burst > 0 && pkts > c->rate_burst) + pkts = c->rate_burst; + return (int)pkts; +} + +int64_t rate_ns_until_token(cmd_t *c) +{ + int64_t deficit; + + if (c->rate_pps == 0) + return 0; + + if (c->tb_tokens >= RATE_MILLIPKT) + return 0; + + deficit = RATE_MILLIPKT - c->tb_tokens; + return (deficit * NSEC_PER_SEC) / c->tb_rate; +} + +uint32_t rate_bps_to_pps(uint64_t bps, size_t frame_len) +{ + uint64_t wire_bits = (uint64_t)(frame_len + ETH_WIRE_OVERHEAD) * 8; + uint64_t pps = bps / wire_bits; + + return pps > 0 ? (uint32_t)pps : 1; +} + +int rate_refill_cmds(int cnt, cmd_t *cmds, struct timespec *ts_pace) +{ + int64_t min_wait_ns = -1; + struct timespec ts_now; + int tx_pending = 0; + + clock_gettime(CLOCK_MONOTONIC, &ts_now); + + for (int i = 0; i < cnt; i++) { + if (cmds[i].type != CMD_TYPE_TX || cmds[i].done) + continue; + + tx_pending = 1; + + if (cmds[i].rate_pps > 0) { + rate_refill(&cmds[i], &ts_now); + int64_t w = rate_ns_until_token(&cmds[i]); + if (min_wait_ns < 0 || w < min_wait_ns) + min_wait_ns = w; + } else { + min_wait_ns = 0; + } + } + + if (min_wait_ns > 0) { + ts_pace->tv_sec = min_wait_ns / NSEC_PER_SEC; + ts_pace->tv_nsec = min_wait_ns % NSEC_PER_SEC; + } + + return tx_pending; +} diff --git a/src/ef-txring.c b/src/ef-txring.c new file mode 100644 index 0000000..7af666b --- /dev/null +++ b/src/ef-txring.c @@ -0,0 +1,224 @@ +#include "ef.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// PACKET_TX_RING TPACKET_V2 slot layout (kernel reads tx data from here): +// [ tpacket2_hdr | pad | frame data ] +// ^ TPACKET_ALIGN(sizeof(tpacket2_hdr)) +// TPACKET2_HDRLEN includes sizeof(sockaddr_ll) which is only used on the +// RX path; on TX the kernel reads frame data at hdrlen - sizeof(sockaddr_ll). +#define FRAME_DATA_OFF (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll)) +#define TXRING_TARGET_FRAMES 1024 // fallback when ETHTOOL_GRINGPARAM fails + +static inline struct tpacket2_hdr *slot_hdr(cmd_t *c, size_t idx) { + return (struct tpacket2_hdr *)((char *)c->txring_map + + idx * c->txring_frame_size); +} + +static size_t next_pow2(size_t x) { + size_t p = 64; + while (p < x) + p <<= 1; + return p; +} + +static size_t nic_tx_ring_pending(int fd, const char *ifname) { + struct ethtool_ringparam erp = {}; + struct ifreq ifr = {}; + + erp.cmd = ETHTOOL_GRINGPARAM; + strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1); + ifr.ifr_data = (caddr_t)&erp; + if (ioctl(fd, SIOCETHTOOL, &ifr) < 0) + return 0; + return erp.tx_pending; +} + +static size_t target_frames_from_env(size_t fallback) { + const char *env = getenv("EF_TXRING_FRAMES"); + long n; + + if (!env || !*env) + return fallback; + n = strtol(env, NULL, 10); + if (n < 2 || n > (1L << 20) || (n & (n - 1)) != 0) { + pe("EF_TXRING_FRAMES=%s invalid (need power of 2 in [2, 1M]); using %zu\n", + env, fallback); + return fallback; + } + return (size_t)n; +} + +int txring_init(cmd_t *c, int fd) { + struct tpacket_req req = {}; + size_t frame_size, block_size, frames_per_block, block_nr; + size_t target_frames, nic_depth; + long page_size; + int ver; + size_t i; + + if (!c->frame_buf) + return -1; + + // Match the NIC tx ring depth (rounded to a power of two for + // PACKET_TX_RING). Going larger just shifts the queue into the + // qdisc; going smaller starves the kernel pipeline. + nic_depth = nic_tx_ring_pending(fd, c->arg0); + target_frames = nic_depth < 2 ? TXRING_TARGET_FRAMES + : next_pow2(nic_depth); + target_frames = target_frames_from_env(target_frames); + + frame_size = next_pow2(FRAME_DATA_OFF + c->frame_buf->size); + + // block_size is a multiple of both frame_size and page_size. For + // frame_size <= page_size both are powers of two so page_size + // works; for jumbos use frame_size directly. + page_size = sysconf(_SC_PAGESIZE); + block_size = frame_size > (size_t)page_size ? frame_size + : (size_t)page_size; + frames_per_block = block_size / frame_size; + block_nr = (target_frames + frames_per_block - 1) / + frames_per_block; + + ver = TPACKET_V2; + if (setsockopt(fd, SOL_PACKET, PACKET_VERSION, &ver, sizeof(ver)) < 0) { + pe("txring: PACKET_VERSION: %m\n"); + return -1; + } + + req.tp_block_size = block_size; + req.tp_block_nr = block_nr; + req.tp_frame_size = frame_size; + req.tp_frame_nr = frames_per_block * block_nr; + if (setsockopt(fd, SOL_PACKET, PACKET_TX_RING, &req, sizeof(req)) < 0) { + pe("txring: PACKET_TX_RING (frame=%zu block=%zu nr=%u): %m\n", + frame_size, block_size, req.tp_block_nr); + return -1; + } + + if ((req.tp_frame_nr & (req.tp_frame_nr - 1)) != 0) { + pe("txring: frame_nr=%u must be a power of 2\n", req.tp_frame_nr); + return -1; + } + + c->txring_map_len = (size_t)req.tp_block_size * req.tp_block_nr; + c->txring_frame_size = req.tp_frame_size; + c->txring_frame_nr = req.tp_frame_nr; + c->txring_mask = req.tp_frame_nr - 1; + c->txring_map = mmap(NULL, c->txring_map_len, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (c->txring_map == MAP_FAILED) { + pe("txring: mmap: %m\n"); + c->txring_map = NULL; + return -1; + } + + // Prewrite the static frame into every slot; the hot path only + // flips tp_status. + for (i = 0; i < c->txring_frame_nr; i++) { + struct tpacket2_hdr *h = slot_hdr(c, i); + memcpy((char *)h + FRAME_DATA_OFF, c->frame_buf->data, + c->frame_buf->size); + h->tp_len = c->frame_buf->size; + } + + c->txring_head = 0; + return 0; +} + +// Flip up to 'budget' AVAILABLE slots into SEND_REQUEST and kick the +// kernel. Returns the number flipped. qdisc-rejected slots come back +// as SEND_REQUEST so a future kick retries them. +int txring_send(cmd_t *c, int fd, int budget) { + struct tpacket2_hdr *h; + size_t head, mask; + int filled = 0; + uint32_t st; + + if (!c->txring_map || budget <= 0) + return 0; + + head = c->txring_head; + mask = c->txring_mask; + + // tp_status is shared with the kernel (which may run on a different + // core), so plain loads/stores are not safe: the compiler could fuse + // or reorder them, and without barriers a CPU could observe the new + // status before the slot contents the kernel just wrote. + // ACQUIRE on the load pairs with the kernel's release when it sets + // AVAILABLE - guarantees that any frame metadata the kernel touched + // is visible to us before we read the status. + // RELEASE on the store pairs with the kernel's acquire when it + // picks up SEND_REQUEST - guarantees the prewritten frame bytes + // are visible to the kernel before it sees the new status. + while (filled < budget) { + h = slot_hdr(c, head); + st = __atomic_load_n(&h->tp_status, __ATOMIC_ACQUIRE); + if (st != TP_STATUS_AVAILABLE) + break; + __atomic_store_n(&h->tp_status, TP_STATUS_SEND_REQUEST, + __ATOMIC_RELEASE); + head = (head + 1) & mask; + filled++; + } + + c->txring_head = head; + + if (filled > 0) { + int n = send(fd, NULL, 0, MSG_DONTWAIT); + if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK && + errno != EINTR && errno != ENOBUFS && + !(IGNORE_LINK_DOWN && errno == ENETDOWN)) { + pe("TX-ERR %16s: send (txring): %m\n", c->arg0); + return -1; + } + } + + return filled; +} + +// Slots in SEND_REQUEST are kernel-owned (queued or retrying); +// AVAILABLE means drained. +size_t txring_unsent(const cmd_t *c) { + struct tpacket2_hdr *h; + size_t i, unsent = 0; + + if (!c->txring_map) + return 0; + for (i = 0; i < c->txring_frame_nr; i++) { + h = (struct tpacket2_hdr *)((char *)c->txring_map + + i * c->txring_frame_size); + if (__atomic_load_n(&h->tp_status, __ATOMIC_ACQUIRE) + != TP_STATUS_AVAILABLE) + unsent++; + } + return unsent; +} + +// The kernel retries SEND_REQUEST slots only when we call send(). +void txring_kick(int fd) { + (void)send(fd, NULL, 0, MSG_DONTWAIT); +} + +void txring_close(cmd_t *c) { + size_t unsent; + + if (c->txring_map && c->txring_map != MAP_FAILED) { + unsent = txring_unsent(c); + if (unsent > 0) + pe("TX-DROP %16s: %zu frames in flight at exit\n", + c->arg0, unsent); + munmap(c->txring_map, c->txring_map_len); + c->txring_map = NULL; + } +} diff --git a/src/ef.h b/src/ef.h index c48a8b1..eff95a4 100644 --- a/src/ef.h +++ b/src/ef.h @@ -5,8 +5,11 @@ #include #include #include +#include #include +#define RATE_BURST 1024 + #include "version.h" #ifdef __cplusplus @@ -16,8 +19,49 @@ extern "C" { #define DIV_ROUND(a, b) (1 + ((a - 1) / b)) #define BIT_TO_BYTE(x) (DIV_ROUND(x, 8)) +#define NSEC_PER_SEC 1000000000LL + +static inline void ts_clear(struct timespec *a) { + a->tv_sec = 0; + a->tv_nsec = 0; +} + +static inline int ts_isset(const struct timespec *a) { + return a->tv_sec != 0 || a->tv_nsec != 0; +} + +static inline int ts_less(const struct timespec *a, const struct timespec *b) { + if (a->tv_sec != b->tv_sec) + return a->tv_sec < b->tv_sec; + return a->tv_nsec < b->tv_nsec; +} + +static inline void ts_add(const struct timespec *a, const struct timespec *b, + struct timespec *out) { + out->tv_sec = a->tv_sec + b->tv_sec; + out->tv_nsec = a->tv_nsec + b->tv_nsec; + if (out->tv_nsec >= NSEC_PER_SEC) { + out->tv_sec += 1; + out->tv_nsec -= NSEC_PER_SEC; + } +} + +static inline void ts_sub(const struct timespec *a, const struct timespec *b, + struct timespec *out) { + out->tv_sec = a->tv_sec - b->tv_sec; + out->tv_nsec = a->tv_nsec - b->tv_nsec; + if (out->tv_nsec < 0) { + out->tv_sec -= 1; + out->tv_nsec += NSEC_PER_SEC; + } +} + extern int NO_PAD; extern int TIME_OUT_MS; +extern int QDISC_BYPASS; +extern int TX_RING; // -r enables PACKET_TX_RING for all TX cmds +extern int MMSG_TX; // -m batches TX with sendmmsg in the rate path +extern int IGNORE_LINK_DOWN; // --ignore-link-down: retry on ENETDOWN (txring) /////////////////////////////////////////////////////////////////////////////// typedef struct { @@ -290,7 +334,29 @@ typedef struct cmd { buf_t *frame_buf; buf_t *frame_mask_buf; int done; + int rep_explicit; // user passed 'rep N' (rate-without-rep is 0) uint32_t repeat; + + uint32_t rate_pps; // 0 = unlimited + uint64_t rate_bps; // 0 = not set; wire-rate bps before conversion + int rate_burst; // 0 = auto (10% of pps, clamped to [1,64]) + int64_t tb_tokens; // millipkts (1000 = one packet) + int64_t tb_max; // max tokens (burst * 1000) + int64_t tb_rate; // millipkts per second + struct timespec tb_last; // CLOCK_MONOTONIC last refill + + void *txring_map; // mmap'd PACKET_TX_RING base, NULL if unused + size_t txring_map_len; + size_t txring_frame_size; // bytes per slot + size_t txring_frame_nr; // total slots (always power of 2) + size_t txring_mask; // frame_nr - 1 + size_t txring_head; // producer index + + // Pre-built sendmmsg vector (allocated by rate_init when -m is set or + // rate is in use). All entries point to the same frame_buf; caller + // varies vlen at send time. NULL when unused. + struct mmsghdr *mmsg; + struct iovec *miov; } cmd_t; typedef struct { @@ -299,10 +365,27 @@ typedef struct { int has_tx; cmd_t *cmd; int rx_err_cnt; + int tx_err_cnt; } cmd_socket_t; int exec_cmds(int cnt, cmd_t *cmds); +void rate_init(cmd_t *c); +void rate_refill(cmd_t *c, struct timespec *now); +int rate_can_send(cmd_t *c); +void rate_consume(cmd_t *c); +void rate_consume_n(cmd_t *c, int n); +int rate_burst_available(cmd_t *c); +int64_t rate_ns_until_token(cmd_t *c); +uint32_t rate_bps_to_pps(uint64_t bps, size_t frame_len); +int rate_refill_cmds(int cnt, cmd_t *cmds, struct timespec *ts_pace); + +int txring_init(cmd_t *c, int fd); +int txring_send(cmd_t *c, int fd, int budget); +size_t txring_unsent(const cmd_t *c); +void txring_kick(int fd); +void txring_close(cmd_t *c); + void print_hex_str(int fd, void *_d, int s); int argc_frame(int argc, const char *argv[], frame_t *f); diff --git a/test/test-rate-limit.cxx b/test/test-rate-limit.cxx new file mode 100644 index 0000000..97e3499 --- /dev/null +++ b/test/test-rate-limit.cxx @@ -0,0 +1,188 @@ +#include "ef.h" +#include "catch_single_include.hxx" + +#include + +static cmd_t make_cmd(uint32_t rate_pps) +{ + cmd_t c; + memset(&c, 0, sizeof(c)); + c.type = CMD_TYPE_TX; + c.rate_pps = rate_pps; + c.repeat = 1; + return c; +} + +TEST_CASE("rate_init sets correct fields", "[rate]") { + cmd_t c = make_cmd(80); + rate_init(&c); + + CHECK(c.rate_burst == 8); // clamp(80/10, 1, 64) = 8 + CHECK(c.tb_rate == 80 * 1000LL); // 80 pps * 1000 millipkts + CHECK(c.tb_max == 8 * 1000LL); // burst=8 * 1000 + CHECK(c.tb_tokens == c.tb_max); // starts full + CHECK(c.tb_last.tv_sec != 0); // timestamp set +} + +TEST_CASE("rate_can_send / rate_consume drain burst", "[rate]") { + cmd_t c = make_cmd(100); + rate_init(&c); + + // burst = clamp(100/10, 1, 64) = 10 + int sent = 0; + while (rate_can_send(&c)) { + rate_consume(&c); + sent++; + if (sent > 100) + break; // safety + } + + CHECK(sent == 10); + CHECK(rate_can_send(&c) == 0); +} + +TEST_CASE("rate_can_send unlimited always returns 1", "[rate]") { + cmd_t c = make_cmd(0); + // No init needed for unlimited + + CHECK(rate_can_send(&c) == 1); + rate_consume(&c); // should be no-op + CHECK(rate_can_send(&c) == 1); +} + +TEST_CASE("rate_refill adds correct tokens", "[rate]") { + cmd_t c = make_cmd(1000); + rate_init(&c); + + // Drain fully + while (rate_can_send(&c)) + rate_consume(&c); + + CHECK(c.tb_tokens == 0); + + // Simulate 5ms elapsed (stays under burst cap) + struct timespec now = c.tb_last; + now.tv_nsec += 5000000; // 5ms + if (now.tv_nsec >= 1000000000) { + now.tv_sec += 1; + now.tv_nsec -= 1000000000; + } + + rate_refill(&c, &now); + + // 1000 pps * 5ms = 5 packets = 5000 millipkts + CHECK(c.tb_tokens == 5000); +} + +TEST_CASE("rate_refill partial token accumulation", "[rate]") { + cmd_t c = make_cmd(100); + rate_init(&c); + + while (rate_can_send(&c)) + rate_consume(&c); + + struct timespec now = c.tb_last; + now.tv_nsec += 5000000; // 5ms + + rate_refill(&c, &now); + + // 100 pps * 5ms = 0.5 packets = 500 millipkts + CHECK(c.tb_tokens == 500); + CHECK(rate_can_send(&c) == 0); // not enough for a full packet +} + +TEST_CASE("rate_refill second boundary crossing", "[rate]") { + cmd_t c = make_cmd(50); + rate_init(&c); + + // burst = clamp(50/10, 1, 64) = 5 + CHECK(c.rate_burst == 5); + + while (rate_can_send(&c)) + rate_consume(&c); + + struct timespec now = c.tb_last; + // 1.5s elapsed + now.tv_sec += 1; + now.tv_nsec += 500000000; + if (now.tv_nsec >= 1000000000) { + now.tv_sec += 1; + now.tv_nsec -= 1000000000; + } + + rate_refill(&c, &now); + + // 50 pps * 1.5s = 75 pkts, but capped at burst (5 * 1000 = 5000) + CHECK(c.tb_tokens == 5000); +} + +TEST_CASE("rate_refill high rate no overflow", "[rate]") { + cmd_t c = make_cmd(1000000); // 1 Mpps + rate_init(&c); + + while (rate_can_send(&c)) + rate_consume(&c); + + struct timespec now = c.tb_last; + now.tv_nsec += 10000000; // 10ms + + rate_refill(&c, &now); + + // 1e6 pps * 10ms = 10000 pkts = 10000000 millipkts, capped at burst + CHECK(c.tb_tokens == c.tb_max); +} + +TEST_CASE("rate_ns_until_token returns correct wait", "[rate]") { + cmd_t c = make_cmd(1000); + rate_init(&c); + + while (rate_can_send(&c)) + rate_consume(&c); + + int64_t ns = rate_ns_until_token(&c); + + // Need 1000 millipkts, rate is 1000000 millipkts/sec + // 1000 / 1000000 * 1e9 = 1000000 ns = 1ms + CHECK(ns == 1000000); +} + +TEST_CASE("rate_ns_until_token unlimited returns 0", "[rate]") { + cmd_t c = make_cmd(0); + CHECK(rate_ns_until_token(&c) == 0); +} + +TEST_CASE("rate_ns_until_token with tokens returns 0", "[rate]") { + cmd_t c = make_cmd(1000); + rate_init(&c); + // Bucket is full + CHECK(rate_ns_until_token(&c) == 0); +} + +TEST_CASE("rate_bps_to_pps 1G minimum frame", "[rate]") { + // 60-byte frame (min Ethernet without FCS) + 24 overhead = 84 bytes = 672 bits + // 1e9 / 672 = 1488095 + CHECK(rate_bps_to_pps(1000000000ULL, 60) == 1488095); +} + +TEST_CASE("rate_bps_to_pps 1G 1514-byte frame", "[rate]") { + // 1514-byte frame + 24 overhead = 1538 bytes = 12304 bits + // 1e9 / 12304 = 81274 + CHECK(rate_bps_to_pps(1000000000ULL, 1514) == 81274); +} + +TEST_CASE("rate_bps_to_pps 100M minimum frame", "[rate]") { + // 60 + 24 = 84 bytes = 672 bits + // 1e8 / 672 = 148809 + CHECK(rate_bps_to_pps(100000000ULL, 60) == 148809); +} + +TEST_CASE("rate_bps_to_pps floors to 1 pps", "[rate]") { + // 100 bps with a 1514-byte frame: 100 / 12304 = 0, floored to 1 + CHECK(rate_bps_to_pps(100, 1514) == 1); +} + +TEST_CASE("rate_bps_to_pps 10G", "[rate]") { + // 60 + 24 = 84 = 672 bits + // 10e9 / 672 = 14880952 + CHECK(rate_bps_to_pps(10000000000ULL, 60) == 14880952); +}