kubernetes bounded_frequency_runner_test 源码

  • 2022-09-18
  • 浏览 (223)

kubernetes bounded_frequency_runner_test 代码

文件路径:/pkg/util/async/bounded_frequency_runner_test.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package async

import (
	"sync"
	"testing"
	"time"
)

// Track calls to the managed function.
type receiver struct {
	lock    sync.Mutex
	run     bool
	retryFn func()
}

func (r *receiver) F() {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.run = true

	if r.retryFn != nil {
		r.retryFn()
		r.retryFn = nil
	}
}

func (r *receiver) reset() bool {
	r.lock.Lock()
	defer r.lock.Unlock()
	was := r.run
	r.run = false
	return was
}

func (r *receiver) setRetryFn(retryFn func()) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.retryFn = retryFn
}

// A single change event in the fake timer.
type timerUpdate struct {
	active bool
	next   time.Duration // iff active == true
}

// Fake time.
type fakeTimer struct {
	c chan time.Time

	lock    sync.Mutex
	now     time.Time
	timeout time.Time
	active  bool

	updated chan timerUpdate
}

func newFakeTimer() *fakeTimer {
	ft := &fakeTimer{
		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
		c:       make(chan time.Time),
		updated: make(chan timerUpdate),
	}
	return ft
}

func (ft *fakeTimer) C() <-chan time.Time {
	return ft.c
}

func (ft *fakeTimer) Reset(in time.Duration) bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	was := ft.active
	ft.active = true
	ft.timeout = ft.now.Add(in)
	ft.updated <- timerUpdate{
		active: true,
		next:   in,
	}
	return was
}

func (ft *fakeTimer) Stop() bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	was := ft.active
	ft.active = false
	ft.updated <- timerUpdate{
		active: false,
	}
	return was
}

func (ft *fakeTimer) Now() time.Time {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.now
}

func (ft *fakeTimer) Remaining() time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.timeout.Sub(ft.now)
}

func (ft *fakeTimer) Since(t time.Time) time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.now.Sub(t)
}

func (ft *fakeTimer) Sleep(d time.Duration) {
	// ft.advance grabs ft.lock
	ft.advance(d)
}

// advance the current time.
func (ft *fakeTimer) advance(d time.Duration) {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	ft.now = ft.now.Add(d)
	if ft.active && !ft.now.Before(ft.timeout) {
		ft.active = false
		ft.c <- ft.timeout
	}
}

// return the calling line number (for printing)
// test the timer's state
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
	if upd.active != active {
		t.Fatalf("%s: expected timer active=%v", name, active)
	}
	if active && upd.next != next {
		t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
	}
}

// test and reset the receiver's state
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
	triggered := receiver.reset()
	if expected && !triggered {
		t.Fatalf("%s: function should have been called", name)
	} else if !expected && triggered {
		t.Fatalf("%s: function should not have been called", name)
	}
}

// Durations embedded in test cases depend on these.
var minInterval = 1 * time.Second
var maxInterval = 10 * time.Second

func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
	upd := <-timer.updated // wait for stop
	checkReceiver(name, t, obj, expectCall)
	checkReceiver(name, t, obj, false) // prove post-condition
	checkTimer(name, t, upd, false, 0)
	upd = <-timer.updated // wait for reset
	checkTimer(name, t, upd, true, expectNext)
}

func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
	waitForReset(name, t, timer, obj, true, maxInterval)
}

func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
	// It will first get reset as with a normal run, and then get set again
	waitForRun(name, t, timer, obj)
	waitForReset(name, t, timer, obj, false, expectNext)
}

func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
	waitForReset(name, t, timer, obj, false, expectNext)
}

func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
	select {
	case <-timer.c:
		t.Fatalf("%s: unexpected timer tick", name)
	case upd := <-timer.updated:
		t.Fatalf("%s: unexpected timer update %v", name, upd)
	default:
	}
	checkReceiver(name, t, obj, false)
}

