rcu: Allow rcu_do_batch() to dynamically adjust batch sizes

Bimodal behavior of rcu_do_batch() is not really suited to Google
applications like gfe servers.

When a process with millions of sockets exits, closing all files
queues two rcu callbacks per socket.

This eventually reaches the point where RCU enters an emergency
mode, where rcu_do_batch() do not return until whole queue is flushed.

Each rcu callback lasts at least 70 nsec, so with millions of
elements, we easily spend more than 100 msec without rescheduling.

Goal of this patch is to avoid the infamous message like following
"need_resched set for > 51999388 ns (52 ticks) without schedule"

We dynamically adjust the number of elements we process, instead
of 10 / INFINITE choices, we use a floor of ~1 % of current entries.

If the number is above 1000, we switch to a time based limit of 3 msec
per batch, adjustable with /sys/module/rcutree/parameters/rcu_resched_ns

Signed-off-by: Eric Dumazet <edumazet@google.com>
[ paulmck: Forward-port and remove debug statements. ]
Signed-off-by: Paul E. McKenney <paulmck@linux.ibm.com>
This commit is contained in:
Eric Dumazet 2019-07-24 18:07:52 -07:00 committed by Paul E. McKenney
parent f48fe4c586
commit cfcdef5e30
1 changed files with 19 additions and 1 deletions

View File

@ -56,6 +56,7 @@
#include <linux/smpboot.h>
#include <linux/jiffies.h>
#include <linux/sched/isolation.h>
#include <linux/sched/clock.h>
#include "../time/tick-internal.h"
#include "tree.h"
@ -416,6 +417,12 @@ module_param(qlowmark, long, 0444);
static ulong jiffies_till_first_fqs = ULONG_MAX;
static ulong jiffies_till_next_fqs = ULONG_MAX;
static bool rcu_kick_kthreads;
static int rcu_divisor = 7;
module_param(rcu_divisor, int, 0644);
/* Force an exit from rcu_do_batch() after 3 milliseconds. */
static long rcu_resched_ns = 3 * NSEC_PER_MSEC;
module_param(rcu_resched_ns, long, 0644);
/*
* How long the grace period must be before we start recruiting
@ -2109,6 +2116,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
long bl, count;
long pending, tlimit = 0;
/* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@ -2130,7 +2138,10 @@ static void rcu_do_batch(struct rcu_data *rdp)
local_irq_save(flags);
rcu_nocb_lock(rdp);
WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
bl = rdp->blimit;
pending = rcu_segcblist_n_cbs(&rdp->cblist);
bl = max(rdp->blimit, pending >> rcu_divisor);
if (unlikely(bl > 100))
tlimit = local_clock() + rcu_resched_ns;
trace_rcu_batch_start(rcu_state.name,
rcu_segcblist_n_lazy_cbs(&rdp->cblist),
rcu_segcblist_n_cbs(&rdp->cblist), bl);
@ -2153,6 +2164,13 @@ static void rcu_do_batch(struct rcu_data *rdp)
(need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;
if (unlikely(tlimit)) {
/* only call local_clock() every 32 callbacks */
if (likely((-rcl.len & 31) || local_clock() < tlimit))
continue;
/* Exceeded the time limit, so leave. */
break;
}
if (offloaded) {
WARN_ON_ONCE(in_serving_softirq());
local_bh_enable();