kubernetes leaderelection_test 源码

  • 2022-09-18
  • 浏览 (207)

kubernetes leaderelection_test 代码

文件路径:/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leaderelection

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"testing"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/client-go/kubernetes/fake"
	fakeclient "k8s.io/client-go/testing"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/clock"
)

// createLockObject builds the API object used as a lock of the given
// objectType ("endpoints", "configmaps" or "leases") with the supplied
// namespace and name.
//
// When record is non-nil it is JSON-encoded into the
// rl.LeaderElectionRecordAnnotationKey annotation of the object, and for
// "leases" it is additionally converted into the Lease spec. Any other
// objectType, or a record that cannot be marshalled, fails the test
// immediately.
func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
	objectMeta := metav1.ObjectMeta{
		Namespace: namespace,
		Name:      name,
	}
	if record != nil {
		recordBytes, err := json.Marshal(record)
		if err != nil {
			// Do not hand back an object with a silently empty annotation.
			t.Fatalf("marshal leader election record %+v: %v", record, err)
		}
		objectMeta.Annotations = map[string]string{
			rl.LeaderElectionRecordAnnotationKey: string(recordBytes),
		}
	}
	switch objectType {
	case "endpoints":
		obj = &corev1.Endpoints{ObjectMeta: objectMeta}
	case "configmaps":
		obj = &corev1.ConfigMap{ObjectMeta: objectMeta}
	case "leases":
		var spec coordinationv1.LeaseSpec
		if record != nil {
			spec = rl.LeaderElectionRecordToLeaseSpec(record)
		}
		obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
	default:
		t.Fatal("unexpected objType:" + objectType)
	}
	return obj
}

// Reactor bundles the arguments the tests in this file pass to the fake
// clientset's AddReactor: the action verb (for example "get", "create" or
// "update"), the resource the reaction applies to, and the reaction function
// itself. Test tables that target a single resource leave objectType unset and
// supply the resource at registration time instead.
type Reactor struct {
	verb       string
	objectType string
	reaction   fakeclient.ReactionFunc
}

// testTryAcquireOrRenew runs table-driven subtests of
// LeaderElector.tryAcquireOrRenew for a single-resource lock of the given
// objectType, using identity "baz" on the object foo/bar.
//
// Each case registers its reactors on a fake clientset and seeds the elector
// with an observed record and observation time. After the call the subtest
// checks the returned success flag, the observed holder identity, that the
// number of recorded client actions equals the number of reactors, whether
// LeaderTransitions is 1 or 0, and the leader reported through OnNewLeader.
//
// Only "leases" is wired to a concrete lock here; any other objectType fails
// the subtest.
func testTryAcquireOrRenew(t *testing.T, objectType string) {
	future := time.Now().Add(1000 * time.Hour)
	past := time.Now().Add(-1000 * time.Hour)

	tests := []struct {
		name           string
		observedRecord rl.LeaderElectionRecord
		observedTime   time.Time
		reactors       []Reactor

		expectSuccess    bool
		transitionLeader bool
		outHolder        string
	}{
		{
			name: "acquire from no object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb: "create",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
			},
			expectSuccess: true,
			outHolder:     "baz",
		},
		{
			name: "acquire from object without annotations",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), nil), nil
					},
				},
				{
					verb: "update",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from unled object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
					},
				},
				{
					verb: "update",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from led, unacked object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb: "update",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedTime:   past,

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from empty led, acked object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: ""}), nil
					},
				},
				{
					verb: "update",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedTime: future,

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "don't acquire from led, acked object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
			},
			observedTime: future,

			expectSuccess: false,
			outHolder:     "bing",
		},
		{
			name: "renew already acquired object",
			reactors: []Reactor{
				{
					verb: "get",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
				{
					verb: "update",
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedTime:   future,
			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},

			expectSuccess: true,
			outHolder:     "baz",
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// OnNewLeader is called async so we have to wait for it.
			var wg sync.WaitGroup
			wg.Add(1)
			var reportedLeader string
			var lock rl.Interface

			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: &record.FakeRecorder{},
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all registered for every verb and resource: reports a test
			// error if it is ever invoked.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})

			switch objectType {
			case "leases":
				lock = &rl.LeaseLock{
					LeaseMeta:  objectMeta,
					LockConfig: resourceLockConfig,
					Client:     c.CoordinationV1(),
				}
			default:
				t.Fatalf("Unknown objectType: %v", objectType)
			}

			lec := LeaderElectionConfig{
				Lock:          lock,
				LeaseDuration: 10 * time.Second,
				Callbacks: LeaderCallbacks{
					OnNewLeader: func(l string) {
						defer wg.Done()
						reportedLeader = l
					},
				},
			}
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
			le := &LeaderElector{
				config:            lec,
				observedRecord:    test.observedRecord,
				observedRawRecord: observedRawRecord,
				observedTime:      test.observedTime,
				clock:             clock.RealClock{},
			}
			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
			}

			// Reset the timestamp fields to their zero values before the checks below.
			le.observedRecord.AcquireTime = metav1.Time{}
			le.observedRecord.RenewTime = metav1.Time{}
			if le.observedRecord.HolderIdentity != test.outHolder {
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
			}
			if len(test.reactors) != len(c.Actions()) {
				t.Errorf("wrong number of api interactions")
			}
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
				t.Errorf("leader should have transitioned but did not")
			}
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
				t.Errorf("leader should not have transitioned but did")
			}

			le.maybeReportTransition()
			wg.Wait()
			if reportedLeader != test.outHolder {
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
			}
		})
	}
}

