1
0

add webhook

This commit is contained in:
2023-09-06 17:52:39 +08:00
parent 586abdfd7b
commit b2972195cb
12 changed files with 882 additions and 12 deletions

View File

@@ -6,8 +6,10 @@ import (
"monitor/pkg/controllers"
"monitor/pkg/k8s"
"monitor/pkg/svc"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
@@ -17,17 +19,24 @@ var (
commit string
level zapcore.Level
conf k8s.Config
disableWebhook bool
)
func main() {
conf.HealthProbeBindAddress = "0" // Disable healthy endpoint of controller manager
conf.LeaderElectionID = "monitor.3m3m.net"
conf.LeaderElectionID = "monitor.demo.io"
// 参数解析
flag.TextVar(&level, "log-level", zapcore.InfoLevel, "logger level")
flag.StringVar(&conf.MetricsBindAddress, "metrics-address", "127.0.0.1:8080", "The address the metric endpoint binds to.")
flag.BoolVar(&conf.LeaderElection, "enable-leader-election", false, "Enable leader election for controller manager")
flag.BoolVar(&conf.PProf, "enable-pprof", false, "Enable profile debug")
flag.StringVar(&conf.CaKey.Namespace, "namespace", os.Getenv("POD_NAMESPACE"), "namespace where pod run")
flag.StringVar(&conf.CaKey.Name, "cert-ca", "monitor-ca", "cert ca secret name")
flag.StringVar(&conf.WebhookName, "cert-service-name", "monitor-webhook-service", "The service name used to generate the TLS cert's hostname")
flag.StringVar(&conf.CertDir, "cert-dir", "", "The directory where certs are stored")
flag.BoolVar(&disableWebhook, "disable-webhook", false, "diable webhook mode")
flag.Parse()
// 初始化日志格式
ctrl.SetLogger(zap.New(func(o *zap.Options) {
@@ -37,12 +46,36 @@ func main() {
}))
log := ctrl.Log.WithName("monitor")
mgr, err := k8s.NewManager(conf)
setupFinished := make(chan struct{})
mgr, err := k8s.NewManager(conf, setupFinished)
if err != nil {
log.Error(err, "new k8s manager")
os.Exit(1)
}
ev := svc.NewHandler()
if !disableWebhook {
go func() {
<-setupFinished
if err = mgr.Add(ev); err != nil {
log.Error(err, "add event handler")
os.Exit(1)
}
if err = mgr.WebHook(&corev1.Pod{}).WithDefaulter(ev).Complete(); err != nil {
log.Error(err, "setup pod webhook")
os.Exit(1)
}
}()
} else {
if err = mgr.Add(ev); err != nil {
log.Error(err, "add event handler")
os.Exit(1)
}
}
if err = mgr.Setup(controllers.NewService); err != nil {
log.Error(err, "setup service controller")
os.Exit(1)

View File

@@ -4,9 +4,14 @@ metadata:
name: controller
namespace: system
spec:
replicas: 2
replicas: 1
selector:
matchLabels:
component: controller
template:
metadata:
labels:
component: controller
annotations:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
@@ -17,6 +22,7 @@ spec:
- args:
- --metrics-address=:8080
- --enable-leader-election
- --cert-dir=/tmp/webhook
image: controller:latest
name: manager
env:

View File

@@ -13,8 +13,16 @@ resources:
- rbac/role.yaml
- rbac/leader_election_role.yaml
- rbac/role_binding.yaml
- webhook/manifests.yaml
- webhook/service.yaml
- deployment.yaml
patchesStrategicMerge:
- webhook/manager_patch.yaml
configurations:
- webhook/kustomizeconfig.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:

View File

@@ -2,9 +2,26 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
creationTimestamp: null
name: role
rules:
- apiGroups:
- ""
resources:
- Pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- get
- list
- update
- watch
- apiGroups:
- ""
resources:
@@ -13,3 +30,36 @@ rules:
- get
- list
- watch
- apiGroups:
- admissionregistration.k8s.io
resourceNames:
- monitor-mutating-webhook-configuration
resources:
- mutatingwebhookconfigurations
verbs:
- get
- update
- apiGroups:
- admissionregistration.k8s.io
resources:
- mutatingwebhookconfigurations
- validatingwebhookconfigurations
verbs:
- list
- watch
- apiGroups:
- admissionregistration.k8s.io
resourceNames:
- monitor-validating-webhook-configuration
resources:
- validatingwebhookconfigurations
verbs:
- get
- update
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- list
- watch

View File

@@ -0,0 +1,33 @@
# the following config is for teaching kustomize where to look at when substituting vars.
# It requires kustomize v2.1.0 or newer to work properly.
nameReference:
- kind: Service
version: v1
fieldSpecs:
- kind: CustomResourceDefinition
version: v1
group: apiextensions.k8s.io
path: spec/conversion/webhook/clientConfig/service/name
- kind: MutatingWebhookConfiguration
group: admissionregistration.k8s.io
path: webhooks/clientConfig/service/name
- kind: ValidatingWebhookConfiguration
group: admissionregistration.k8s.io
path: webhooks/clientConfig/service/name
namespace:
- kind: CustomResourceDefinition
version: v1
group: apiextensions.k8s.io
path: spec/conversion/webhook/clientConfig/service/namespace
- kind: MutatingWebhookConfiguration
group: admissionregistration.k8s.io
path: webhooks/clientConfig/service/namespace
create: true
- kind: ValidatingWebhookConfiguration
group: admissionregistration.k8s.io
path: webhooks/clientConfig/service/namespace
create: true
varReference:
- path: metadata/annotations

View File

@@ -0,0 +1,22 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller
namespace: system
spec:
template:
metadata:
labels:
monitor.demo.io/inject: "false"
spec:
containers:
- name: manager
ports:
- name: webhook-server
containerPort: 9443
volumeMounts:
- name: webhook
mountPath: /tmp/webhook
volumes:
- name: webhook
emptyDir: {}

View File

@@ -0,0 +1,70 @@
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
creationTimestamp: null
name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1beta1
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate--v1-pod
failurePolicy: Fail
name: namespace.injector.kb.io
namespaceSelector:
matchExpressions:
- key: monitor-injection
operator: In
values:
- enabled
objectSelector:
matchExpressions:
- key: monitor.demo.io/inject
operator: NotIn
values:
- "false"
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
scope: '*'
sideEffects: None
- admissionReviewVersions:
- v1beta1
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate--v1-pod
failurePolicy: Fail
name: object.injector.kb.io
namespaceSelector:
matchExpressions:
- key: monitor-injection
operator: DoesNotExist
objectSelector:
matchExpressions:
- key: monitor.demo.io/inject
operator: In
values:
- "true"
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
scope: '*'
sideEffects: None

View File

@@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: webhook-service
namespace: system
spec:
selector:
component: controller
ports:
- port: 443
targetPort: 9443

View File

@@ -20,15 +20,11 @@ type ServiceReconciler struct {
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;
func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := ctrl.LoggerFrom(ctx)
var svc corev1.Service
err := r.client.Get(ctx, req.NamespacedName, &svc)
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
l.Info("got svc")
return ctrl.Result{}, nil
}

View File

@@ -9,23 +9,29 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
var log = ctrl.Log.WithName("k8s")
type Config struct {
ctrl.Options
PProf bool
PProf bool
WebhookName string
CaKey types.NamespacedName
}
type Manager struct {
mgr ctrl.Manager
}
func NewManager(conf Config) (*Manager, error) {
func NewManager(conf Config, setupFinished chan struct{}) (*Manager, error) {
restConfig, err := config.GetConfig()
if err != nil {
return nil, err
@@ -52,6 +58,25 @@ func NewManager(conf Config) (*Manager, error) {
mgr.AddMetricsExtraHandler("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
}
if conf.CertDir != "" {
// Make sure certs are generated and valid if cert rotation is enabled.
cr := &CertRotator{
CaKey: conf.CaKey,
CertDir: conf.CertDir,
DNSName: fmt.Sprintf("%s.%s.svc", conf.WebhookName, conf.CaKey.Namespace),
IsReady: setupFinished,
Webhooks: []WebhookInfo{
{
Name: "monitor-mutating-webhook-configuration",
Type: Mutating,
},
},
}
if err = AddRotator(mgr, cr); err != nil {
return nil, err
}
}
return &Manager{mgr: mgr}, nil
}
@@ -63,8 +88,8 @@ func (s *Manager) Setup(f func(manager.Manager) error) error {
return f(s.mgr)
}
func (s *Manager) WebHook(obj runtime.Object) error {
return ctrl.NewWebhookManagedBy(s.mgr).For(obj).Complete()
func (s *Manager) WebHook(obj runtime.Object) *builder.WebhookBuilder {
return ctrl.NewWebhookManagedBy(s.mgr).For(obj)
}
func (s *Manager) Start() error {

576
pkg/k8s/rotator.go Normal file
View File

@@ -0,0 +1,576 @@
package k8s
import (
"bytes"
"context"
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"math/big"
"net"
"os"
"path/filepath"
"sync/atomic"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var logcr = ctrl.Log.WithName("cert-rotation")
const (
rotationCheckFrequency = 12 * time.Hour
lookaheadInterval = 90 * rotationCheckFrequency
certValidityDuration = 10 * 365 * 24 * time.Hour
bits = 2048
CACertKey = "ca.crt"
)
// WebhookType it the type of webhook, either validating/mutating webhook or a CRD conversion webhook
type WebhookType int
const (
// ValidatingWebhook indicates the webhook is a ValidatingWebhook
Validating WebhookType = iota
// MutingWebhook indicates the webhook is a MutatingWebhook
Mutating
// CRDConversionWebhook indicates the webhook is a conversion webhook
CRDConversion
)
// AddRotator adds the CertRotator and ReconcileWH to the manager.
func AddRotator(mgr manager.Manager, cr *CertRotator) error {
// add a new namespace-scoped cache.Cache to the provided manager
c, err := cache.New(mgr.GetConfig(), cache.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
Namespace: cr.CaKey.Namespace,
})
if err = mgr.Add(c); err != nil {
return err
}
cr.reader = c
cr.writer = mgr.GetClient()
cr.wasCAInjected = new(atomic.Value)
cr.caNotInjected = make(chan struct{})
if err = mgr.Add(cr); err != nil {
return err
}
r := &ReconcileWH{
Client: mgr.GetClient(),
secretKey: cr.CaKey,
webhooks: cr.Webhooks,
wasCAInjected: cr.wasCAInjected,
}
// Create a new controller
reconciler, err := controller.NewUnmanaged("cert-rotator", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
err = reconciler.Watch(source.NewKindWithCache(&corev1.Secret{}, c),
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
for _, wh := range cr.Webhooks {
obj := new(unstructured.Unstructured)
obj.SetGroupVersionKind(wh.GVK())
err = reconciler.Watch(source.NewKindWithCache(obj, c),
handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request {
if object.GetName() != wh.Name {
return nil
}
return []reconcile.Request{{NamespacedName: r.secretKey}}
}),
)
if err != nil {
return err
}
}
return mgr.Add(&controllerWrapper{Controller: reconciler})
}
type controllerWrapper struct {
controller.Controller
needLeaderElection bool
}
func (cw controllerWrapper) NeedLeaderElection() bool {
return cw.needLeaderElection
}
var _ manager.Runnable = &CertRotator{}
// WebhookInfo is used by the certmgr to receive info about resources to be updated with certificates
type WebhookInfo struct {
//Name is the name of the webhook for a validating or mutating webhook, or the CRD name in case of a CRD conversion webhook
Name string
Type WebhookType
}
func (w WebhookInfo) GVK() schema.GroupVersionKind {
t2g := map[WebhookType]schema.GroupVersionKind{
Validating: {Group: "admissionregistration.k8s.io", Version: "v1", Kind: "ValidatingWebhookConfiguration"},
Mutating: {Group: "admissionregistration.k8s.io", Version: "v1", Kind: "MutatingWebhookConfiguration"},
CRDConversion: {Group: "apiextensions.k8s.io", Version: "v1", Kind: "CustomResourceDefinition"},
}
return t2g[w.Type]
}
// SyncingSource is a reader that needs syncing prior to being usable.
type SyncingReader interface {
client.Reader
WaitForCacheSync(ctx context.Context) bool
}
// CertRotator contains cert artifacts and a channel to close when the certs are ready.
type CertRotator struct {
reader SyncingReader
writer client.Writer
CaKey types.NamespacedName
DNSName string
CertDir string
Webhooks []WebhookInfo
IsReady chan struct{}
wasCAInjected *atomic.Value
caNotInjected chan struct{}
// RequireLeaderElection should be set to true if the CertRotator needs to
// be run in the leader election mode.
RequireLeaderElection bool
}
func (cr *CertRotator) NeedLeaderElection() bool {
return cr.RequireLeaderElection
}
// Start starts the CertRotator runnable to rotate certs and ensure the certs are ready.
func (cr *CertRotator) Start(ctx context.Context) error {
// explicitly rotate on the first round so that the certificate
// can be bootstrapped, otherwise manager exits before a cert can be written
log.Info("starting cert certmgr controller")
if !cr.reader.WaitForCacheSync(ctx) {
return errors.New("failed waiting for reader to sync")
}
if err := cr.refreshCertIfNeeded(ctx); err != nil {
log.Error(err, "could not refresh cert on startup")
return err
}
// Once the certs are ready, close the channel.
go cr.ensureReady()
ticker := time.NewTicker(rotationCheckFrequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := cr.refreshCertIfNeeded(ctx); err != nil {
log.Error(err, "error rotating certs")
}
case <-cr.caNotInjected:
return errors.New("could not inject certs to webhooks")
case <-ctx.Done():
log.Info("stopping cert certmgr controller")
return nil
}
}
}
// refreshCertIfNeeded returns whether there's any error when refreshing the certs if needed.
func (cr *CertRotator) refreshCertIfNeeded(ctx context.Context) error {
refreshFn := func() (bool, error) {
lookahead := time.Now().Add(lookaheadInterval)
caKey, err := cr.refreshCA(ctx, lookahead)
if err != nil {
return false, err
}
if err = cr.refreshFile(ctx, caKey, lookahead); err != nil {
return false, err
}
return true, nil
}
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: 10 * time.Millisecond,
Factor: 2,
Jitter: 1,
Steps: 10,
}, refreshFn); err != nil {
return err
}
return nil
}
// refreshCA creates the self-signed CA cert and private key that will
// be used to sign the server certificate
func (cr *CertRotator) refreshCA(ctx context.Context, lookahead time.Time) (*KeyPair, error) {
var ca = new(corev1.Secret)
caFile := filepath.Join(cr.CertDir, CACertKey)
if err := cr.reader.Get(ctx, cr.CaKey, ca); err != nil {
if !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("acquiring ca to update certificates %w", err)
}
ca.Name = cr.CaKey.Name
ca.Namespace = cr.CaKey.Namespace
} else if caKey, err := loadCert(ca); err == nil {
// 已存在证书 验证是否有效
if err = validCert(ca, caKey.Cert, lookahead); err == nil {
if _, err = os.Stat(caFile); os.IsNotExist(err) {
if err = os.WriteFile(caFile, ca.Data[corev1.TLSCertKey], os.ModePerm); err != nil {
log.Error(err, "could not write ca", "path", caFile)
return nil, err
}
}
return caKey, nil
}
}
log.Info("refreshing ca", "name", cr.CaKey)
if err := cr.refreshCerts(ctx, nil, ca); err != nil {
log.Error(err, "could not refresh ca", "name", cr.CaKey)
return nil, err
}
if err := os.WriteFile(caFile, ca.Data[corev1.TLSCertKey], os.ModePerm); err != nil {
log.Error(err, "could not write ca", "path", caFile)
return nil, err
}
log.Info("server ca refreshed", "name", cr.CaKey, "file", caFile)
return loadCert(ca)
}
func (cr *CertRotator) refreshFile(ctx context.Context, ca *KeyPair, lookahead time.Time) error {
certFile := filepath.Join(cr.CertDir, corev1.TLSCertKey)
keyFile := filepath.Join(cr.CertDir, corev1.TLSPrivateKeyKey)
crt, err := tls.LoadX509KeyPair(certFile, keyFile)
if err == nil {
certDer, err := x509.ParseCertificate(crt.Certificate[0])
if err != nil {
return err
}
caPool := x509.NewCertPool()
caPool.AddCert(ca.Cert)
_, err = certDer.Verify(x509.VerifyOptions{
DNSName: cr.DNSName,
Roots: caPool,
CurrentTime: lookahead,
})
if err == nil {
return nil
}
}
certBytes, key, err := CreateCertPEM(ca, cr.DNSName)
if err != nil {
return err
}
if err = os.WriteFile(certFile, certBytes, os.ModePerm); err != nil {
return err
}
if err = os.WriteFile(keyFile, key, os.ModePerm); err != nil {
return err
}
log.Info("refresh cert complete", "cert", certFile, "key", keyFile)
return nil
}
func (cr *CertRotator) refreshCerts(ctx context.Context, ca *KeyPair, s *corev1.Secret, hosts ...string) error {
cert, key, err := CreateCertPEM(ca, hosts...)
if err != nil {
return err
}
// updateSecret
s.Data = map[string][]byte{
corev1.TLSCertKey: cert,
corev1.TLSPrivateKeyKey: key,
}
if !s.CreationTimestamp.IsZero() {
return cr.writer.Update(ctx, s)
}
s.Type = corev1.SecretTypeTLS
return cr.writer.Create(ctx, s)
}
// ensureReady ensure the cert files exist and the CAs are injected.
func (cr *CertRotator) ensureReady() {
checkFn := func() (bool, error) {
certFile := cr.CertDir + "/" + corev1.TLSCertKey
_, err := os.Stat(certFile)
if err != nil {
return false, nil
}
return cr.wasCAInjected.Load() != nil, nil
}
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: 1 * time.Second,
Factor: 2,
Jitter: 1,
Steps: 10,
}, checkFn); err != nil {
log.Error(err, "max retries for checking CA injection")
close(cr.caNotInjected)
return
}
log.Info("CA certs are injected to webhooks")
close(cr.IsReady)
}
// KeyPair stores cert artifacts.
type KeyPair struct {
Cert *x509.Certificate
Key crypto.PrivateKey
}
func loadCert(s *corev1.Secret) (*KeyPair, error) {
c, err := tls.X509KeyPair(s.Data[corev1.TLSCertKey], s.Data[corev1.TLSPrivateKeyKey])
if err != nil {
return nil, err
}
certDer, err := x509.ParseCertificate(c.Certificate[0])
if err != nil {
return nil, err
}
return &KeyPair{
Key: c.PrivateKey,
Cert: certDer,
}, nil
}
func validCert(s *corev1.Secret, ca *x509.Certificate, lookahead time.Time) error {
caPool := x509.NewCertPool()
caPool.AddCert(ca)
c, err := tls.X509KeyPair(s.Data[corev1.TLSCertKey], s.Data[corev1.TLSPrivateKeyKey])
if err != nil {
return err
}
certDer, err := x509.ParseCertificate(c.Certificate[0])
if err != nil {
return err
}
_, err = certDer.Verify(x509.VerifyOptions{
Roots: caPool,
CurrentTime: lookahead,
})
return err
}
// CreateCertPEM takes the results of CreateCACert and uses it to create the
// PEM-encoded public certificate and private key, respectively
func CreateCertPEM(ca *KeyPair, hosts ...string) ([]byte, []byte, error) {
privKey, err := rsa.GenerateKey(rand.Reader, bits)
if err != nil {
return nil, nil, err
}
validFrom := time.Now().Add(-time.Hour) // valid an hour earlier to avoid flakes due to clock skew
tpl := &x509.Certificate{
NotBefore: validFrom,
NotAfter: validFrom.Add(certValidityDuration),
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
IsCA: true,
}
max := new(big.Int).Lsh(big.NewInt(1), 128)
if tpl.SerialNumber, err = rand.Int(rand.Reader, max); err != nil {
return nil, nil, err
}
for _, h := range hosts {
if ip := net.ParseIP(h); ip != nil {
tpl.IPAddresses = append(tpl.IPAddresses, ip)
} else {
tpl.DNSNames = append(tpl.DNSNames, h)
}
}
if ca == nil {
ca = &KeyPair{
Key: privKey,
Cert: tpl,
}
} else if len(hosts) > 0 {
tpl.Subject.CommonName = hosts[0]
}
derBytes, err := x509.CreateCertificate(rand.Reader, tpl, ca.Cert, privKey.Public(), ca.Key)
if err != nil {
return nil, nil, err
}
// pemEncode takes a certificate and encodes it as PEM
certBuf := &bytes.Buffer{}
if err := pem.Encode(certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
return nil, nil, fmt.Errorf("encoding cert %w", err)
}
keyBuf := &bytes.Buffer{}
if err := pem.Encode(keyBuf, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privKey)}); err != nil {
return nil, nil, fmt.Errorf("encoding key %w", err)
}
return certBuf.Bytes(), keyBuf.Bytes(), nil
}
// ReconcileWH reconciles a validatingwebhookconfiguration, making sure it
// has the appropriate CA cert
type ReconcileWH struct {
client.Client
secretKey types.NamespacedName
webhooks []WebhookInfo
wasCAInjected *atomic.Value
}
// +kubebuilder:rbac:groups="",resources=secrets,verbs=create;get;list;watch;update
// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations;mutatingwebhookconfigurations,verbs=list;watch
// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations,resourceNames=monitor-validating-webhook-configuration,verbs=get;update
// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=mutatingwebhookconfigurations,resourceNames=monitor-mutating-webhook-configuration,verbs=get;update
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=list;watch
// Reconcile reads that state of the cluster for a validatingwebhookconfiguration
// object and makes sure the most recent CA cert is included
func (r *ReconcileWH) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if req.NamespacedName != r.secretKey {
return ctrl.Result{}, nil
}
var secret corev1.Secret
if err := r.Get(ctx, req.NamespacedName, &secret); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
return ctrl.Result{Requeue: true}, err
}
if secret.GetDeletionTimestamp().IsZero() {
caPem, ok := secret.Data[corev1.TLSCertKey]
if !ok {
return ctrl.Result{}, nil
}
// Ensure certs on webhooks
if err := r.ensureCerts(ctx, caPem); err != nil {
return ctrl.Result{}, err
}
// Set CAInjected if the reconciler has not exited early.
r.wasCAInjected.Store(true)
}
return ctrl.Result{}, nil
}
// ensureCerts returns an arbitrary error if multiple errors are encountered,
// while all the errors are logged.
// This is important to allow the controller to reconcile the secret. If an error
// is returned, request will be requeued, and the controller will attempt to reconcile
// the secret again.
// When an error is encountered for when processing a webhook, the error is logged, but
// following webhooks are also attempted to be updated. If multiple errors occur for different
// webhooks, only the last one will be returned. This is ok, as the returned error is only meant
// to indicate that reconciliation failed. The information about all the errors is passed not
// by the returned error, but rather in the logged errors.
func (r *ReconcileWH) ensureCerts(ctx context.Context, certPem []byte) error {
encodedPem := base64.StdEncoding.EncodeToString(certPem)
for _, wh := range r.webhooks {
obj := new(unstructured.Unstructured)
obj.SetGroupVersionKind(wh.GVK())
err := r.Get(ctx, types.NamespacedName{Name: wh.Name}, obj)
if err != nil {
log.Error(err, "Webhook not found. Unable to update certificate.")
continue
}
if !obj.GetDeletionTimestamp().IsZero() {
log.Info("Webhook is being deleted. Unable to update certificate")
continue
}
log.Info("Ensuring CA cert", "name", wh.Name, "GVK", wh.GVK())
switch wh.Type {
case Validating, Mutating:
err = injectCertToWebhook(obj, encodedPem)
case CRDConversion:
err = injectCertToConversionWebhook(obj, encodedPem)
default:
return fmt.Errorf("incorrect webhook type `%v`", wh.Type)
}
if err != nil {
log.Error(err, "Unable to inject cert to webhook.")
continue
}
if err = r.Update(ctx, obj); err != nil {
log.Error(err, "Error updating webhook with certificate")
}
}
return nil
}
func injectCertToWebhook(wh *unstructured.Unstructured, certPem string) error {
webhooks, found, err := unstructured.NestedSlice(wh.Object, "webhooks")
if err != nil {
return err
}
if !found {
return fmt.Errorf("`webhooks` field not found in ValidatingWebhookConfiguration")
}
for i, h := range webhooks {
hook, ok := h.(map[string]interface{})
if !ok {
return fmt.Errorf("webhook %d is not well-formed", i)
}
if err = unstructured.SetNestedField(hook, certPem, "clientConfig", "caBundle"); err != nil {
return err
}
webhooks[i] = hook
}
return unstructured.SetNestedSlice(wh.Object, webhooks, "webhooks")
}
func injectCertToConversionWebhook(crd *unstructured.Unstructured, certPem string) error {
_, found, err := unstructured.NestedMap(crd.Object, "spec", "conversion", "webhook", "clientConfig")
if err != nil {
return err
}
if !found {
return fmt.Errorf("`clientConfig` field not found in CustomResourceDefinition `%s`", crd.GetName())
}
return unstructured.SetNestedField(crd.Object, certPem, "spec", "conversion", "webhook", "clientConfig", "caBundle")
}

39
pkg/svc/handler.go Normal file
View File

@@ -0,0 +1,39 @@
package svc
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
)
var log = ctrl.Log.WithName("handler")
type EventHandler struct {
update chan interface{}
}
func NewHandler() *EventHandler {
return &EventHandler{
update: make(chan interface{}, 100),
}
}
func (ev *EventHandler) Default(ctx context.Context, obj runtime.Object) error {
switch obj := obj.(type) {
case *corev1.Pod:
log.Info("pod create", "namespace", obj.Namespace, "spec", obj.Spec)
}
return nil
}
func (ev *EventHandler) Start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
log.Info("EventHandler exit")
return nil
}
}
}