kubernetes pager_test 源码

  • 2022-09-18
  • 浏览 (203)

kubernetes pager_test 代码

文件路径:/staging/src/k8s.io/client-go/tools/pager/pager_test.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pager

import (
	"context"
	"fmt"
	"reflect"
	"testing"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
	"k8s.io/apimachinery/pkg/runtime"
)

// list builds the expected result of a listing: a List carrying the given
// resource version and count items named "0" through count-1.
//
// Items is deliberately grown with append from its zero value so that a count
// of zero leaves it nil rather than an empty, non-nil slice.
func list(count int, rv string) *metainternalversion.List {
	result := &metainternalversion.List{}
	result.ResourceVersion = rv
	for n := 0; n < count; n++ {
		item := &metav1beta1.PartialObjectMetadata{
			ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%d", n)},
		}
		result.Items = append(result.Items, item)
	}
	return result
}

// testPager is a fake paginated list source. Its methods serve items page by
// page while checking that each incoming request matches what the previous
// page handed out.
type testPager struct {
	// t receives a failure whenever a call violates one of the checked invariants.
	t *testing.T
	// rv is set as the resource version of every returned list and is the
	// prefix of every continue token.
	rv string
	// expectPage is the Limit that every list call is required to carry.
	expectPage int64

	// index is the number of items served so far and the name of the next item.
	// remaining is the number of items still to be served.
	// last is the value of index at the end of the previous page; it forms the
	// numeric part of the continue token.
	index, remaining, last int

	// continuing is set once a page has been returned with a continue token.
	// done is set once the final page has been returned.
	continuing, done bool
}

// reset rewinds the pager to its initial state: every item already served is
// returned to the remaining pool and all paging progress flags are cleared.
func (p *testPager) reset() {
	p.remaining, p.index = p.remaining+p.index, 0
	p.last = 0
	p.continuing, p.done = false, false
}

// PagedList serves the next page of items and validates the request.
//
// The call is rejected (a test failure is recorded and an error returned) when:
//   - the final page has already been served,
//   - options.Limit differs from expectPage, or a continuation is in progress
//     and options.Continue is not the token handed out with the previous page,
//   - options.Continue is combined with ResourceVersion or ResourceVersionMatch.
//
// Otherwise it returns up to options.Limit items (everything that remains when
// the limit is zero), and sets a continue token on the list if items are left.
func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
	if p.done {
		p.t.Errorf("did not expect additional call to paged list")
		return nil, fmt.Errorf("unexpected list call")
	}

	wantContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
	limitOK := options.Limit == p.expectPage
	// The continue token is only checked once a previous page has handed one out.
	continueOK := !p.continuing || options.Continue == wantContinue
	if !limitOK || !continueOK {
		p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, wantContinue, options)
		return nil, fmt.Errorf("invariant violated")
	}

	if options.Continue != "" {
		switch {
		case options.ResourceVersion != "":
			p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
			return nil, fmt.Errorf("invariant violated")
		case options.ResourceVersionMatch != "":
			p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
			return nil, fmt.Errorf("invariant violated")
		}
	}

	// A zero limit means "no paging": serve everything that is left.
	pageSize := options.Limit
	if pageSize == 0 {
		pageSize = int64(p.remaining)
	}

	page := &metainternalversion.List{}
	for served := int64(0); served < pageSize && p.remaining > 0; served++ {
		item := &metav1beta1.PartialObjectMetadata{
			ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%d", p.index)},
		}
		page.Items = append(page.Items, item)
		p.remaining--
		p.index++
	}
	p.last = p.index
	page.ResourceVersion = p.rv

	if p.remaining <= 0 {
		// Final page: any further call is an error.
		p.done = true
		return page, nil
	}
	page.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
	p.continuing = true
	return page, nil
}

// ExpiresOnSecondPage delegates to PagedList until a page has been returned
// with a continue token. From then on it marks the pager done and answers with
// a ResourceExpired error.
func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
	if !p.continuing {
		return p.PagedList(ctx, options)
	}
	p.done = true
	return nil, errors.NewResourceExpired("this list has expired")
}

// ExpiresOnSecondPageThenFullList delegates to PagedList until a page has been
// returned with a continue token. The next call then fails with a
// ResourceExpired error, after rewinding the pager and setting the expected
// limit to zero so that the following request must be an unpaged one.
func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
	if !p.continuing {
		return p.PagedList(ctx, options)
	}
	p.reset()
	p.expectPage = 0
	return nil, errors.NewResourceExpired("this list has expired")
}

