kubernetes storageversion 源码

  • 2022-09-18
  • 浏览 (363)

kubernetes storageversion 代码

文件路径:/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filters

import (
	"errors"
	"fmt"
	"net/http"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apiserver/pkg/authentication/user"
	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/storageversion"
	_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
	"k8s.io/klog/v2"
)

// WithStorageVersionPrecondition wraps handler with a filter that enforces the
// storage version barrier.
//
// If svm is nil the barrier is disabled and handler is returned unwrapped.
// Otherwise, for as long as svm.Completed() reports false, only the following
// API requests are passed through to handler:
// 1. non-resource requests,
// 2. read requests,
// 3. write requests to the storageversion API,
// 4. create requests to the namespace API sent by apiserver itself,
// 5. write requests to the lease API in kube-system namespace,
// 6. resources whose StorageVersion is not pending update, including non-persisted resources.
//
// Any other request is answered with a ServiceUnavailable error that is written
// via responsewriters.ErrorNegotiated using the serializer s. A request whose
// context carries no RequestInfo is answered with an internal error.
func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager, s runtime.NegotiatedSerializer) http.Handler {
	if svm == nil {
		// TODO(roycaihw): switch to warning after the feature graduate to beta/GA
		klog.V(2).Infof("Storage Version barrier is disabled")
		return handler
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Once the barrier has completed, this filter is a pure pass-through.
		if svm.Completed() {
			handler.ServeHTTP(w, req)
			return
		}
		ctx := req.Context()
		requestInfo, found := request.RequestInfoFrom(ctx)
		if !found {
			responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
			return
		}
		// Allow non-resource requests
		if !requestInfo.IsResourceRequest {
			handler.ServeHTTP(w, req)
			return
		}
		// Allow read requests
		if requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch" {
			handler.ServeHTTP(w, req)
			return
		}
		// Allow writes to the storage version API
		if requestInfo.APIGroup == "internal.apiserver.k8s.io" && requestInfo.Resource == "storageversions" {
			handler.ServeHTTP(w, req)
			return
		}
		// The system namespace is required for apiserver-identity lease to exist. Allow the apiserver
		// itself to create namespaces.
		// NOTE: with this exception, if the bootstrap client writes namespaces with a new version,
		// and the upgraded apiserver dies before updating the StorageVersion for namespaces, the
		// storage migrator won't be able to tell these namespaces are stored in a different version in etcd.
		// Because the bootstrap client only creates system namespace and doesn't update them, this can
		// only happen if the upgraded apiserver is the first apiserver that kicks off namespace creation,
		// or if an upgraded server that joins an existing cluster has new system namespaces (other
		// than kube-system, kube-public, kube-node-lease) that need to be created.
		u, hasUser := request.UserFrom(ctx)
		if requestInfo.APIGroup == "" && requestInfo.Resource == "namespaces" &&
			requestInfo.Verb == "create" && hasUser &&
			u.GetName() == user.APIServerUser && contains(u.GetGroups(), user.SystemPrivilegedGroup) {
			handler.ServeHTTP(w, req)
			return
		}
		// Allow writes to the lease API in kube-system. The storage version API depends on the
		// apiserver-identity leases to operate. Leases in kube-system are either apiserver-identity
		// lease (which gets garbage collected when stale) or leader-election leases (which gets
		// periodically updated by system components). Both types of leases won't be stale in etcd.
		if requestInfo.APIGroup == "coordination.k8s.io" && requestInfo.Resource == "leases" &&
			requestInfo.Namespace == metav1.NamespaceSystem {
			handler.ServeHTTP(w, req)
			return
		}
		// If the resource's StorageVersion is not in the to-be-updated list, let it pass.
		// Non-persisted resources are not in the to-be-updated list, so they will pass.
		// Keyed fields keep the literal correct even if the struct layout changes upstream.
		gr := schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}
		if !svm.PendingUpdate(gr) {
			handler.ServeHTTP(w, req)
			return
		}

		// The request targets a resource whose StorageVersion is still pending: reject it
		// and surface the last update error recorded by the manager for that resource.
		gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
		unavailable := apierrors.NewServiceUnavailable(fmt.Sprintf(
			"wait for storage version registration to complete for resource: %v, last seen error: %v",
			gr, svm.LastUpdateError(gr)))
		responsewriters.ErrorNegotiated(unavailable, s, gv, w, req)
	})
}

// contains reports whether the string e is an element of the slice s.
// A nil or empty slice never contains anything.
func contains(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] != e {
			continue
		}
		return true
	}
	return false
}

相关信息

kubernetes 源码目录

相关文章

kubernetes audit 源码

kubernetes audit_annotations 源码

kubernetes audit_test 源码

kubernetes authentication 源码

kubernetes authentication_test 源码

kubernetes authn_audit 源码

kubernetes authn_audit_test 源码

kubernetes authorization 源码

kubernetes authorization_test 源码

kubernetes cachecontrol 源码

0  赞