kubernetes pager_test 源码
kubernetes pager_test 代码
文件路径:/staging/src/k8s.io/client-go/tools/pager/pager_test.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pager
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
)
func list(count int, rv string) *metainternalversion.List {
var list metainternalversion.List
for i := 0; i < count; i++ {
list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
})
}
list.ResourceVersion = rv
return &list
}
type testPager struct {
t *testing.T
rv string
index int
remaining int
last int
continuing bool
done bool
expectPage int64
}
func (p *testPager) reset() {
p.continuing = false
p.remaining += p.index
p.index = 0
p.last = 0
p.done = false
}
func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.done {
p.t.Errorf("did not expect additional call to paged list")
return nil, fmt.Errorf("unexpected list call")
}
expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) {
p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
return nil, fmt.Errorf("invariant violated")
}
if options.Continue != "" && options.ResourceVersion != "" {
p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
if options.Continue != "" && options.ResourceVersionMatch != "" {
p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
var list metainternalversion.List
total := options.Limit
if total == 0 {
total = int64(p.remaining)
}
for i := int64(0); i < total; i++ {
if p.remaining <= 0 {
break
}
list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", p.index),
},
})
p.remaining--
p.index++
}
p.last = p.index
if p.remaining > 0 {
list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
p.continuing = true
} else {
p.done = true
}
list.ResourceVersion = p.rv
return &list, nil
}
func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.continuing {
p.done = true
return nil, errors.NewResourceExpired("this list has expired")
}
return p.PagedList(ctx, options)
}
func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.continuing {
p.reset()
p.expectPage = 0
return nil, errors.NewResourceExpired("this list has expired")
}
return p.PagedList(ctx, options)
}
func TestListPager_List(t *testing.T) {
type fields struct {
PageSize int64
PageFn ListPageFunc
FullListIfExpired bool
}
type args struct {
ctx context.Context
options metav1.ListOptions
}
tests := []struct {
name string
fields fields
args args
want runtime.Object
wantPaged bool
wantErr bool
isExpired bool
}{
{
name: "empty page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
args: args{},
want: list(0, "rv:20"),
wantPaged: false,
},
{
name: "one page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
args: args{},
want: list(9, "rv:20"),
wantPaged: false,
},
{
name: "one full page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
args: args{},
want: list(10, "rv:20"),
wantPaged: false,
},
{
name: "two pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{},
want: list(11, "rv:20"),
wantPaged: true,
},
{
name: "three pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
args: args{},
want: list(21, "rv:20"),
wantPaged: true,
},
{
name: "expires on second page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
args: args{},
wantPaged: true,
wantErr: true,
isExpired: true,
},
{
name: "expires on second page and then lists",
fields: fields{
FullListIfExpired: true,
PageSize: 10,
PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
},
args: args{},
want: list(21, "rv:20"),
wantPaged: true,
},
{
name: "two pages with resourceVersion",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
want: list(11, "rv:20"),
wantPaged: true,
},
{
name: "two pages with resourceVersion and resourceVersionMatch",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
want: list(11, "rv:20"),
wantPaged: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &ListPager{
PageSize: tt.fields.PageSize,
PageFn: tt.fields.PageFn,
FullListIfExpired: tt.fields.FullListIfExpired,
}
ctx := tt.args.ctx
if ctx == nil {
ctx = context.Background()
}
got, paginatedResult, err := p.List(ctx, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.isExpired != errors.IsResourceExpired(err) {
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
return
}
if tt.wantPaged != paginatedResult {
t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
}
})
}
}
func TestListPager_EachListItem(t *testing.T) {
type fields struct {
PageSize int64
PageFn ListPageFunc
}
tests := []struct {
name string
fields fields
want runtime.Object
wantErr bool
wantPanic bool
isExpired bool
processorErrorOnItem int
processorPanicOnItem int
cancelContextOnItem int
}{
{
name: "empty page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
want: list(0, "rv:20"),
},
{
name: "one page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
want: list(9, "rv:20"),
},
{
name: "one full page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
want: list(10, "rv:20"),
},
{
name: "two pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
want: list(11, "rv:20"),
},
{
name: "three pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
want: list(21, "rv:20"),
},
{
name: "expires on second page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
want: list(10, "rv:20"), // all items on the first page should have been visited
wantErr: true,
isExpired: true,
},
{
name: "error processing item",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
wantPanic: true,
processorPanicOnItem: 3,
},
{
name: "cancel context while processing",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
want: list(10, "rv:20"), // The whole PageSize worth of items got returned.
wantErr: true,
cancelContextOnItem: 3,
},
}
processorErr := fmt.Errorf("processor error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
p := &ListPager{
PageSize: tt.fields.PageSize,
PageFn: tt.fields.PageFn,
}
var items []runtime.Object
fn := func(obj runtime.Object) error {
items = append(items, obj)
if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
return processorErr
}
if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
panic(processorErr)
}
if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
cancel()
}
return nil
}
var err error
var panic interface{}
func() {
defer func() {
panic = recover()
}()
err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
}()
if (panic != nil) && !tt.wantPanic {
t.Errorf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic)
return
}
if (err != nil) != tt.wantErr {
t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.isExpired != errors.IsResourceExpired(err) {
t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
return
}
if tt.processorErrorOnItem > 0 && err != processorErr {
t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
return
}
l := tt.want.(*metainternalversion.List)
if !reflect.DeepEqual(items, l.Items) {
t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
}
})
}
}
func TestListPager_eachListPageBuffered(t *testing.T) {
tests := []struct {
name string
totalPages int
pagesProcessed int
wantPageLists int
pageBufferSize int32
pageSize int
}{
{
name: "no buffer, one total page",
totalPages: 1,
pagesProcessed: 1,
wantPageLists: 1,
pageBufferSize: 0,
}, {
name: "no buffer, 1/5 pages processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 2, // 1 received for processing, 1 listed
pageBufferSize: 0,
},
{
name: "no buffer, 2/5 pages processed",
totalPages: 5,
pagesProcessed: 2,
wantPageLists: 3,
pageBufferSize: 0,
},
{
name: "no buffer, 5/5 pages processed",
totalPages: 5,
pagesProcessed: 5,
wantPageLists: 5,
pageBufferSize: 0,
},
{
name: "size 1 buffer, 1/5 pages processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 3,
pageBufferSize: 1,
},
{
name: "size 1 buffer, 5/5 pages processed",
totalPages: 5,
pagesProcessed: 5,
wantPageLists: 5,
pageBufferSize: 1,
},
{
name: "size 10 buffer, 1/5 page processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 5,
pageBufferSize: 10, // buffer is larger than list
},
}
processorErr := fmt.Errorf("processor error")
pageSize := 10
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"}
pageLists := 0
wantedPageListsDone := make(chan struct{})
listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
pageLists++
if pageLists == tt.wantPageLists {
close(wantedPageListsDone)
}
return pgr.PagedList(ctx, options)
}
p := &ListPager{
PageSize: int64(pageSize),
PageBufferSize: tt.pageBufferSize,
PageFn: listFn,
}
pagesProcessed := 0
fn := func(obj runtime.Object) error {
pagesProcessed++
if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 {
// wait for buffering to catch up
select {
case <-time.After(time.Second):
return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists)
case <-wantedPageListsDone:
}
return processorErr
}
return nil
}
err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn)
if tt.pagesProcessed > 0 && err == processorErr {
// expected
} else if err != nil {
t.Fatal(err)
}
if tt.wantPageLists > 0 && pageLists != tt.wantPageLists {
t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists)
}
if pagesProcessed != tt.pagesProcessed {
t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed)
}
})
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