kubernetes eventseries_test 源码

  • 2022-09-18
  • 浏览 (181)

kubernetes eventseries_test 代码

文件路径:/staging/src/k8s.io/client-go/tools/events/eventseries_test.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
	"strconv"
	"testing"
	"time"

	"os"
	"strings"

	v1 "k8s.io/api/core/v1"
	eventsv1 "k8s.io/api/events/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes/scheme"
	restclient "k8s.io/client-go/rest"
	ref "k8s.io/client-go/tools/reference"
)

// testEventSeriesSink is a sink stub for tests. Its Create, Update and Patch
// methods delegate to the matching On* callback when that callback is set;
// with a nil callback the method returns the event it was given and a nil
// error. Tests install callbacks to observe which sink operation was invoked
// and with which event.
type testEventSeriesSink struct {
	OnCreate func(e *eventsv1.Event) (*eventsv1.Event, error)
	OnUpdate func(e *eventsv1.Event) (*eventsv1.Event, error)
	OnPatch  func(e *eventsv1.Event, p []byte) (*eventsv1.Event, error)
}

// Create forwards e to the OnCreate callback when one is configured and
// returns whatever the callback returns. Without a callback it hands e back
// unchanged together with a nil error.
func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) {
	hook := t.OnCreate
	if hook == nil {
		return e, nil
	}
	return hook(e)
}

// Update forwards e to the OnUpdate callback when one is configured and
// returns whatever the callback returns. Without a callback it hands e back
// unchanged together with a nil error.
func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) {
	hook := t.OnUpdate
	if hook == nil {
		return e, nil
	}
	return hook(e)
}

// Patch forwards e and the patch bytes p to the OnPatch callback when one is
// configured and returns whatever the callback returns. Without a callback it
// hands e back unchanged together with a nil error; p is ignored in that case.
func (t *testEventSeriesSink) Patch(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
	hook := t.OnPatch
	if hook == nil {
		return e, nil
	}
	return hook(e, p)
}

// TestEventSeriesf checks how a recorded event reaches the sink once the
// broadcaster cache has been primed with a first event:
//
//   - an event with the same type, reason and action as the primed one is
//     expected to arrive as a patch carrying a Series with Count 1;
//   - an event with a different Action is expected to arrive as a create
//     request without a Series.
func TestEventSeriesf(t *testing.T) {
	// Upper bound for a single sink notification, so that a regression fails
	// the test promptly instead of blocking until the global test timeout.
	const sinkTimeout = 30 * time.Second

	// The hostname only decorates ReportingInstance of the expected event,
	// which validateEvent does not compare, so a lookup error is ignored.
	hostname, _ := os.Hostname()

	testPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "baz",
			UID:       "bar",
		},
	}

	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
	if err != nil {
		t.Fatal(err)
	}

	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
	if err != nil {
		t.Fatal(err)
	}

	expectedEvent := &eventsv1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "baz",
		},
		EventTime:           metav1.MicroTime{Time: time.Now()},
		ReportingController: "eventTest",
		ReportingInstance:   "eventTest-" + hostname,
		Action:              "started",
		Reason:              "test",
		Regarding:           *regarding,
		Related:             related,
		Note:                "some verbose message: 1",
		Type:                v1.EventTypeNormal,
	}

	// Both copies are taken before Series is set on expectedEvent, so neither
	// of them carries a Series.
	isomorphicEvent := expectedEvent.DeepCopy()

	nonIsomorphicEvent := expectedEvent.DeepCopy()
	nonIsomorphicEvent.Action = "stopped"

	expectedEvent.Series = &eventsv1.EventSeries{Count: 1}
	table := []struct {
		regarding    k8sruntime.Object
		related      k8sruntime.Object
		actual       *eventsv1.Event
		elements     []interface{}
		expect       *eventsv1.Event
		expectUpdate bool
	}{
		{
			regarding:    regarding,
			related:      related,
			actual:       isomorphicEvent,
			elements:     []interface{}{1},
			expect:       expectedEvent,
			expectUpdate: true,
		},
		{
			regarding:    regarding,
			related:      related,
			actual:       nonIsomorphicEvent,
			elements:     []interface{}{1},
			expect:       nonIsomorphicEvent,
			expectUpdate: false,
		},
	}

	stopCh := make(chan struct{})
	// Deferred so that recording is stopped on every exit path, including the
	// fatal timeout path below.
	defer close(stopCh)

	createEvent := make(chan *eventsv1.Event)
	updateEvent := make(chan *eventsv1.Event)
	patchEvent := make(chan *eventsv1.Event)

	// receive waits for the next event on ch and aborts the test when nothing
	// arrives within sinkTimeout.
	receive := func(ch <-chan *eventsv1.Event, what string) *eventsv1.Event {
		t.Helper()
		select {
		case ev := <-ch:
			return ev
		case <-time.After(sinkTimeout):
			t.Fatalf("timeout after %v waiting for %s", sinkTimeout, what)
			return nil
		}
	}

	testEvents := testEventSeriesSink{
		OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
			createEvent <- event
			return event, nil
		},
		OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
			updateEvent <- event
			return event, nil
		},
		OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
			// The event received here is already patched. A real sink usually
			// only reads its name and namespace; the test uses it directly.
			patchEvent <- event
			return event, nil
		},
	}
	eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
	broadcaster := eventBroadcaster.(*eventBroadcasterImpl)
	// Don't call StartRecordingToSink: neither refreshing nor finishing event
	// series is needed in this test, and the additional updates would race
	// with the expected ones.
	broadcaster.startRecordingEvents(stopCh)
	// NOTE(review): the element slice is passed as one argument rather than
	// spread with "..."; kept as in the original because the resulting Note is
	// not asserted anywhere in this test.
	recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
	// This first create is only needed to populate the cache; drain it.
	receive(createEvent, "the cache-priming create request")
	for index, item := range table {
		actual := item.actual
		recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
		// validate event
		if item.expectUpdate {
			actualEvent := receive(patchEvent, "a patch request")
			t.Logf("%v - validating event affected by patch request", index)
			validateEvent(strconv.Itoa(index), true, actualEvent, item.expect, t)
		} else {
			actualEvent := receive(createEvent, "a create request")
			t.Logf("%v - validating event affected by a create request", index)
			validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t)
		}
	}
}

