kubernetes server 源码
kubernetes server 代码
文件路径:/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package server implements a Server object for running the admission controller as a webhook server.
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"time"
"github.com/spf13/cobra"
admissionv1 "k8s.io/api/admission/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
kubeinformers "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
"k8s.io/pod-security-admission/admission"
admissionapi "k8s.io/pod-security-admission/admission/api"
podsecurityconfigloader "k8s.io/pod-security-admission/admission/api/load"
"k8s.io/pod-security-admission/api"
"k8s.io/pod-security-admission/cmd/webhook/server/options"
"k8s.io/pod-security-admission/metrics"
"k8s.io/pod-security-admission/policy"
)
const maxRequestSize = int64(3 * 1024 * 1024)
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewServerCommand() *cobra.Command {
opts := options.NewOptions()
cmdName := "podsecurity-webhook"
if executable, err := os.Executable(); err == nil {
cmdName = filepath.Base(executable)
}
cmd := &cobra.Command{
Use: cmdName,
Long: `The PodSecurity webhook is a standalone webhook server implementing the Pod
Security Standards.`,
RunE: func(cmd *cobra.Command, _ []string) error {
verflag.PrintAndExitIfRequested()
return runServer(cmd.Context(), opts)
},
Args: cobra.NoArgs,
}
opts.AddFlags(cmd.Flags())
verflag.AddFlags(cmd.Flags())
return cmd
}
func runServer(ctx context.Context, opts *options.Options) error {
config, err := LoadConfig(opts)
if err != nil {
return err
}
server, err := Setup(config)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
stopCh := apiserver.SetupSignalHandler()
<-stopCh
cancel()
}()
return server.Start(ctx)
}
type Server struct {
secureServing *apiserver.SecureServingInfo
insecureServing *apiserver.DeprecatedInsecureServingInfo
informerFactory kubeinformers.SharedInformerFactory
delegate *admission.Admission
metricsRegistry compbasemetrics.KubeRegistry
}
func (s *Server) Start(ctx context.Context) error {
s.informerFactory.Start(ctx.Done())
mux := http.NewServeMux()
healthz.InstallHandler(mux, healthz.PingHealthz)
healthz.InstallReadyzHandler(mux, healthz.NewInformerSyncHealthz(s.informerFactory))
// The webhook is stateless, so it's safe to expose everything on the insecure port for
// debugging or proxy purposes. The API server will not connect to an http webhook.
mux.HandleFunc("/", s.HandleValidate)
// Serve the metrics.
mux.Handle("/metrics",
compbasemetrics.HandlerFor(s.metricsRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}))
if s.insecureServing != nil {
if err := s.insecureServing.Serve(mux, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start insecure server: %w", err)
}
}
var shutdownCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.secureServing != nil {
var err error
shutdownCh, listenerStoppedCh, err = s.secureServing.Serve(mux, 0, ctx.Done())
if err != nil {
return fmt.Errorf("failed to start secure server: %w", err)
}
}
<-listenerStoppedCh
klog.V(1).InfoS("[graceful-termination] HTTP Server has stopped listening")
// Wait for graceful shutdown.
<-shutdownCh
klog.V(1).Info("[graceful-termination] HTTP Server is exiting")
return nil
}
func (s *Server) HandleValidate(w http.ResponseWriter, r *http.Request) {
defer utilruntime.HandleCrash(func(_ interface{}) {
// Assume the crash happened before the response was written.
http.Error(w, "internal server error", http.StatusInternalServerError)
})
var (
body []byte
err error
ctx = r.Context()
)
if timeout, ok, err := parseTimeout(r); err != nil {
// Ignore an invalid timeout.
klog.V(2).InfoS("Invalid timeout", "error", err)
} else if ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
if r.Body == nil {
err = errors.New("request body is empty")
klog.ErrorS(err, "bad request")
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer r.Body.Close()
limitedReader := &io.LimitedReader{R: r.Body, N: maxRequestSize}
if body, err = ioutil.ReadAll(limitedReader); err != nil {
klog.ErrorS(err, "unable to read the body from the incoming request")
http.Error(w, "unable to read the body from the incoming request", http.StatusBadRequest)
return
}
if limitedReader.N <= 0 {
klog.ErrorS(err, "unable to read the body from the incoming request; limit reached")
http.Error(w, fmt.Sprintf("request entity is too large; limit is %d bytes", maxRequestSize), http.StatusRequestEntityTooLarge)
return
}
// verify the content type is accurate
if contentType := r.Header.Get("Content-Type"); contentType != "application/json" {
err = fmt.Errorf("contentType=%s, expected application/json", contentType)
klog.ErrorS(err, "unable to process a request with an unknown content type", "content type", contentType)
http.Error(w, "unable to process a request with a non-json content type", http.StatusBadRequest)
return
}
v1AdmissionReviewKind := admissionv1.SchemeGroupVersion.WithKind("AdmissionReview")
reviewObject, gvk, err := codecs.UniversalDeserializer().Decode(body, &v1AdmissionReviewKind, nil)
if err != nil {
klog.ErrorS(err, "unable to decode the request")
http.Error(w, "unable to decode the request", http.StatusBadRequest)
return
}
if *gvk != v1AdmissionReviewKind {
klog.InfoS("Unexpected AdmissionReview kind", "kind", gvk.String())
http.Error(w, fmt.Sprintf("unexpected AdmissionReview kind: %s", gvk.String()), http.StatusBadRequest)
return
}
review, ok := reviewObject.(*admissionv1.AdmissionReview)
if !ok {
klog.InfoS("Failed admissionv1.AdmissionReview type assertion")
http.Error(w, "unexpected AdmissionReview type", http.StatusBadRequest)
}
klog.V(1).InfoS("received request", "UID", review.Request.UID, "kind", review.Request.Kind, "resource", review.Request.Resource)
attributes := api.RequestAttributes(review.Request, codecs.UniversalDeserializer())
response := s.delegate.Validate(ctx, attributes)
response.UID = review.Request.UID // Response UID must match request UID
review.Response = response
writeResponse(w, review)
}
// Config holds the loaded options.Options used to set up the webhook server.
type Config struct {
SecureServing *apiserver.SecureServingInfo
InsecureServing *apiserver.DeprecatedInsecureServingInfo
KubeConfig *restclient.Config
PodSecurityConfig *admissionapi.PodSecurityConfiguration
}
// LoadConfig loads the Config from the Options.
func LoadConfig(opts *options.Options) (*Config, error) {
if errs := opts.Validate(); len(errs) > 0 {
return nil, utilerrors.NewAggregate(errs)
}
var c Config
opts.SecureServing.ApplyTo(&c.SecureServing)
opts.InsecureServing.ApplyTo(&c.InsecureServing)
// Load Kube Client
kubeConfig, err := clientcmd.BuildConfigFromFlags("", opts.Kubeconfig)
if err != nil {
return nil, err
}
kubeConfig.QPS = opts.ClientQPSLimit
kubeConfig.Burst = opts.ClientQPSBurst
c.KubeConfig = restclient.AddUserAgent(kubeConfig, "podsecurity-webhook")
// Load PodSecurity config
c.PodSecurityConfig, err = podsecurityconfigloader.LoadFromFile(opts.Config)
if err != nil {
return nil, err
}
return &c, nil
}
// Setup creates an Admission object to handle the admission logic.
func Setup(c *Config) (*Server, error) {
s := &Server{
secureServing: c.SecureServing,
insecureServing: c.InsecureServing,
}
if s.secureServing == nil && s.insecureServing == nil {
return nil, errors.New("no serving info configured")
}
client, err := clientset.NewForConfig(c.KubeConfig)
if err != nil {
return nil, err
}
s.informerFactory = kubeinformers.NewSharedInformerFactory(client, 0 /* no resync */)
namespaceInformer := s.informerFactory.Core().V1().Namespaces()
namespaceLister := namespaceInformer.Lister()
evaluator, err := policy.NewEvaluator(policy.DefaultChecks())
if err != nil {
return nil, fmt.Errorf("could not create PodSecurityRegistry: %w", err)
}
metrics := metrics.NewPrometheusRecorder(api.GetAPIVersion())
s.metricsRegistry = compbasemetrics.NewKubeRegistry()
metrics.MustRegister(s.metricsRegistry.MustRegister)
s.delegate = &admission.Admission{
Configuration: c.PodSecurityConfig,
Evaluator: evaluator,
Metrics: metrics,
PodSpecExtractor: admission.DefaultPodSpecExtractor{},
PodLister: admission.PodListerFromClient(client),
NamespaceGetter: admission.NamespaceGetterFromListerAndClient(namespaceLister, client),
}
if err := s.delegate.CompleteConfiguration(); err != nil {
return nil, fmt.Errorf("configuration error: %w", err)
}
if err := s.delegate.ValidateConfiguration(); err != nil {
return nil, fmt.Errorf("invalid configuration: %w", err)
}
return s, nil
}
func writeResponse(w http.ResponseWriter, review *admissionv1.AdmissionReview) {
// Webhooks should always respond with a 200 HTTP status code when an AdmissionResponse can be sent.
// In an error case, the true status code is captured in the response.result.code
if err := json.NewEncoder(w).Encode(review); err != nil {
klog.ErrorS(err, "Failed to encode response")
// Unable to send an AdmissionResponse, fall back to an HTTP error.
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// parseTimeout parses the given HTTP request URL and extracts the timeout query parameter
// value if specified by the user.
// If a timeout is not specified the function returns false and err is set to nil
// If the value specified is malformed then the function returns false and err is set
func parseTimeout(req *http.Request) (time.Duration, bool, error) {
value := req.URL.Query().Get("timeout")
if value == "" {
return 0, false, nil
}
timeout, err := time.ParseDuration(value)
if err != nil {
return 0, false, fmt.Errorf("invalid timeout query: %w", err)
}
return timeout, true, nil
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