// Will test leader election using lease as the resource
func TestTryAcquireOrRenewLeases(t *testing.T) {
	testTryAcquireOrRenew(t, "leases")
}

// TestLeaseSpecToLeaderElectionRecordRoundTrip converts a fully populated
// LeaseSpec to a LeaderElectionRecord and back, and then to a record again,
// checking with semantic equality that neither the spec nor the record changes
// across the conversions.
func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
	holderIdentity := "foo"
	leaseDurationSeconds := int32(10)
	leaseTransitions := int32(1)
	oldSpec := coordinationv1.LeaseSpec{
		HolderIdentity:       &holderIdentity,
		LeaseDurationSeconds: &leaseDurationSeconds,
		// Keyed fields keep the literals valid under `go vet` (composites).
		AcquireTime:      &metav1.MicroTime{Time: time.Now()},
		RenewTime:        &metav1.MicroTime{Time: time.Now()},
		LeaseTransitions: &leaseTransitions,
	}

	oldRecord := rl.LeaseSpecToLeaderElectionRecord(&oldSpec)
	newSpec := rl.LeaderElectionRecordToLeaseSpec(oldRecord)

	if !equality.Semantic.DeepEqual(oldSpec, newSpec) {
		t.Errorf("diff: %v", diff.ObjectReflectDiff(oldSpec, newSpec))
	}

	newRecord := rl.LeaseSpecToLeaderElectionRecord(&newSpec)

	if !equality.Semantic.DeepEqual(oldRecord, newRecord) {
		t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord))
	}
}

// multiLockType splits a multi-lock objectType into the resource names of its
// two underlying locks. The endpoints+leases type yields "endpoints" and the
// leases lock type; the configmaps+leases type yields "configmaps" and the
// leases lock type. Any other objectType fails the test.
func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) {
	if objectType == rl.EndpointsLeasesResourceLock {
		return "endpoints", rl.LeasesResourceLock
	}
	if objectType == rl.ConfigMapsLeasesResourceLock {
		return "configmaps", rl.LeasesResourceLock
	}
	t.Fatal("unexpected objType:" + objectType)
	return "", ""
}

// GetRawRecordOrDie returns the raw form of ler for the given objectType.
//
// For "endpoints", "configmaps" and "leases" this is the JSON encoding of the
// record. For "endpointsleases" and "configmapsleases" it is the result of
// rl.ConcatRawRecord applied to that JSON encoding twice. A marshalling
// failure or an unrecognised objectType fails the test.
func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
	// encode marshals the record, aborting the test on failure.
	encode := func() []byte {
		raw, marshalErr := json.Marshal(ler)
		if marshalErr != nil {
			t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, marshalErr)
		}
		return raw
	}

	switch objectType {
	case "endpoints", "configmaps", "leases":
		return encode()
	case "endpointsleases", "configmapsleases":
		raw := encode()
		return rl.ConcatRawRecord(raw, raw)
	}
	t.Fatal("unexpected objType:" + objectType)
	return nil
}

