From 53c7189ff365877e385db636c708d457536c4538 Mon Sep 17 00:00:00 2001 From: Christian Deacon Date: Fri, 22 May 2020 20:53:56 +0000 Subject: [PATCH] Add RX Queue Config Option And Spread Load Across Queues. --- src/compressor.c | 10 +++++++++- src/compressor_cache_user.c | 31 ++++++++++++++++++++++++++----- src/compressor_cache_user.h | 4 +++- src/config.h | 1 + 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/compressor.c b/src/compressor.c index f11c92a..fe602f3 100644 --- a/src/compressor.c +++ b/src/compressor.c @@ -90,9 +90,17 @@ int main(int argc, char **argv) { tcp_exclude = 0; } + int rxqueues = 0; + + if (config_lookup_int(&config, "rxqueues", &rxqueues) == CONFIG_FALSE) + { + rxqueues = 0; + } + cfg.rate_limit = rate_limit; cfg.new_conn_limit = new_conn_limit; cfg.tcp_exclude = tcp_exclude; + cfg.rxqueues = rxqueues; int cockpit_enabled = 0; config_lookup_bool(&config, "cockpit_enabled", &cockpit_enabled); @@ -143,7 +151,7 @@ int main(int argc, char **argv) { return 1; } - load_skb_program(interface, ifindex, maps->xsk_map_fd, maps->a2s_cache_map_fd); + load_skb_program(interface, ifindex, maps->xsk_map_fd, maps->a2s_cache_map_fd, &cfg); if (redis_addr && redis_port) { start_cache_seeding(maps->a2s_cache_map_fd, forwarding_rules, redis_addr, redis_port); } diff --git a/src/compressor_cache_user.c b/src/compressor_cache_user.c index 6416796..ad71fc5 100644 --- a/src/compressor_cache_user.c +++ b/src/compressor_cache_user.c @@ -54,6 +54,7 @@ #include "compressor_filter_user.h" #include "xassert.h" #include "checksum.h" +#include "config.h" #ifndef AF_XDP #define AF_XDP 44 @@ -447,7 +448,7 @@ struct xdp_umem *xdp_umem_configure(int sfd) { return umem; } -struct xdp_sock *xsk_configure(struct xdp_umem *umem, int ifindex) { +struct xdp_sock *xsk_configure(struct xdp_umem *umem, int ifindex, int cpu_id, int *allow) { static int ndescs = NUM_DESCS; struct xdp_sock *xsk = calloc(1, sizeof(struct xdp_sock)); @@ -499,7 +500,7 @@ struct xdp_sock *xsk_configure(struct xdp_umem *umem, int ifindex) { struct sockaddr_xdp sxdp = {}; sxdp.sxdp_family = AF_XDP; sxdp.sxdp_ifindex = ifindex; - sxdp.sxdp_queue_id = 0; + sxdp.sxdp_queue_id = cpu_id; if (umem) { sxdp.sxdp_flags = XDP_SHARED_UMEM; @@ -508,21 +509,41 @@ struct xdp_sock *xsk_configure(struct xdp_umem *umem, int ifindex) { sxdp.sxdp_flags = 0; } - xassert(bind(sfd, (struct sockaddr *)&sxdp, sizeof(sxdp)) == 0); + if (bind(sfd, (struct sockaddr *)&sxdp, sizeof(sxdp)) != 0) + { + *allow = 0; + } return xsk; } -void load_skb_program(const char *ifname, int ifindex, int xsk_map_fd, int a2s_info_cache_map_fd) { +void load_skb_program(const char *ifname, int ifindex, int xsk_map_fd, int a2s_info_cache_map_fd, struct config *cfg) { a2s_cache_map_fd = a2s_info_cache_map_fd; xassert(pthread_rwlock_init(&a2s_cache_lock, NULL) == 0); int num_cpus = get_nprocs_conf(); + + // Check for config override. + if (cfg->rxqueues > 0) { + num_cpus = cfg->rxqueues; + } + if (num_cpus > MAX_CPUS) { num_cpus = MAX_CPUS; } + for (int cpu_id = 0; cpu_id < num_cpus; cpu_id++) { - struct xdp_sock *xsk = xsk_configure(NULL, ifindex); + int allow = 1; + struct xdp_sock *xsk = xsk_configure(NULL, ifindex, cpu_id, &allow); + + if (!allow) + { + int errnum = errno; + fprintf(stdout, "WARNING - Couldn't configure AF_XDP socket for CPU #%d :: %s\n", cpu_id, strerror(errnum)); + + continue; + } + xassert(bpf_map_update_elem(xsk_map_fd, &cpu_id, &xsk->sfd, BPF_ANY) == 0); xsk_cache_run(xsk); } diff --git a/src/compressor_cache_user.h b/src/compressor_cache_user.h index 446f0c3..0a6de37 100644 --- a/src/compressor_cache_user.h +++ b/src/compressor_cache_user.h @@ -17,7 +17,9 @@ #pragma once -void load_skb_program(const char *ifname, int ifindex, int xsk_map_fd, int a2s_info_cache_map_fd); +#include "config.h" + +void load_skb_program(const char *ifname, int ifindex, int xsk_map_fd, int a2s_info_cache_map_fd, struct config *cfg); void get_cache_rlock(void); void get_cache_wlock(void); void release_cache_lock(void); diff --git a/src/config.h b/src/config.h index bc7455b..c8623ab 100644 --- a/src/config.h +++ b/src/config.h @@ -26,6 +26,7 @@ struct config { uint_fast64_t new_conn_limit; uint_fast64_t rate_limit; uint_fast8_t tcp_exclude; + uint16_t rxqueues; }; struct forwarding_rule {