func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for initial time to be set to max
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately.
	// rel=0ms
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(500 * time.Millisecond) // rel=500ms
	runner.Run()
	waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(499 * time.Millisecond) // rel=999ms
	runner.Run()
	waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)

	// Do the deferred run
	timer.advance(1 * time.Millisecond) // rel=1000ms
	waitForRun("second run", t, timer, obj)

	// Try again immediately
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 1*time.Second)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // rel=1ms
	runner.Run()
	waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)

	// Ensure that we don't run again early
	timer.advance(998 * time.Millisecond) // rel=999ms
	waitForNothing("premature", t, timer, obj)

	// Do the deferred run
	timer.advance(1 * time.Millisecond) // rel=1000ms
	waitForRun("third run", t, timer, obj)

	// Let minInterval pass, but there are no runs queued
	timer.advance(1 * time.Second) // rel=1000ms
	waitForNothing("minInterval", t, timer, obj)

	// Let maxInterval pass
	timer.advance(9 * time.Second) // rel=10000ms
	waitForRun("maxInterval", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // rel=1ms
	runner.Run()
	waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)

	// Let minInterval pass
	timer.advance(999 * time.Millisecond) // rel=1000ms
	waitForRun("fourth run", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
	// a message is sent to time.updated in func Stop() at the end of the child goroutine
	// to terminate the child, a receive on time.updated is needed here
	<-timer.updated
}

func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for initial time to be set to max
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately.
	// abs=0ms, rel=0ms
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// Run again, before minInterval expires, with burst.
	timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
	runner.Run()
	waitForRun("second run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
	runner.Run()
	waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
	runner.Run()
	waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)

	// Advance timer enough to replenish bursts, but not enough to be minInterval
	// after the last run
	timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
	waitForNothing("not minInterval", t, timer, obj)
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
	runner.Run()
	waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
	runner.Run()
	waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)

	// Advance and do the deferred run
	timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
	waitForRun("fourth run", t, timer, obj)

	// Run again, once burst has fully replenished.
	timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
	runner.Run()
	waitForRun("fifth run", t, timer, obj)
	runner.Run()
	waitForRun("sixth run", t, timer, obj)
	runner.Run()
	waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)

	// Wait until minInterval after the last run
	timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
	waitForRun("seventh run", t, timer, obj)

	// Wait for maxInterval
	timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
	waitForRun("maxInterval", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
	// a message is sent to time.updated in func Stop() at the end of the child goroutine
	// to terminate the child, a receive on time.updated is needed here
	<-timer.updated
}

func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for initial time to be set to max
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately, and queue a retry
	// rel=0ms
	obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
	runner.Run()
	waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)

	// Nothing happens...
	timer.advance(time.Second) // rel=1000ms
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// After retryInterval, function is called
	timer.advance(4 * time.Second) // rel=5000ms
	waitForRun("retry", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(499 * time.Millisecond) // rel=499ms
	runner.Run()
	waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)

	// Do the deferred run, queue another retry after it returns
	timer.advance(501 * time.Millisecond) // rel=1000ms
	runner.RetryAfter(5 * time.Second)
	waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)

	// Wait for minInterval to pass
	timer.advance(time.Second) // rel=1000ms
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// Now do another run
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// Retry was cancelled because we already ran
	timer.advance(4 * time.Second)
	waitForNothing("retry cancelled", t, timer, obj)

	// Run, queue a retry from a goroutine
	obj.setRetryFn(func() {
		go func() {
			time.Sleep(100 * time.Millisecond)
			runner.RetryAfter(5 * time.Second)
		}()
	})
	runner.Run()
	waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)

	// Call Run again before minInterval passes
	timer.advance(100 * time.Millisecond) // rel=100ms
	runner.Run()
	waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)

	// Deferred run will run after minInterval passes
	timer.advance(900 * time.Millisecond) // rel=1000ms
	waitForRun("fifth run", t, timer, obj)

	// Retry was cancelled because we already ran
	timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
	waitForNothing("retry cancelled", t, timer, obj)

	// Rerun happens after maxInterval
	timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
	waitForNothing("premature", t, timer, obj)
	timer.advance(time.Second) // rel=10s since run
	waitForRun("maxInterval", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
	// a message is sent to time.updated in func Stop() at the end of the child goroutine
	// to terminate the child, a receive on time.updated is needed here
	<-timer.updated
}

相关信息

kubernetes 源码目录

相关文章

kubernetes bounded_frequency_runner 源码

kubernetes runner 源码

kubernetes runner_test 源码

0  赞