// testTryAcquireOrRenewMultiLock runs table-driven subtests of
// LeaderElector.tryAcquireOrRenew against the lock returned by
// rl.New(objectType, "foo", "bar", ...) with identity "baz".
//
// objectType is split by multiLockType into a primary and a secondary
// resource, and every reactor in a case names the resource it is registered
// for. Each case also seeds the elector with an observed record, its raw form
// and an observation time. After the call the subtest checks the returned
// success flag, the observed holder identity, that the number of recorded
// client actions equals the number of reactors, whether LeaderTransitions is
// 1 or 0, and the leader reported through OnNewLeader.
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
	future := time.Now().Add(1000 * time.Hour)
	past := time.Now().Add(-1000 * time.Hour)
	primaryType, secondaryType := multiLockType(t, objectType)
	tests := []struct {
		name              string
		observedRecord    rl.LeaderElectionRecord
		observedRawRecord []byte
		observedTime      time.Time
		reactors          []Reactor

		expectSuccess    bool
		transitionLeader bool
		outHolder        string
	}{
		{
			name: "acquire from no object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
				{
					verb:       "create",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
			},
			expectSuccess: true,
			outHolder:     "baz",
		},
		{
			name: "acquire from unled old object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
			},
			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from unled transition object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
					},
				},
				{
					verb:       "update",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from led, unack old object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "create",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      past,

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from led, unack transition object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "update",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      past,

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "acquire from conflict led, ack transition object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      future,

			expectSuccess: false,
			outHolder:     rl.UnknownLeader,
		},
		{
			name: "acquire from led, unack unknown object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
					},
				},
				{
					verb:       "update",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader},
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}),
			observedTime:      past,

			expectSuccess:    true,
			transitionLeader: true,
			outHolder:        "baz",
		},
		{
			name: "don't acquire from led, ack old object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      future,

			expectSuccess: false,
			outHolder:     "bing",
		},
		{
			name: "don't acquire from led, acked new object, observe new record",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      future,

			expectSuccess: false,
			outHolder:     rl.UnknownLeader,
		},
		{
			name: "don't acquire from led, acked new object, observe transition record",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "bing"},
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
			observedTime:      future,

			expectSuccess: false,
			outHolder:     "bing",
		},
		{
			// NOTE(review): "required" in the name below looks like a typo for
			// "acquired" (the single-lock table uses "renew already acquired
			// object") — confirm before renaming, since -run filters may match it.
			name: "renew already required object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
				{
					verb:       "update",
					objectType: primaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
				{
					verb:       "get",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
					},
				},
				{
					verb:       "update",
					objectType: secondaryType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			observedRecord:    rl.LeaderElectionRecord{HolderIdentity: "baz"},
			observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}),
			observedTime:      future,

			expectSuccess: true,
			outHolder:     "baz",
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// OnNewLeader is called async so we have to wait for it.
			var wg sync.WaitGroup
			wg.Add(1)
			var reportedLeader string

			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: &record.FakeRecorder{},
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction)
			}
			// Catch-all registered for every verb and resource: reports a test
			// error if it is ever invoked.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})

			lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
			if err != nil {
				t.Fatalf("Couldn't create lock: %v", err)
			}

			lec := LeaderElectionConfig{
				Lock:          lock,
				LeaseDuration: 10 * time.Second,
				Callbacks: LeaderCallbacks{
					OnNewLeader: func(l string) {
						defer wg.Done()
						reportedLeader = l
					},
				},
			}
			le := &LeaderElector{
				config:            lec,
				observedRecord:    test.observedRecord,
				observedRawRecord: test.observedRawRecord,
				observedTime:      test.observedTime,
				clock:             clock.RealClock{},
			}
			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
			}

			// Reset the timestamp fields to their zero values before the checks below.
			le.observedRecord.AcquireTime = metav1.Time{}
			le.observedRecord.RenewTime = metav1.Time{}
			if le.observedRecord.HolderIdentity != test.outHolder {
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
			}
			if len(test.reactors) != len(c.Actions()) {
				t.Errorf("wrong number of api interactions")
			}
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
				t.Errorf("leader should have transitioned but did not")
			}
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
				t.Errorf("leader should not have transitioned but did")
			}

			le.maybeReportTransition()
			wg.Wait()
			if reportedLeader != test.outHolder {
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
			}
		})
	}
}