// validateEvent checks an event delivered to the test sink against
// expectedEvent and reports every mismatch through t.Errorf.
//
// A nil actualEvent is reported as a failure. Otherwise EventTime must be
// non-zero. When expectedUpdate is true the event must carry a Series whose
// Count equals expectedEvent.Series.Count, and its Name must start with
// expectedEvent.Name; when expectedUpdate is false the Series must be nil.
// messagePrefix is prepended to every failure message.
func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()

	if actualEvent == nil {
		t.Errorf("%v - no event was received", messagePrefix)
		return
	}
	recvEvent := *actualEvent

	// Just check that the timestamp was set.
	if recvEvent.EventTime.IsZero() {
		t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
	}

	if !expectedUpdate {
		if recvEvent.Series != nil {
			t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
		}
		return
	}

	switch {
	case recvEvent.Series == nil:
		// Print the expected series; the received one is nil here.
		t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, expectedEvent.Series)
	case recvEvent.Series.Count != expectedEvent.Series.Count:
		t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
	}

	// Check that name has the right prefix.
	if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
		t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
	}
}

// TestFinishSeries seeds the broadcaster cache with two events: one carrying
// a Series last observed nine minutes ago and a copy without a Series that
// differs in ReportingController. After calling finishSeries it expects
// exactly one patch request, carrying the seeded LastObservedTime, no other
// sink calls, and exactly one entry left in the cache.
func TestFinishSeries(t *testing.T) {
	// The hostname is only used to build a reporting-instance string for the
	// seeded event, so a lookup error is ignored.
	hostname, _ := os.Hostname()
	testPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "baz",
			UID:       "bar",
		},
	}
	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
	if err != nil {
		t.Fatal(err)
	}
	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
	if err != nil {
		t.Fatal(err)
	}
	lastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}

	// Buffered so the sink callbacks never block and leftover events can be
	// counted with len() afterwards.
	createEvent := make(chan *eventsv1.Event, 10)
	updateEvent := make(chan *eventsv1.Event, 10)
	patchEvent := make(chan *eventsv1.Event, 10)
	testEvents := testEventSeriesSink{
		OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
			createEvent <- event
			return event, nil
		},
		OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
			updateEvent <- event
			return event, nil
		},
		OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
			// The event received here is already patched. A real sink usually
			// only reads its name and namespace; the test uses it directly.
			patchEvent <- event
			return event, nil
		},
	}
	cache := map[eventKey]*eventsv1.Event{}
	eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
	cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
	// Copied before Series is set, so the copy has no Series.
	nonFinishedEvent := cachedEvent.DeepCopy()
	nonFinishedEvent.ReportingController = "nonFinished-controller"
	cachedEvent.Series = &eventsv1.EventSeries{
		Count:            10,
		LastObservedTime: lastObservedTime,
	}
	cache[getKey(cachedEvent)] = cachedEvent
	cache[getKey(nonFinishedEvent)] = nonFinishedEvent
	eventBroadcaster.finishSeries()
	select {
	case actualEvent := <-patchEvent:
		t.Logf("validating event affected by patch request")
		eventBroadcaster.mu.Lock()
		defer eventBroadcaster.mu.Unlock()
		if len(cache) != 1 {
			t.Errorf("cache should contain exactly one event after finishing the series, but instead got a size of %v", len(cache))
		}
		if actualEvent == nil || actualEvent.Series == nil {
			t.Fatalf("patched event was expected to carry a series, but got: %#v", actualEvent)
		}
		if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
			t.Errorf("series was expected to be seen with LastObservedTime %v, but instead got %v", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
		}
		// The single expected patch has been consumed above, so any event
		// still buffered on a channel is an extra one.
		if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
			t.Errorf("exactly one event should be emitted, but got %v extra patch, %v create and %v update event(s)", len(patchEvent), len(createEvent), len(updateEvent))
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
}

