kubernetes bounded_frequency_runner_test 源码
kubernetes bounded_frequency_runner_test 代码
文件路径:/pkg/util/async/bounded_frequency_runner_test.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package async
import (
"sync"
"testing"
"time"
)
// Track calls to the managed function.
type receiver struct {
lock sync.Mutex
run bool
retryFn func()
}
func (r *receiver) F() {
r.lock.Lock()
defer r.lock.Unlock()
r.run = true
if r.retryFn != nil {
r.retryFn()
r.retryFn = nil
}
}
func (r *receiver) reset() bool {
r.lock.Lock()
defer r.lock.Unlock()
was := r.run
r.run = false
return was
}
func (r *receiver) setRetryFn(retryFn func()) {
r.lock.Lock()
defer r.lock.Unlock()
r.retryFn = retryFn
}
// A single change event in the fake timer.
type timerUpdate struct {
active bool
next time.Duration // iff active == true
}
// Fake time.
type fakeTimer struct {
c chan time.Time
lock sync.Mutex
now time.Time
timeout time.Time
active bool
updated chan timerUpdate
}
func newFakeTimer() *fakeTimer {
ft := &fakeTimer{
now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
c: make(chan time.Time),
updated: make(chan timerUpdate),
}
return ft
}
func (ft *fakeTimer) C() <-chan time.Time {
return ft.c
}
func (ft *fakeTimer) Reset(in time.Duration) bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = true
ft.timeout = ft.now.Add(in)
ft.updated <- timerUpdate{
active: true,
next: in,
}
return was
}
func (ft *fakeTimer) Stop() bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = false
ft.updated <- timerUpdate{
active: false,
}
return was
}
func (ft *fakeTimer) Now() time.Time {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now
}
func (ft *fakeTimer) Remaining() time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.timeout.Sub(ft.now)
}
func (ft *fakeTimer) Since(t time.Time) time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now.Sub(t)
}
func (ft *fakeTimer) Sleep(d time.Duration) {
// ft.advance grabs ft.lock
ft.advance(d)
}
// advance the current time.
func (ft *fakeTimer) advance(d time.Duration) {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.now = ft.now.Add(d)
if ft.active && !ft.now.Before(ft.timeout) {
ft.active = false
ft.c <- ft.timeout
}
}
// return the calling line number (for printing)
// test the timer's state
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
if upd.active != active {
t.Fatalf("%s: expected timer active=%v", name, active)
}
if active && upd.next != next {
t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
}
}
// test and reset the receiver's state
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
triggered := receiver.reset()
if expected && !triggered {
t.Fatalf("%s: function should have been called", name)
} else if !expected && triggered {
t.Fatalf("%s: function should not have been called", name)
}
}
// Durations embedded in test cases depend on these.
var minInterval = 1 * time.Second
var maxInterval = 10 * time.Second
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
upd := <-timer.updated // wait for stop
checkReceiver(name, t, obj, expectCall)
checkReceiver(name, t, obj, false) // prove post-condition
checkTimer(name, t, upd, false, 0)
upd = <-timer.updated // wait for reset
checkTimer(name, t, upd, true, expectNext)
}
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
waitForReset(name, t, timer, obj, true, maxInterval)
}
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
// It will first get reset as with a normal run, and then get set again
waitForRun(name, t, timer, obj)
waitForReset(name, t, timer, obj, false, expectNext)
}
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
waitForReset(name, t, timer, obj, false, expectNext)
}
func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
select {
case <-timer.c:
t.Fatalf("%s: unexpected timer tick", name)
case upd := <-timer.updated:
t.Fatalf("%s: unexpected timer update %v", name, upd)
default:
}
checkReceiver(name, t, obj, false)
}
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(500 * time.Millisecond) // rel=500ms
runner.Run()
waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(499 * time.Millisecond) // rel=999ms
runner.Run()
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
// Do the deferred run
timer.advance(1 * time.Millisecond) // rel=1000ms
waitForRun("second run", t, timer, obj)
// Try again immediately
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
// Ensure that we don't run again early
timer.advance(998 * time.Millisecond) // rel=999ms
waitForNothing("premature", t, timer, obj)
// Do the deferred run
timer.advance(1 * time.Millisecond) // rel=1000ms
waitForRun("third run", t, timer, obj)
// Let minInterval pass, but there are no runs queued
timer.advance(1 * time.Second) // rel=1000ms
waitForNothing("minInterval", t, timer, obj)
// Let maxInterval pass
timer.advance(9 * time.Second) // rel=10000ms
waitForRun("maxInterval", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
// Let minInterval pass
timer.advance(999 * time.Millisecond) // rel=1000ms
waitForRun("fourth run", t, timer, obj)
// Clean up.
stop <- struct{}{}
// a message is sent to time.updated in func Stop() at the end of the child goroutine
// to terminate the child, a receive on time.updated is needed here
<-timer.updated
}
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// abs=0ms, rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires, with burst.
timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
runner.Run()
waitForRun("second run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
runner.Run()
waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
runner.Run()
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
// Advance timer enough to replenish bursts, but not enough to be minInterval
// after the last run
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
waitForNothing("not minInterval", t, timer, obj)
runner.Run()
waitForRun("third run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
runner.Run()
waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
runner.Run()
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
// Advance and do the deferred run
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
waitForRun("fourth run", t, timer, obj)
// Run again, once burst has fully replenished.
timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
runner.Run()
waitForRun("fifth run", t, timer, obj)
runner.Run()
waitForRun("sixth run", t, timer, obj)
runner.Run()
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
// Wait until minInterval after the last run
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
waitForRun("seventh run", t, timer, obj)
// Wait for maxInterval
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
waitForRun("maxInterval", t, timer, obj)
// Clean up.
stop <- struct{}{}
// a message is sent to time.updated in func Stop() at the end of the child goroutine
// to terminate the child, a receive on time.updated is needed here
<-timer.updated
}
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately, and queue a retry
// rel=0ms
obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
runner.Run()
waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
// Nothing happens...
timer.advance(time.Second) // rel=1000ms
waitForNothing("minInterval, nothing queued", t, timer, obj)
// After retryInterval, function is called
timer.advance(4 * time.Second) // rel=5000ms
waitForRun("retry", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(499 * time.Millisecond) // rel=499ms
runner.Run()
waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
// Do the deferred run, queue another retry after it returns
timer.advance(501 * time.Millisecond) // rel=1000ms
runner.RetryAfter(5 * time.Second)
waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
// Wait for minInterval to pass
timer.advance(time.Second) // rel=1000ms
waitForNothing("minInterval, nothing queued", t, timer, obj)
// Now do another run
runner.Run()
waitForRun("third run", t, timer, obj)
// Retry was cancelled because we already ran
timer.advance(4 * time.Second)
waitForNothing("retry cancelled", t, timer, obj)
// Run, queue a retry from a goroutine
obj.setRetryFn(func() {
go func() {
time.Sleep(100 * time.Millisecond)
runner.RetryAfter(5 * time.Second)
}()
})
runner.Run()
waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
// Call Run again before minInterval passes
timer.advance(100 * time.Millisecond) // rel=100ms
runner.Run()
waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
// Deferred run will run after minInterval passes
timer.advance(900 * time.Millisecond) // rel=1000ms
waitForRun("fifth run", t, timer, obj)
// Retry was cancelled because we already ran
timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
waitForNothing("retry cancelled", t, timer, obj)
// Rerun happens after maxInterval
timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
waitForNothing("premature", t, timer, obj)
timer.advance(time.Second) // rel=10s since run
waitForRun("maxInterval", t, timer, obj)
// Clean up.
stop <- struct{}{}
// a message is sent to time.updated in func Stop() at the end of the child goroutine
// to terminate the child, a receive on time.updated is needed here
<-timer.updated
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