// Will test leader election using endpointsleases as the resource
func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
	testTryAcquireOrRenewMultiLock(t, "endpointsleases")
}

// Will test leader election using configmapsleases as the resource
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
	testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}

// testReleaseLease exercises LeaderElector.release for the given lock
// objectType. Only "leases" is handled; any other value fails the subtest.
//
// Each case first acquires the lock through tryAcquireOrRenew against a fake
// clientset scripted by the case's reactors, waits for the resulting
// OnNewLeader callback, and then calls release. Afterwards it checks the
// observed holder identity, the number of API interactions, the leader
// transition count, and the leader reported by the final OnNewLeader callback.
func testReleaseLease(t *testing.T, objectType string) {
	tests := []struct {
		name           string
		observedRecord rl.LeaderElectionRecord
		observedTime   time.Time
		reactors       []Reactor

		expectSuccess    bool
		transitionLeader bool
		outHolder        string
	}{
		{
			name: "release acquired lock from no object",
			reactors: []Reactor{
				{
					// The lock object does not exist yet: get reports NotFound.
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					// create echoes back the submitted object.
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
				{
					// update echoes back the submitted object.
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			expectSuccess: true,
			outHolder:     "",
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// OnNewLeader is called async so we have to wait for it.
			var wg sync.WaitGroup
			wg.Add(1)
			var reportedLeader string
			var lock rl.Interface

			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: &record.FakeRecorder{},
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all reactor: any action not handled above is a test failure.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})

			switch objectType {
			case "leases":
				lock = &rl.LeaseLock{
					LeaseMeta:  objectMeta,
					LockConfig: resourceLockConfig,
					Client:     c.CoordinationV1(),
				}
			default:
				t.Fatalf("Unknown objectType: %v", objectType)
			}

			lec := LeaderElectionConfig{
				Lock:          lock,
				LeaseDuration: 10 * time.Second,
				Callbacks: LeaderCallbacks{
					OnNewLeader: func(l string) {
						defer wg.Done()
						reportedLeader = l
					},
				},
			}
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
			le := &LeaderElector{
				config:            lec,
				observedRecord:    test.observedRecord,
				observedRawRecord: observedRawRecord,
				observedTime:      test.observedTime,
				clock:             clock.RealClock{},
			}
			// The rest of the case releases a lock that must already be held, and
			// the wait below depends on OnNewLeader being invoked, so stop here if
			// the acquisition failed instead of continuing.
			if !le.tryAcquireOrRenew(context.Background()) {
				t.Fatalf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", false)
			}

			le.maybeReportTransition()

			// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
			wg.Wait()
			wg.Add(1)

			if test.expectSuccess != le.release() {
				t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
			}

			// Timestamps are cleared on the observed record; the checks below only
			// look at the holder identity and the transition count.
			le.observedRecord.AcquireTime = metav1.Time{}
			le.observedRecord.RenewTime = metav1.Time{}
			if le.observedRecord.HolderIdentity != test.outHolder {
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
			}
			// Every scripted reactor is expected to have been hit exactly once.
			if len(test.reactors) != len(c.Actions()) {
				t.Errorf("wrong number of api interactions")
			}
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
				t.Errorf("leader should have transitioned but did not")
			}
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
				t.Errorf("leader should not have transitioned but did")
			}
			le.maybeReportTransition()
			wg.Wait()
			if reportedLeader != test.outHolder {
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
			}
		})
	}
}

// TestReleaseLeaseLeases runs the lock release cases with "leases" as the
// lock resource type.
func TestReleaseLeaseLeases(t *testing.T) {
	const objectType = "leases"
	testReleaseLease(t, objectType)
}

// TestReleaseOnCancellation_Leases runs the release-on-cancellation cases
// with "leases" as the lock resource type.
func TestReleaseOnCancellation_Leases(t *testing.T) {
	const objectType = "leases"
	testReleaseOnCancellation(t, objectType)
}

