From b2972195cb4fbfc9b73f2eb9983ab9d81cd74520 Mon Sep 17 00:00:00 2001 From: dragonflylee Date: Wed, 6 Sep 2023 17:52:39 +0800 Subject: [PATCH] add webhook --- cmd/main.go | 37 +- config/deployment.yaml | 8 +- config/kustomization.yaml | 8 + config/rbac/role.yaml | 52 ++- config/webhook/kustomizeconfig.yaml | 33 ++ config/webhook/manager_patch.yaml | 22 ++ config/webhook/manifests.yaml | 70 ++++ config/webhook/service.yaml | 12 + pkg/controllers/services.go | 4 - pkg/k8s/k8s.go | 33 +- pkg/k8s/rotator.go | 576 ++++++++++++++++++++++++++++ pkg/svc/handler.go | 39 ++ 12 files changed, 882 insertions(+), 12 deletions(-) create mode 100644 config/webhook/kustomizeconfig.yaml create mode 100644 config/webhook/manager_patch.yaml create mode 100644 config/webhook/manifests.yaml create mode 100644 config/webhook/service.yaml create mode 100644 pkg/k8s/rotator.go create mode 100644 pkg/svc/handler.go diff --git a/cmd/main.go b/cmd/main.go index c3bb6b5..16e8d89 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,8 +6,10 @@ import ( "monitor/pkg/controllers" "monitor/pkg/k8s" + "monitor/pkg/svc" "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) @@ -17,17 +19,24 @@ var ( commit string level zapcore.Level conf k8s.Config + + disableWebhook bool ) func main() { conf.HealthProbeBindAddress = "0" // Disable healthy endpoint of controller manager - conf.LeaderElectionID = "monitor.3m3m.net" + conf.LeaderElectionID = "monitor.demo.io" // 参数解析 flag.TextVar(&level, "log-level", zapcore.InfoLevel, "logger level") flag.StringVar(&conf.MetricsBindAddress, "metrics-address", "127.0.0.1:8080", "The address the metric endpoint binds to.") flag.BoolVar(&conf.LeaderElection, "enable-leader-election", false, "Enable leader election for controller manager") flag.BoolVar(&conf.PProf, "enable-pprof", false, "Enable profile debug") + flag.StringVar(&conf.CaKey.Namespace, "namespace", os.Getenv("POD_NAMESPACE"), "namespace where pod run") + flag.StringVar(&conf.CaKey.Name, "cert-ca", "monitor-ca", "cert ca secret name") + flag.StringVar(&conf.WebhookName, "cert-service-name", "monitor-webhook-service", "The service name used to generate the TLS cert's hostname") + flag.StringVar(&conf.CertDir, "cert-dir", "", "The directory where certs are stored") + flag.BoolVar(&disableWebhook, "disable-webhook", false, "diable webhook mode") flag.Parse() // 初始化日志格式 ctrl.SetLogger(zap.New(func(o *zap.Options) { @@ -37,12 +46,36 @@ func main() { })) log := ctrl.Log.WithName("monitor") - mgr, err := k8s.NewManager(conf) + setupFinished := make(chan struct{}) + mgr, err := k8s.NewManager(conf, setupFinished) if err != nil { log.Error(err, "new k8s manager") os.Exit(1) } + ev := svc.NewHandler() + + if !disableWebhook { + go func() { + <-setupFinished + + if err = mgr.Add(ev); err != nil { + log.Error(err, "add event handler") + os.Exit(1) + } + + if err = mgr.WebHook(&corev1.Pod{}).WithDefaulter(ev).Complete(); err != nil { + log.Error(err, "setup pod webhook") + os.Exit(1) + } + }() + } else { + if err = mgr.Add(ev); err != nil { + log.Error(err, "add event handler") + os.Exit(1) + } + } + if err = mgr.Setup(controllers.NewService); err != nil { log.Error(err, "setup service controller") os.Exit(1) diff --git a/config/deployment.yaml b/config/deployment.yaml index 03ce967..9c07f10 100644 --- a/config/deployment.yaml +++ b/config/deployment.yaml @@ -4,9 +4,14 @@ metadata: name: controller namespace: system spec: - replicas: 2 + replicas: 1 + selector: + matchLabels: + component: controller template: metadata: + labels: + component: controller annotations: prometheus.io/port: "8080" prometheus.io/scrape: "true" @@ -17,6 +22,7 @@ spec: - args: - --metrics-address=:8080 - --enable-leader-election + - --cert-dir=/tmp/webhook image: controller:latest name: manager env: diff --git a/config/kustomization.yaml b/config/kustomization.yaml index 916693a..90db882 100644 --- a/config/kustomization.yaml +++ b/config/kustomization.yaml @@ -13,8 +13,16 @@ resources: - rbac/role.yaml - rbac/leader_election_role.yaml - rbac/role_binding.yaml +- webhook/manifests.yaml +- webhook/service.yaml - deployment.yaml +patchesStrategicMerge: +- webhook/manager_patch.yaml + +configurations: +- webhook/kustomizeconfig.yaml + apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 4bfff24..1522ddd 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -2,9 +2,26 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null name: role rules: +- apiGroups: + - "" + resources: + - Pods + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - create + - get + - list + - update + - watch - apiGroups: - "" resources: @@ -13,3 +30,36 @@ rules: - get - list - watch +- apiGroups: + - admissionregistration.k8s.io + resourceNames: + - monitor-mutating-webhook-configuration + resources: + - mutatingwebhookconfigurations + verbs: + - get + - update +- apiGroups: + - admissionregistration.k8s.io + resources: + - mutatingwebhookconfigurations + - validatingwebhookconfigurations + verbs: + - list + - watch +- apiGroups: + - admissionregistration.k8s.io + resourceNames: + - monitor-validating-webhook-configuration + resources: + - validatingwebhookconfigurations + verbs: + - get + - update +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - list + - watch diff --git a/config/webhook/kustomizeconfig.yaml b/config/webhook/kustomizeconfig.yaml new file mode 100644 index 0000000..eb792b6 --- /dev/null +++ b/config/webhook/kustomizeconfig.yaml @@ -0,0 +1,33 @@ +# the following config is for teaching kustomize where to look at when substituting vars. +# It requires kustomize v2.1.0 or newer to work properly. +nameReference: +- kind: Service + version: v1 + fieldSpecs: + - kind: CustomResourceDefinition + version: v1 + group: apiextensions.k8s.io + path: spec/conversion/webhook/clientConfig/service/name + - kind: MutatingWebhookConfiguration + group: admissionregistration.k8s.io + path: webhooks/clientConfig/service/name + - kind: ValidatingWebhookConfiguration + group: admissionregistration.k8s.io + path: webhooks/clientConfig/service/name + +namespace: +- kind: CustomResourceDefinition + version: v1 + group: apiextensions.k8s.io + path: spec/conversion/webhook/clientConfig/service/namespace +- kind: MutatingWebhookConfiguration + group: admissionregistration.k8s.io + path: webhooks/clientConfig/service/namespace + create: true +- kind: ValidatingWebhookConfiguration + group: admissionregistration.k8s.io + path: webhooks/clientConfig/service/namespace + create: true + +varReference: +- path: metadata/annotations diff --git a/config/webhook/manager_patch.yaml b/config/webhook/manager_patch.yaml new file mode 100644 index 0000000..7521b8a --- /dev/null +++ b/config/webhook/manager_patch.yaml @@ -0,0 +1,22 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: controller + namespace: system +spec: + template: + metadata: + labels: + monitor.demo.io/inject: "false" + spec: + containers: + - name: manager + ports: + - name: webhook-server + containerPort: 9443 + volumeMounts: + - name: webhook + mountPath: /tmp/webhook + volumes: + - name: webhook + emptyDir: {} diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml new file mode 100644 index 0000000..194c101 --- /dev/null +++ b/config/webhook/manifests.yaml @@ -0,0 +1,70 @@ +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + creationTimestamp: null + name: mutating-webhook-configuration +webhooks: +- admissionReviewVersions: + - v1beta1 + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate--v1-pod + failurePolicy: Fail + name: namespace.injector.kb.io + namespaceSelector: + matchExpressions: + - key: monitor-injection + operator: In + values: + - enabled + objectSelector: + matchExpressions: + - key: monitor.demo.io/inject + operator: NotIn + values: + - "false" + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + scope: '*' + sideEffects: None +- admissionReviewVersions: + - v1beta1 + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate--v1-pod + failurePolicy: Fail + name: object.injector.kb.io + namespaceSelector: + matchExpressions: + - key: monitor-injection + operator: DoesNotExist + objectSelector: + matchExpressions: + - key: monitor.demo.io/inject + operator: In + values: + - "true" + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + scope: '*' + sideEffects: None diff --git a/config/webhook/service.yaml b/config/webhook/service.yaml new file mode 100644 index 0000000..4733397 --- /dev/null +++ b/config/webhook/service.yaml @@ -0,0 +1,12 @@ + +apiVersion: v1 +kind: Service +metadata: + name: webhook-service + namespace: system +spec: + selector: + component: controller + ports: + - port: 443 + targetPort: 9443 diff --git a/pkg/controllers/services.go b/pkg/controllers/services.go index 96215c5..d495a74 100644 --- a/pkg/controllers/services.go +++ b/pkg/controllers/services.go @@ -20,15 +20,11 @@ type ServiceReconciler struct { //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch; func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - l := ctrl.LoggerFrom(ctx) - var svc corev1.Service err := r.client.Get(ctx, req.NamespacedName, &svc) if errors.IsNotFound(err) { return ctrl.Result{}, nil } - - l.Info("got svc") return ctrl.Result{}, nil } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 5098ee3..63b340e 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -9,23 +9,29 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager/signals" ) +var log = ctrl.Log.WithName("k8s") + type Config struct { ctrl.Options - PProf bool + PProf bool + WebhookName string + CaKey types.NamespacedName } type Manager struct { mgr ctrl.Manager } -func NewManager(conf Config) (*Manager, error) { +func NewManager(conf Config, setupFinished chan struct{}) (*Manager, error) { restConfig, err := config.GetConfig() if err != nil { return nil, err @@ -52,6 +58,25 @@ func NewManager(conf Config) (*Manager, error) { mgr.AddMetricsExtraHandler("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) } + if conf.CertDir != "" { + // Make sure certs are generated and valid if cert rotation is enabled. + cr := &CertRotator{ + CaKey: conf.CaKey, + CertDir: conf.CertDir, + DNSName: fmt.Sprintf("%s.%s.svc", conf.WebhookName, conf.CaKey.Namespace), + IsReady: setupFinished, + Webhooks: []WebhookInfo{ + { + Name: "monitor-mutating-webhook-configuration", + Type: Mutating, + }, + }, + } + if err = AddRotator(mgr, cr); err != nil { + return nil, err + } + } + return &Manager{mgr: mgr}, nil } @@ -63,8 +88,8 @@ func (s *Manager) Setup(f func(manager.Manager) error) error { return f(s.mgr) } -func (s *Manager) WebHook(obj runtime.Object) error { - return ctrl.NewWebhookManagedBy(s.mgr).For(obj).Complete() +func (s *Manager) WebHook(obj runtime.Object) *builder.WebhookBuilder { + return ctrl.NewWebhookManagedBy(s.mgr).For(obj) } func (s *Manager) Start() error { diff --git a/pkg/k8s/rotator.go b/pkg/k8s/rotator.go new file mode 100644 index 0000000..de6f41b --- /dev/null +++ b/pkg/k8s/rotator.go @@ -0,0 +1,576 @@ +package k8s + +import ( + "bytes" + "context" + "crypto" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/pem" + "errors" + "fmt" + "math/big" + "net" + "os" + "path/filepath" + "sync/atomic" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +var logcr = ctrl.Log.WithName("cert-rotation") + +const ( + rotationCheckFrequency = 12 * time.Hour + lookaheadInterval = 90 * rotationCheckFrequency + certValidityDuration = 10 * 365 * 24 * time.Hour + + bits = 2048 + CACertKey = "ca.crt" +) + +// WebhookType it the type of webhook, either validating/mutating webhook or a CRD conversion webhook +type WebhookType int + +const ( + // ValidatingWebhook indicates the webhook is a ValidatingWebhook + Validating WebhookType = iota + // MutingWebhook indicates the webhook is a MutatingWebhook + Mutating + // CRDConversionWebhook indicates the webhook is a conversion webhook + CRDConversion +) + +// AddRotator adds the CertRotator and ReconcileWH to the manager. +func AddRotator(mgr manager.Manager, cr *CertRotator) error { + // add a new namespace-scoped cache.Cache to the provided manager + c, err := cache.New(mgr.GetConfig(), cache.Options{ + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + Namespace: cr.CaKey.Namespace, + }) + + if err = mgr.Add(c); err != nil { + return err + } + + cr.reader = c + cr.writer = mgr.GetClient() + cr.wasCAInjected = new(atomic.Value) + cr.caNotInjected = make(chan struct{}) + + if err = mgr.Add(cr); err != nil { + return err + } + + r := &ReconcileWH{ + Client: mgr.GetClient(), + secretKey: cr.CaKey, + webhooks: cr.Webhooks, + wasCAInjected: cr.wasCAInjected, + } + + // Create a new controller + reconciler, err := controller.NewUnmanaged("cert-rotator", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + err = reconciler.Watch(source.NewKindWithCache(&corev1.Secret{}, c), + &handler.EnqueueRequestForObject{}) + if err != nil { + return err + } + + for _, wh := range cr.Webhooks { + obj := new(unstructured.Unstructured) + obj.SetGroupVersionKind(wh.GVK()) + err = reconciler.Watch(source.NewKindWithCache(obj, c), + handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request { + if object.GetName() != wh.Name { + return nil + } + return []reconcile.Request{{NamespacedName: r.secretKey}} + }), + ) + if err != nil { + return err + } + } + return mgr.Add(&controllerWrapper{Controller: reconciler}) +} + +type controllerWrapper struct { + controller.Controller + needLeaderElection bool +} + +func (cw controllerWrapper) NeedLeaderElection() bool { + return cw.needLeaderElection +} + +var _ manager.Runnable = &CertRotator{} + +// WebhookInfo is used by the certmgr to receive info about resources to be updated with certificates +type WebhookInfo struct { + //Name is the name of the webhook for a validating or mutating webhook, or the CRD name in case of a CRD conversion webhook + Name string + Type WebhookType +} + +func (w WebhookInfo) GVK() schema.GroupVersionKind { + t2g := map[WebhookType]schema.GroupVersionKind{ + Validating: {Group: "admissionregistration.k8s.io", Version: "v1", Kind: "ValidatingWebhookConfiguration"}, + Mutating: {Group: "admissionregistration.k8s.io", Version: "v1", Kind: "MutatingWebhookConfiguration"}, + CRDConversion: {Group: "apiextensions.k8s.io", Version: "v1", Kind: "CustomResourceDefinition"}, + } + return t2g[w.Type] +} + +// SyncingSource is a reader that needs syncing prior to being usable. +type SyncingReader interface { + client.Reader + WaitForCacheSync(ctx context.Context) bool +} + +// CertRotator contains cert artifacts and a channel to close when the certs are ready. +type CertRotator struct { + reader SyncingReader + writer client.Writer + + CaKey types.NamespacedName + DNSName string + CertDir string + Webhooks []WebhookInfo + IsReady chan struct{} + wasCAInjected *atomic.Value + caNotInjected chan struct{} + // RequireLeaderElection should be set to true if the CertRotator needs to + // be run in the leader election mode. + RequireLeaderElection bool +} + +func (cr *CertRotator) NeedLeaderElection() bool { + return cr.RequireLeaderElection +} + +// Start starts the CertRotator runnable to rotate certs and ensure the certs are ready. +func (cr *CertRotator) Start(ctx context.Context) error { + // explicitly rotate on the first round so that the certificate + // can be bootstrapped, otherwise manager exits before a cert can be written + log.Info("starting cert certmgr controller") + if !cr.reader.WaitForCacheSync(ctx) { + return errors.New("failed waiting for reader to sync") + } + + if err := cr.refreshCertIfNeeded(ctx); err != nil { + log.Error(err, "could not refresh cert on startup") + return err + } + + // Once the certs are ready, close the channel. + go cr.ensureReady() + + ticker := time.NewTicker(rotationCheckFrequency) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := cr.refreshCertIfNeeded(ctx); err != nil { + log.Error(err, "error rotating certs") + } + case <-cr.caNotInjected: + return errors.New("could not inject certs to webhooks") + case <-ctx.Done(): + log.Info("stopping cert certmgr controller") + return nil + } + } +} + +// refreshCertIfNeeded returns whether there's any error when refreshing the certs if needed. +func (cr *CertRotator) refreshCertIfNeeded(ctx context.Context) error { + refreshFn := func() (bool, error) { + lookahead := time.Now().Add(lookaheadInterval) + caKey, err := cr.refreshCA(ctx, lookahead) + if err != nil { + return false, err + } + + if err = cr.refreshFile(ctx, caKey, lookahead); err != nil { + return false, err + } + + return true, nil + } + + if err := wait.ExponentialBackoff(wait.Backoff{ + Duration: 10 * time.Millisecond, + Factor: 2, + Jitter: 1, + Steps: 10, + }, refreshFn); err != nil { + return err + } + return nil +} + +// refreshCA creates the self-signed CA cert and private key that will +// be used to sign the server certificate +func (cr *CertRotator) refreshCA(ctx context.Context, lookahead time.Time) (*KeyPair, error) { + var ca = new(corev1.Secret) + caFile := filepath.Join(cr.CertDir, CACertKey) + + if err := cr.reader.Get(ctx, cr.CaKey, ca); err != nil { + if !apierrors.IsNotFound(err) { + return nil, fmt.Errorf("acquiring ca to update certificates %w", err) + } + ca.Name = cr.CaKey.Name + ca.Namespace = cr.CaKey.Namespace + } else if caKey, err := loadCert(ca); err == nil { + // 已存在证书 验证是否有效 + if err = validCert(ca, caKey.Cert, lookahead); err == nil { + if _, err = os.Stat(caFile); os.IsNotExist(err) { + if err = os.WriteFile(caFile, ca.Data[corev1.TLSCertKey], os.ModePerm); err != nil { + log.Error(err, "could not write ca", "path", caFile) + return nil, err + } + } + return caKey, nil + } + } + + log.Info("refreshing ca", "name", cr.CaKey) + if err := cr.refreshCerts(ctx, nil, ca); err != nil { + log.Error(err, "could not refresh ca", "name", cr.CaKey) + return nil, err + } + + if err := os.WriteFile(caFile, ca.Data[corev1.TLSCertKey], os.ModePerm); err != nil { + log.Error(err, "could not write ca", "path", caFile) + return nil, err + } + log.Info("server ca refreshed", "name", cr.CaKey, "file", caFile) + return loadCert(ca) +} + +func (cr *CertRotator) refreshFile(ctx context.Context, ca *KeyPair, lookahead time.Time) error { + certFile := filepath.Join(cr.CertDir, corev1.TLSCertKey) + keyFile := filepath.Join(cr.CertDir, corev1.TLSPrivateKeyKey) + crt, err := tls.LoadX509KeyPair(certFile, keyFile) + if err == nil { + certDer, err := x509.ParseCertificate(crt.Certificate[0]) + if err != nil { + return err + } + caPool := x509.NewCertPool() + caPool.AddCert(ca.Cert) + _, err = certDer.Verify(x509.VerifyOptions{ + DNSName: cr.DNSName, + Roots: caPool, + CurrentTime: lookahead, + }) + if err == nil { + return nil + } + } + certBytes, key, err := CreateCertPEM(ca, cr.DNSName) + if err != nil { + return err + } + if err = os.WriteFile(certFile, certBytes, os.ModePerm); err != nil { + return err + } + if err = os.WriteFile(keyFile, key, os.ModePerm); err != nil { + return err + } + log.Info("refresh cert complete", "cert", certFile, "key", keyFile) + return nil +} + +func (cr *CertRotator) refreshCerts(ctx context.Context, ca *KeyPair, s *corev1.Secret, hosts ...string) error { + cert, key, err := CreateCertPEM(ca, hosts...) + if err != nil { + return err + } + // updateSecret + s.Data = map[string][]byte{ + corev1.TLSCertKey: cert, + corev1.TLSPrivateKeyKey: key, + } + if !s.CreationTimestamp.IsZero() { + return cr.writer.Update(ctx, s) + } + + s.Type = corev1.SecretTypeTLS + return cr.writer.Create(ctx, s) +} + +// ensureReady ensure the cert files exist and the CAs are injected. +func (cr *CertRotator) ensureReady() { + checkFn := func() (bool, error) { + certFile := cr.CertDir + "/" + corev1.TLSCertKey + _, err := os.Stat(certFile) + if err != nil { + return false, nil + } + return cr.wasCAInjected.Load() != nil, nil + } + if err := wait.ExponentialBackoff(wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2, + Jitter: 1, + Steps: 10, + }, checkFn); err != nil { + log.Error(err, "max retries for checking CA injection") + close(cr.caNotInjected) + return + } + log.Info("CA certs are injected to webhooks") + close(cr.IsReady) +} + +// KeyPair stores cert artifacts. +type KeyPair struct { + Cert *x509.Certificate + Key crypto.PrivateKey +} + +func loadCert(s *corev1.Secret) (*KeyPair, error) { + c, err := tls.X509KeyPair(s.Data[corev1.TLSCertKey], s.Data[corev1.TLSPrivateKeyKey]) + if err != nil { + return nil, err + } + certDer, err := x509.ParseCertificate(c.Certificate[0]) + if err != nil { + return nil, err + } + return &KeyPair{ + Key: c.PrivateKey, + Cert: certDer, + }, nil +} + +func validCert(s *corev1.Secret, ca *x509.Certificate, lookahead time.Time) error { + caPool := x509.NewCertPool() + caPool.AddCert(ca) + + c, err := tls.X509KeyPair(s.Data[corev1.TLSCertKey], s.Data[corev1.TLSPrivateKeyKey]) + if err != nil { + return err + } + certDer, err := x509.ParseCertificate(c.Certificate[0]) + if err != nil { + return err + } + _, err = certDer.Verify(x509.VerifyOptions{ + Roots: caPool, + CurrentTime: lookahead, + }) + return err +} + +// CreateCertPEM takes the results of CreateCACert and uses it to create the +// PEM-encoded public certificate and private key, respectively +func CreateCertPEM(ca *KeyPair, hosts ...string) ([]byte, []byte, error) { + privKey, err := rsa.GenerateKey(rand.Reader, bits) + if err != nil { + return nil, nil, err + } + + validFrom := time.Now().Add(-time.Hour) // valid an hour earlier to avoid flakes due to clock skew + tpl := &x509.Certificate{ + NotBefore: validFrom, + NotAfter: validFrom.Add(certValidityDuration), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + IsCA: true, + } + + max := new(big.Int).Lsh(big.NewInt(1), 128) + if tpl.SerialNumber, err = rand.Int(rand.Reader, max); err != nil { + return nil, nil, err + } + + for _, h := range hosts { + if ip := net.ParseIP(h); ip != nil { + tpl.IPAddresses = append(tpl.IPAddresses, ip) + } else { + tpl.DNSNames = append(tpl.DNSNames, h) + } + } + + if ca == nil { + ca = &KeyPair{ + Key: privKey, + Cert: tpl, + } + } else if len(hosts) > 0 { + tpl.Subject.CommonName = hosts[0] + } + + derBytes, err := x509.CreateCertificate(rand.Reader, tpl, ca.Cert, privKey.Public(), ca.Key) + if err != nil { + return nil, nil, err + } + + // pemEncode takes a certificate and encodes it as PEM + certBuf := &bytes.Buffer{} + if err := pem.Encode(certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { + return nil, nil, fmt.Errorf("encoding cert %w", err) + } + keyBuf := &bytes.Buffer{} + if err := pem.Encode(keyBuf, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privKey)}); err != nil { + return nil, nil, fmt.Errorf("encoding key %w", err) + } + return certBuf.Bytes(), keyBuf.Bytes(), nil +} + +// ReconcileWH reconciles a validatingwebhookconfiguration, making sure it +// has the appropriate CA cert +type ReconcileWH struct { + client.Client + secretKey types.NamespacedName + webhooks []WebhookInfo + wasCAInjected *atomic.Value +} + +// +kubebuilder:rbac:groups="",resources=secrets,verbs=create;get;list;watch;update +// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations;mutatingwebhookconfigurations,verbs=list;watch +// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations,resourceNames=monitor-validating-webhook-configuration,verbs=get;update +// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=mutatingwebhookconfigurations,resourceNames=monitor-mutating-webhook-configuration,verbs=get;update +// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=list;watch +// Reconcile reads that state of the cluster for a validatingwebhookconfiguration +// object and makes sure the most recent CA cert is included + +func (r *ReconcileWH) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if req.NamespacedName != r.secretKey { + return ctrl.Result{}, nil + } + + var secret corev1.Secret + if err := r.Get(ctx, req.NamespacedName, &secret); err != nil { + if apierrors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + return ctrl.Result{Requeue: true}, err + } + + if secret.GetDeletionTimestamp().IsZero() { + caPem, ok := secret.Data[corev1.TLSCertKey] + if !ok { + return ctrl.Result{}, nil + } + + // Ensure certs on webhooks + if err := r.ensureCerts(ctx, caPem); err != nil { + return ctrl.Result{}, err + } + + // Set CAInjected if the reconciler has not exited early. + r.wasCAInjected.Store(true) + } + + return ctrl.Result{}, nil +} + +// ensureCerts returns an arbitrary error if multiple errors are encountered, +// while all the errors are logged. +// This is important to allow the controller to reconcile the secret. If an error +// is returned, request will be requeued, and the controller will attempt to reconcile +// the secret again. +// When an error is encountered for when processing a webhook, the error is logged, but +// following webhooks are also attempted to be updated. If multiple errors occur for different +// webhooks, only the last one will be returned. This is ok, as the returned error is only meant +// to indicate that reconciliation failed. The information about all the errors is passed not +// by the returned error, but rather in the logged errors. +func (r *ReconcileWH) ensureCerts(ctx context.Context, certPem []byte) error { + encodedPem := base64.StdEncoding.EncodeToString(certPem) + + for _, wh := range r.webhooks { + obj := new(unstructured.Unstructured) + obj.SetGroupVersionKind(wh.GVK()) + + err := r.Get(ctx, types.NamespacedName{Name: wh.Name}, obj) + if err != nil { + log.Error(err, "Webhook not found. Unable to update certificate.") + continue + } + if !obj.GetDeletionTimestamp().IsZero() { + log.Info("Webhook is being deleted. Unable to update certificate") + continue + } + + log.Info("Ensuring CA cert", "name", wh.Name, "GVK", wh.GVK()) + + switch wh.Type { + case Validating, Mutating: + err = injectCertToWebhook(obj, encodedPem) + case CRDConversion: + err = injectCertToConversionWebhook(obj, encodedPem) + default: + return fmt.Errorf("incorrect webhook type `%v`", wh.Type) + } + if err != nil { + log.Error(err, "Unable to inject cert to webhook.") + continue + } + if err = r.Update(ctx, obj); err != nil { + log.Error(err, "Error updating webhook with certificate") + } + } + return nil +} + +func injectCertToWebhook(wh *unstructured.Unstructured, certPem string) error { + webhooks, found, err := unstructured.NestedSlice(wh.Object, "webhooks") + if err != nil { + return err + } + if !found { + return fmt.Errorf("`webhooks` field not found in ValidatingWebhookConfiguration") + } + for i, h := range webhooks { + hook, ok := h.(map[string]interface{}) + if !ok { + return fmt.Errorf("webhook %d is not well-formed", i) + } + if err = unstructured.SetNestedField(hook, certPem, "clientConfig", "caBundle"); err != nil { + return err + } + webhooks[i] = hook + } + return unstructured.SetNestedSlice(wh.Object, webhooks, "webhooks") +} + +func injectCertToConversionWebhook(crd *unstructured.Unstructured, certPem string) error { + _, found, err := unstructured.NestedMap(crd.Object, "spec", "conversion", "webhook", "clientConfig") + if err != nil { + return err + } + if !found { + return fmt.Errorf("`clientConfig` field not found in CustomResourceDefinition `%s`", crd.GetName()) + } + return unstructured.SetNestedField(crd.Object, certPem, "spec", "conversion", "webhook", "clientConfig", "caBundle") +} diff --git a/pkg/svc/handler.go b/pkg/svc/handler.go new file mode 100644 index 0000000..fbb3f10 --- /dev/null +++ b/pkg/svc/handler.go @@ -0,0 +1,39 @@ +package svc + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" +) + +var log = ctrl.Log.WithName("handler") + +type EventHandler struct { + update chan interface{} +} + +func NewHandler() *EventHandler { + return &EventHandler{ + update: make(chan interface{}, 100), + } +} + +func (ev *EventHandler) Default(ctx context.Context, obj runtime.Object) error { + switch obj := obj.(type) { + case *corev1.Pod: + log.Info("pod create", "namespace", obj.Namespace, "spec", obj.Spec) + } + return nil +} + +func (ev *EventHandler) Start(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + log.Info("EventHandler exit") + return nil + } + } +}