From 89da3b94bb97417ca2c5b0ce3a28643819030247 Mon Sep 17 00:00:00 2001 From: Oleg Nesterov Date: Thu, 25 Apr 2019 18:50:55 +0200 Subject: [PATCH] rcu/sync: Simplify the state machine With this patch rcu_sync has a single state variable and the transition rules become really simple: GP_IDLE - owned by the first rcu_sync_enter() which moves it to GP_ENTER - owned by rcu-callback which moves it to GP_PASSED - owned by the last rcu_sync_exit() which moves it to GP_EXIT - and this is the only "nontrivial" state. rcu-callback moves it back to GP_IDLE unless another enter() comes before a GP pass. If rcu-callback is invoked before the next rcu_sync_exit() it must see gp_count incremented by that enter() and set GP_PASSED. Otherwise, if the next rcu_sync_exit() wins the race, it will move it to GP_REPLAY - owned by rcu-callback which moves it to GP_EXIT Signed-off-by: Oleg Nesterov [ paulmck: While here, apply READ_ONCE() and WRITE_ONCE() to ->gp_state. ] [ paulmck: Tweaks to make htmldocs happy. (Reported by kbuild test robot.) ] Signed-off-by: Paul E. McKenney --- include/linux/rcu_sync.h | 4 +- kernel/rcu/sync.c | 193 ++++++++++++++++++++++----------------- 2 files changed, 110 insertions(+), 87 deletions(-) diff --git a/include/linux/rcu_sync.h b/include/linux/rcu_sync.h index 87971e85519c..9b83865d24f9 100644 --- a/include/linux/rcu_sync.h +++ b/include/linux/rcu_sync.h @@ -19,7 +19,6 @@ struct rcu_sync { int gp_count; wait_queue_head_t gp_wait; - int cb_state; struct rcu_head cb_head; }; @@ -36,7 +35,7 @@ static inline bool rcu_sync_is_idle(struct rcu_sync *rsp) !rcu_read_lock_bh_held() && !rcu_read_lock_sched_held(), "suspicious rcu_sync_is_idle() usage"); - return !rsp->gp_state; /* GP_IDLE */ + return !READ_ONCE(rsp->gp_state); /* GP_IDLE */ } extern void rcu_sync_init(struct rcu_sync *); @@ -49,7 +48,6 @@ extern void rcu_sync_dtor(struct rcu_sync *); .gp_state = 0, \ .gp_count = 0, \ .gp_wait = __WAIT_QUEUE_HEAD_INITIALIZER(name.gp_wait), \ - .cb_state = 0, \ } #define DEFINE_RCU_SYNC(name) \ diff --git a/kernel/rcu/sync.c b/kernel/rcu/sync.c index ee427e138dad..d4558ab7a07d 100644 --- a/kernel/rcu/sync.c +++ b/kernel/rcu/sync.c @@ -10,15 +10,13 @@ #include #include -enum { GP_IDLE = 0, GP_PENDING, GP_PASSED }; -enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY }; +enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY }; #define rss_lock gp_wait.lock /** * rcu_sync_init() - Initialize an rcu_sync structure * @rsp: Pointer to rcu_sync structure to be initialized - * @type: Flavor of RCU with which to synchronize rcu_sync structure */ void rcu_sync_init(struct rcu_sync *rsp) { @@ -41,6 +39,70 @@ void rcu_sync_enter_start(struct rcu_sync *rsp) rsp->gp_state = GP_PASSED; } + +static void rcu_sync_func(struct rcu_head *rhp); + +static void rcu_sync_call(struct rcu_sync *rsp) +{ + call_rcu(&rsp->cb_head, rcu_sync_func); +} + +/** + * rcu_sync_func() - Callback function managing reader access to fastpath + * @rhp: Pointer to rcu_head in rcu_sync structure to use for synchronization + * + * This function is passed to call_rcu() function by rcu_sync_enter() and + * rcu_sync_exit(), so that it is invoked after a grace period following the + * that invocation of enter/exit. + * + * If it is called by rcu_sync_enter() it signals that all the readers were + * switched onto slow path. + * + * If it is called by rcu_sync_exit() it takes action based on events that + * have taken place in the meantime, so that closely spaced rcu_sync_enter() + * and rcu_sync_exit() pairs need not wait for a grace period. + * + * If another rcu_sync_enter() is invoked before the grace period + * ended, reset state to allow the next rcu_sync_exit() to let the + * readers back onto their fastpaths (after a grace period). If both + * another rcu_sync_enter() and its matching rcu_sync_exit() are invoked + * before the grace period ended, re-invoke call_rcu() on behalf of that + * rcu_sync_exit(). Otherwise, set all state back to idle so that readers + * can again use their fastpaths. + */ +static void rcu_sync_func(struct rcu_head *rhp) +{ + struct rcu_sync *rsp = container_of(rhp, struct rcu_sync, cb_head); + unsigned long flags; + + WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_IDLE); + WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_PASSED); + + spin_lock_irqsave(&rsp->rss_lock, flags); + if (rsp->gp_count) { + /* + * We're at least a GP after the GP_IDLE->GP_ENTER transition. + */ + WRITE_ONCE(rsp->gp_state, GP_PASSED); + wake_up_locked(&rsp->gp_wait); + } else if (rsp->gp_state == GP_REPLAY) { + /* + * A new rcu_sync_exit() has happened; requeue the callback to + * catch a later GP. + */ + WRITE_ONCE(rsp->gp_state, GP_EXIT); + rcu_sync_call(rsp); + } else { + /* + * We're at least a GP after the last rcu_sync_exit(); eveybody + * will now have observed the write side critical section. + * Let 'em rip!. + */ + WRITE_ONCE(rsp->gp_state, GP_IDLE); + } + spin_unlock_irqrestore(&rsp->rss_lock, flags); +} + /** * rcu_sync_enter() - Force readers onto slowpath * @rsp: Pointer to rcu_sync structure to use for synchronization @@ -58,84 +120,43 @@ void rcu_sync_enter_start(struct rcu_sync *rsp) */ void rcu_sync_enter(struct rcu_sync *rsp) { - bool need_wait, need_sync; + int gp_state; spin_lock_irq(&rsp->rss_lock); - need_wait = rsp->gp_count++; - need_sync = rsp->gp_state == GP_IDLE; - if (need_sync) - rsp->gp_state = GP_PENDING; + gp_state = rsp->gp_state; + if (gp_state == GP_IDLE) { + WRITE_ONCE(rsp->gp_state, GP_ENTER); + WARN_ON_ONCE(rsp->gp_count); + /* + * Note that we could simply do rcu_sync_call(rsp) here and + * avoid the "if (gp_state == GP_IDLE)" block below. + * + * However, synchronize_rcu() can be faster if rcu_expedited + * or rcu_blocking_is_gp() is true. + * + * Another reason is that we can't wait for rcu callback if + * we are called at early boot time but this shouldn't happen. + */ + } + rsp->gp_count++; spin_unlock_irq(&rsp->rss_lock); - WARN_ON_ONCE(need_wait && need_sync); - if (need_sync) { + if (gp_state == GP_IDLE) { + /* + * See the comment above, this simply does the "synchronous" + * call_rcu(rcu_sync_func) which does GP_ENTER -> GP_PASSED. + */ synchronize_rcu(); - rsp->gp_state = GP_PASSED; - wake_up_all(&rsp->gp_wait); - } else if (need_wait) { - wait_event(rsp->gp_wait, rsp->gp_state == GP_PASSED); - } else { - /* - * Possible when there's a pending CB from a rcu_sync_exit(). - * Nobody has yet been allowed the 'fast' path and thus we can - * avoid doing any sync(). The callback will get 'dropped'. - */ - WARN_ON_ONCE(rsp->gp_state != GP_PASSED); + rcu_sync_func(&rsp->cb_head); + /* Not really needed, wait_event() would see GP_PASSED. */ + return; } + + wait_event(rsp->gp_wait, READ_ONCE(rsp->gp_state) >= GP_PASSED); } /** - * rcu_sync_func() - Callback function managing reader access to fastpath - * @rhp: Pointer to rcu_head in rcu_sync structure to use for synchronization - * - * This function is passed to one of the call_rcu() functions by - * rcu_sync_exit(), so that it is invoked after a grace period following the - * that invocation of rcu_sync_exit(). It takes action based on events that - * have taken place in the meantime, so that closely spaced rcu_sync_enter() - * and rcu_sync_exit() pairs need not wait for a grace period. - * - * If another rcu_sync_enter() is invoked before the grace period - * ended, reset state to allow the next rcu_sync_exit() to let the - * readers back onto their fastpaths (after a grace period). If both - * another rcu_sync_enter() and its matching rcu_sync_exit() are invoked - * before the grace period ended, re-invoke call_rcu() on behalf of that - * rcu_sync_exit(). Otherwise, set all state back to idle so that readers - * can again use their fastpaths. - */ -static void rcu_sync_func(struct rcu_head *rhp) -{ - struct rcu_sync *rsp = container_of(rhp, struct rcu_sync, cb_head); - unsigned long flags; - - WARN_ON_ONCE(rsp->gp_state != GP_PASSED); - WARN_ON_ONCE(rsp->cb_state == CB_IDLE); - - spin_lock_irqsave(&rsp->rss_lock, flags); - if (rsp->gp_count) { - /* - * A new rcu_sync_begin() has happened; drop the callback. - */ - rsp->cb_state = CB_IDLE; - } else if (rsp->cb_state == CB_REPLAY) { - /* - * A new rcu_sync_exit() has happened; requeue the callback - * to catch a later GP. - */ - rsp->cb_state = CB_PENDING; - call_rcu(&rsp->cb_head, rcu_sync_func); - } else { - /* - * We're at least a GP after rcu_sync_exit(); eveybody will now - * have observed the write side critical section. Let 'em rip!. - */ - rsp->cb_state = CB_IDLE; - rsp->gp_state = GP_IDLE; - } - spin_unlock_irqrestore(&rsp->rss_lock, flags); -} - -/** - * rcu_sync_exit() - Allow readers back onto fast patch after grace period + * rcu_sync_exit() - Allow readers back onto fast path after grace period * @rsp: Pointer to rcu_sync structure to use for synchronization * * This function is used by updaters who have completed, and can therefore @@ -146,13 +167,16 @@ static void rcu_sync_func(struct rcu_head *rhp) */ void rcu_sync_exit(struct rcu_sync *rsp) { + WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_IDLE); + WARN_ON_ONCE(READ_ONCE(rsp->gp_count) == 0); + spin_lock_irq(&rsp->rss_lock); if (!--rsp->gp_count) { - if (rsp->cb_state == CB_IDLE) { - rsp->cb_state = CB_PENDING; - call_rcu(&rsp->cb_head, rcu_sync_func); - } else if (rsp->cb_state == CB_PENDING) { - rsp->cb_state = CB_REPLAY; + if (rsp->gp_state == GP_PASSED) { + WRITE_ONCE(rsp->gp_state, GP_EXIT); + rcu_sync_call(rsp); + } else if (rsp->gp_state == GP_EXIT) { + WRITE_ONCE(rsp->gp_state, GP_REPLAY); } } spin_unlock_irq(&rsp->rss_lock); @@ -164,18 +188,19 @@ void rcu_sync_exit(struct rcu_sync *rsp) */ void rcu_sync_dtor(struct rcu_sync *rsp) { - int cb_state; + int gp_state; - WARN_ON_ONCE(rsp->gp_count); + WARN_ON_ONCE(READ_ONCE(rsp->gp_count)); + WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_PASSED); spin_lock_irq(&rsp->rss_lock); - if (rsp->cb_state == CB_REPLAY) - rsp->cb_state = CB_PENDING; - cb_state = rsp->cb_state; + if (rsp->gp_state == GP_REPLAY) + WRITE_ONCE(rsp->gp_state, GP_EXIT); + gp_state = rsp->gp_state; spin_unlock_irq(&rsp->rss_lock); - if (cb_state != CB_IDLE) { + if (gp_state != GP_IDLE) { rcu_barrier(); - WARN_ON_ONCE(rsp->cb_state != CB_IDLE); + WARN_ON_ONCE(rsp->gp_state != GP_IDLE); } }