// testReleaseOnCancellation checks that an elector configured with
// ReleaseOnCancel issues a follow-up update to the lock when its context is
// canceled while a renew attempt is in flight.
//
// Two cases are covered: the cancellation is returned from the renew's update
// call, and the cancellation is returned from the renew's get call. In both,
// the fake client reactor blocks the in-flight call until the test has
// canceled the context and then returns context.Canceled; the test finally
// waits for the update that signals onRelease.
func testReleaseOnCancellation(t *testing.T, objectType string) {
	var (
		// Signalling channels between the reactors/callbacks and the test body.
		onNewLeader   = make(chan struct{})
		onRenewCalled = make(chan struct{})
		onRenewResume = make(chan struct{})
		onRelease     = make(chan struct{})

		// State shared by the reactors of a single subtest.
		lockObj runtime.Object
		gets    int
		updates int
		wg      sync.WaitGroup
	)
	// resetVars gives every subtest fresh channels and zeroed reactor state.
	resetVars := func() {
		onNewLeader = make(chan struct{})
		onRenewCalled = make(chan struct{})
		onRenewResume = make(chan struct{})
		onRelease = make(chan struct{})

		lockObj = nil
		gets = 0
		updates = 0
	}
	lec := LeaderElectionConfig{
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 2 * time.Second,
		RetryPeriod:   1 * time.Second,

		// This is what we're testing
		ReleaseOnCancel: true,

		Callbacks: LeaderCallbacks{
			OnNewLeader:      func(identity string) {},
			OnStoppedLeading: func() {},
			OnStartedLeading: func(context.Context) {
				close(onNewLeader)
			},
		},
	}

	tests := []struct {
		name     string
		reactors []Reactor
	}{
		{
			name: "release acquired lock on cancellation of update",
			reactors: []Reactor{
				{
					// get returns the stored lock object, or NotFound before it exists.
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					// create stores the submitted object as the current lock.
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++

						// Second update (first renew) should return our canceled error
						// FakeClient doesn't do anything with the context so we're doing this ourselves
						if updates == 2 {
							close(onRenewCalled)
							<-onRenewResume
							return true, nil, context.Canceled
						} else if updates == 3 {
							// We update the lock after the cancellation to release it
							// This wg is to avoid the data race on lockObj
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
		},
		{
			name: "release acquired lock on cancellation of get",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							// Third and more get (first create, second renew) should return our canceled error
							// FakeClient doesn't do anything with the context so we're doing this ourselves
							if gets >= 3 {
								close(onRenewCalled)
								<-onRenewResume
								return true, nil, context.Canceled
							}
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					// create stores the submitted object as the current lock.
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++
						// Second update (first renew) should release the lock
						if updates == 2 {
							// We update the lock after the cancellation to release it
							// This wg is to avoid the data race on lockObj
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// One Done is expected from the update reactor that performs the release.
			wg.Add(1)
			resetVars()

			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: &record.FakeRecorder{},
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all reactor: any action not handled above is a test failure.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})
			lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
			if err != nil {
				t.Fatal("resourcelock.New() = ", err)
			}

			lec.Lock = lock
			elector, err := NewLeaderElector(lec)
			if err != nil {
				t.Fatal("Failed to create leader elector: ", err)
			}

			ctx, cancel := context.WithCancel(context.Background())
			// Ensure the elector goroutine is told to stop even when the subtest
			// bails out early via t.Fatal; the explicit cancel() below remains the
			// trigger for the release flow, and calling cancel twice is harmless.
			defer cancel()

			go elector.Run(ctx)

			// Wait for us to become the leader
			select {
			case <-onNewLeader:
			case <-time.After(10 * time.Second):
				t.Fatal("failed to become the leader")
			}

			// Wait for renew (update) to be invoked
			select {
			case <-onRenewCalled:
			case <-time.After(10 * time.Second):
				t.Fatal("the elector failed to renew the lock")
			}

			// Cancel the context - stopping the elector while
			// it's running
			cancel()

			// Resume the tryAcquireOrRenew call to return the cancellation
			// which should trigger the release flow
			close(onRenewResume)

			select {
			case <-onRelease:
			case <-time.After(10 * time.Second):
				t.Fatal("the lock was not released")
			}
			// Block until the releasing update reactor has finished writing lockObj.
			wg.Wait()
		})
	}
}

相关信息

kubernetes 源码目录

相关文章

kubernetes healthzadaptor 源码

kubernetes healthzadaptor_test 源码

kubernetes leaderelection 源码

kubernetes metrics 源码

0  赞