// TestRefreshExistingEventSeries seeds the broadcaster cache with one event
// carrying a Series and calls refreshExistingEventSeries. It runs once with a
// sink whose Patch succeeds and once with a sink whose Patch returns an
// error. In both cases it expects exactly one patch request, no other sink
// calls, and the seeded cache entry to still be present and non-nil.
func TestRefreshExistingEventSeries(t *testing.T) {
	// The hostname is only used to build a reporting-instance string for the
	// seeded event, so a lookup error is ignored.
	hostname, _ := os.Hostname()
	testPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "baz",
			UID:       "bar",
		},
	}
	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
	if err != nil {
		t.Fatal(err)
	}
	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
	if err != nil {
		t.Fatal(err)
	}
	lastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
	// Buffered so the sink callbacks never block and leftover events can be
	// counted with len() afterwards. The channels are shared by all cases.
	createEvent := make(chan *eventsv1.Event, 10)
	updateEvent := make(chan *eventsv1.Event, 10)
	patchEvent := make(chan *eventsv1.Event, 10)

	table := []struct {
		name      string
		patchFunc func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error)
	}{
		{
			name: "patch succeeds",
			patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
				// The event received here is already patched. A real sink
				// usually only reads its name and namespace; the test uses
				// it directly.
				patchEvent <- event
				return event, nil
			},
		},
		{
			name: "patch fails with an apiserver error",
			patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
				// Simulate an apiserver error; nil is sent only to signal
				// that Patch was invoked.
				patchEvent <- nil
				return nil, &restclient.RequestConstructionError{}
			},
		},
	}
	for _, item := range table {
		// Each case runs in its own subtest so the deferred Unlock below fires
		// at the end of the case instead of piling up until the whole test
		// function returns.
		t.Run(item.name, func(t *testing.T) {
			testEvents := testEventSeriesSink{
				OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
					createEvent <- event
					return event, nil
				},
				OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
					updateEvent <- event
					return event, nil
				},
				OnPatch: item.patchFunc,
			}
			cache := map[eventKey]*eventsv1.Event{}
			eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
			recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
			cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
			cachedEvent.Series = &eventsv1.EventSeries{
				Count:            10,
				LastObservedTime: lastObservedTime,
			}
			cacheKey := getKey(cachedEvent)
			cache[cacheKey] = cachedEvent

			eventBroadcaster.refreshExistingEventSeries()
			select {
			case <-patchEvent:
				t.Logf("validating event affected by patch request")
				eventBroadcaster.mu.Lock()
				defer eventBroadcaster.mu.Unlock()
				if len(cache) != 1 {
					t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
				}
				// The single expected patch has been consumed above, so any
				// event still buffered on a channel is an extra one.
				if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
					t.Errorf("exactly one event should be emitted, but got %v extra patch, %v create and %v update event(s)", len(patchEvent), len(createEvent), len(updateEvent))
				}
				cacheEvent, exists := cache[cacheKey]

				if cacheEvent == nil || !exists {
					t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists)
				}
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
			}
		})
	}
}

相关信息

kubernetes 源码目录

相关文章

kubernetes doc 源码

kubernetes event_broadcaster 源码

kubernetes event_recorder 源码

kubernetes fake 源码

kubernetes helper 源码

kubernetes helper_test 源码

kubernetes interfaces 源码

0  赞