func TestListPager_List(t *testing.T) {
	type fields struct {
		PageSize          int64
		PageFn            ListPageFunc
		FullListIfExpired bool
	}
	type args struct {
		ctx     context.Context
		options metav1.ListOptions
	}
	tests := []struct {
		name      string
		fields    fields
		args      args
		want      runtime.Object
		wantPaged bool
		wantErr   bool
		isExpired bool
	}{
		{
			name:      "empty page",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
			args:      args{},
			want:      list(0, "rv:20"),
			wantPaged: false,
		},
		{
			name:      "one page",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
			args:      args{},
			want:      list(9, "rv:20"),
			wantPaged: false,
		},
		{
			name:      "one full page",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
			args:      args{},
			want:      list(10, "rv:20"),
			wantPaged: false,
		},
		{
			name:      "two pages",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
			args:      args{},
			want:      list(11, "rv:20"),
			wantPaged: true,
		},
		{
			name:      "three pages",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
			args:      args{},
			want:      list(21, "rv:20"),
			wantPaged: true,
		},
		{
			name:      "expires on second page",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
			args:      args{},
			wantPaged: true,
			wantErr:   true,
			isExpired: true,
		},
		{
			name: "expires on second page and then lists",
			fields: fields{
				FullListIfExpired: true,
				PageSize:          10,
				PageFn:            (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
			},
			args:      args{},
			want:      list(21, "rv:20"),
			wantPaged: true,
		},
		{
			name:      "two pages with resourceVersion",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
			args:      args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
			want:      list(11, "rv:20"),
			wantPaged: true,
		},
		{
			name:      "two pages with resourceVersion and resourceVersionMatch",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
			args:      args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
			want:      list(11, "rv:20"),
			wantPaged: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			p := &ListPager{
				PageSize:          tt.fields.PageSize,
				PageFn:            tt.fields.PageFn,
				FullListIfExpired: tt.fields.FullListIfExpired,
			}
			ctx := tt.args.ctx
			if ctx == nil {
				ctx = context.Background()
			}
			got, paginatedResult, err := p.List(ctx, tt.args.options)
			if (err != nil) != tt.wantErr {
				t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if tt.isExpired != errors.IsResourceExpired(err) {
				t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
				return
			}
			if tt.wantPaged != paginatedResult {
				t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
			}
		})
	}
}

// TestListPager_EachListItem runs ListPager.EachListItem against a fake
// paginated source and checks which items the processor function visited and
// how the call ended: normally, with an error, or with a panic.
//
// A case can make the processor misbehave on the Nth visited item (1-based) by
// setting processorErrorOnItem (return an error), processorPanicOnItem (panic),
// or cancelContextOnItem (cancel the context passed to EachListItem).
func TestListPager_EachListItem(t *testing.T) {
	type fields struct {
		PageSize int64
		PageFn   ListPageFunc
	}
	tests := []struct {
		name                 string
		fields               fields
		want                 runtime.Object
		wantErr              bool
		wantPanic            bool
		isExpired            bool
		processorErrorOnItem int
		processorPanicOnItem int
		cancelContextOnItem  int
	}{
		{
			name:   "empty page",
			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
			want:   list(0, "rv:20"),
		},
		{
			name:   "one page",
			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
			want:   list(9, "rv:20"),
		},
		{
			name:   "one full page",
			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
			want:   list(10, "rv:20"),
		},
		{
			name:   "two pages",
			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
			want:   list(11, "rv:20"),
		},
		{
			name:   "three pages",
			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
			want:   list(21, "rv:20"),
		},
		{
			name:      "expires on second page",
			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
			want:      list(10, "rv:20"), // all items on the first page should have been visited
			wantErr:   true,
			isExpired: true,
		},
		{
			// Exercises the processor-error path, which no case covered before.
			name:                 "error processing item",
			fields:               fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
			want:                 list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
			wantErr:              true,
			processorErrorOnItem: 3,
		},
		{
			name:                 "panic processing item",
			fields:               fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
			want:                 list(3, "rv:20"), // all the items <= the one the processor panicked on should have been visited
			wantPanic:            true,
			processorPanicOnItem: 3,
		},
		{
			name:                "cancel context while processing",
			fields:              fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
			want:                list(10, "rv:20"), // The whole PageSize worth of items got returned.
			wantErr:             true,
			cancelContextOnItem: 3,
		},
	}

	processorErr := fmt.Errorf("processor error")
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			// Release the context on every path, not only in the cancel case.
			defer cancel()

			p := &ListPager{
				PageSize: tt.fields.PageSize,
				PageFn:   tt.fields.PageFn,
			}
			var items []runtime.Object

			fn := func(obj runtime.Object) error {
				items = append(items, obj)
				if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
					return processorErr
				}
				if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
					panic(processorErr)
				}
				if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
					cancel()
				}
				return nil
			}

			// Run the call in its own frame so a panic from the processor can
			// be captured and asserted on instead of aborting the test.
			var err error
			var recovered interface{}
			func() {
				defer func() {
					recovered = recover()
				}()
				err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
			}()

			// Fail both on an unexpected panic and on a missing expected one.
			if (recovered != nil) != tt.wantPanic {
				t.Errorf(".EachListItem() panic = %v, wantPanic %v", recovered, tt.wantPanic)
				return
			}
			if (err != nil) != tt.wantErr {
				t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if tt.isExpired != errors.IsResourceExpired(err) {
				t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
				return
			}
			if tt.processorErrorOnItem > 0 && err != processorErr {
				t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
				return
			}
			l := tt.want.(*metainternalversion.List)
			if !reflect.DeepEqual(items, l.Items) {
				t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
			}
		})
	}
}

// TestListPager_eachListPageBuffered checks how many pages
// eachListChunkBuffered fetches from the page function when the chunk
// processor stops (by returning an error) after a given number of chunks, for
// several PageBufferSize settings.
func TestListPager_eachListPageBuffered(t *testing.T) {
	const itemsPerPage = 10
	errProcessor := fmt.Errorf("processor error")

	testCases := []struct {
		name           string
		totalPages     int
		pagesProcessed int
		wantPageLists  int
		pageBufferSize int32
	}{
		{name: "no buffer, one total page", totalPages: 1, pagesProcessed: 1, wantPageLists: 1, pageBufferSize: 0},
		// 1 received for processing, 1 listed
		{name: "no buffer, 1/5 pages processed", totalPages: 5, pagesProcessed: 1, wantPageLists: 2, pageBufferSize: 0},
		{name: "no buffer, 2/5 pages processed", totalPages: 5, pagesProcessed: 2, wantPageLists: 3, pageBufferSize: 0},
		{name: "no buffer, 5/5 pages processed", totalPages: 5, pagesProcessed: 5, wantPageLists: 5, pageBufferSize: 0},
		{name: "size 1 buffer, 1/5 pages processed", totalPages: 5, pagesProcessed: 1, wantPageLists: 3, pageBufferSize: 1},
		{name: "size 1 buffer, 5/5 pages processed", totalPages: 5, pagesProcessed: 5, wantPageLists: 5, pageBufferSize: 1},
		// buffer is larger than list
		{name: "size 10 buffer, 1/5 page processed", totalPages: 5, pagesProcessed: 1, wantPageLists: 5, pageBufferSize: 10},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			fake := &testPager{t: t, expectPage: itemsPerPage, remaining: tc.totalPages * itemsPerPage, rv: "rv:20"}

			// listCalls counts page requests; the channel is closed when the
			// count reaches the number this case expects.
			var listCalls int
			reachedWantedLists := make(chan struct{})
			pager := &ListPager{
				PageSize:       itemsPerPage,
				PageBufferSize: tc.pageBufferSize,
				PageFn: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
					listCalls++
					if listCalls == tc.wantPageLists {
						close(reachedWantedLists)
					}
					return fake.PagedList(ctx, options)
				},
			}

			var chunksSeen int
			process := func(obj runtime.Object) error {
				chunksSeen++
				if chunksSeen != tc.pagesProcessed || tc.wantPageLists <= 0 {
					return nil
				}
				// wait for buffering to catch up
				select {
				case <-reachedWantedLists:
					return errProcessor
				case <-time.After(time.Second):
					return fmt.Errorf("Timed out waiting for %d page lists", tc.wantPageLists)
				}
			}

			err := pager.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, process)
			// The processor's own error is the expected way for the call to stop.
			expectedStop := tc.pagesProcessed > 0 && err == errProcessor
			if err != nil && !expectedStop {
				t.Fatal(err)
			}
			if tc.wantPageLists > 0 && listCalls != tc.wantPageLists {
				t.Errorf("expected %d page lists, got %d", tc.wantPageLists, listCalls)
			}
			if chunksSeen != tc.pagesProcessed {
				t.Errorf("expected %d pages processed, got %d", tc.pagesProcessed, chunksSeen)
			}
		})
	}
}

相关信息

kubernetes 源码目录

相关文章

kubernetes pager 源码

0  赞