https://github.com/cilium/cilium
Raw File
Tip revision: 0338ca7a759b2080f2580048b7af167364b3184b authored by André Martins on 26 March 2022, 00:51:32 UTC
Prepare for release v1.10.9
Tip revision: 0338ca7
k8s_cep_gc.go
// Copyright 2016-2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/cilium/cilium/operator/metrics"
	operatorOption "github.com/cilium/cilium/operator/option"
	"github.com/cilium/cilium/operator/watchers"
	"github.com/cilium/cilium/pkg/controller"
	"github.com/cilium/cilium/pkg/k8s"
	cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
	k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
	"github.com/cilium/cilium/pkg/logging/logfields"
)

// enableCiliumEndpointSyncGC registers the "to-k8s-ciliumendpoint-gc"
// controller, which garbage collects CiliumEndpoint (CEP) objects created by
// the sync-to-k8s-ciliumendpoint controller of each Endpoint. A CEP has the
// same name as the pod it represents.
//
// Before the controller is registered, the CiliumEndpoint watcher is started
// (plus the Pod watcher unless once is set) and the function waits on
// k8sCiliumNodesCacheSynced.
//
// With once set, the controller is registered with a zero run interval and
// each pass deletes every CEP regardless of pod state. Otherwise it runs every
// operatorOption.Config.EndpointGCInterval and deletes CEPs whose pod is
// missing or not running, unless they are owned by an existing CiliumNode.
func enableCiliumEndpointSyncGC(once bool) {
	const controllerName = "to-k8s-ciliumendpoint-gc"

	gcLog := log.WithField("controller", controllerName)
	stop := make(chan struct{})
	cepClient := ciliumK8sClient.CiliumV2()

	// runInterval is deliberately left at zero in once mode.
	var runInterval time.Duration
	if !once {
		log.Info("Starting to garbage collect stale CiliumEndpoint custom resources")
		runInterval = operatorOption.Config.EndpointGCInterval
	} else {
		log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources")
	}

	// Blocks until the CiliumEndpoint resources are synced with k8s.
	watchers.CiliumEndpointsInit(cepClient, stop)
	// The pod store is only consulted in periodic mode. In once mode every CEP
	// is deleted regardless of pod state, so the pod watcher is not started.
	if !once {
		watchers.PodsInit(k8s.WatcherClient(), stop)
	}
	<-k8sCiliumNodesCacheSynced

	gcPass := func(ctx context.Context) error {
		return doCiliumEndpointSyncGC(ctx, once, stop, gcLog)
	}

	// The throwaway manager only serves to add this controller to the global
	// controller list.
	controller.NewManager().UpdateController(controllerName, controller.ControllerParams{
		RunInterval: runInterval,
		DoFunc:      gcPass,
	})
}

// doCiliumEndpointSyncGC performs one garbage-collection pass over the
// CiliumEndpoint (CEP) objects held in watchers.CiliumEndpointStore.
//
// When once is false, a CEP is kept if any of the following holds:
//
//   - one of its OwnerReferences names a CiliumNode present in
//     ciliumNodeStore;
//   - a Pod keyed "<namespace>/<name>" of the CEP exists in watchers.PodStore
//     and k8sUtils.IsPodRunning reports it as running;
//   - the object found in a store has an unexpected type (logged, skipped).
//
// Every other CEP is deleted. When once is true, the owner/pod checks are
// skipped and every CEP in the store is deleted. After a pass that completes
// without error, stopCh is closed so the watchers started on it can stop.
//
// A delete error other than NotFound/Conflict is logged, counted as a failed
// GC and returned immediately. CEPs later in the list are not visited in that
// pass.
func doCiliumEndpointSyncGC(ctx context.Context, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error {
	ciliumClient := ciliumK8sClient.CiliumV2()
	// DeleteOptions wants a pointer, so keep an addressable copy of the
	// constant. It is loop-invariant and only read by the client.
	propagationPolicy := meta_v1.DeletePropagationBackground

	// For each CEP we fetched, check if we know about it.
	for _, cepObj := range watchers.CiliumEndpointStore.List() {
		cep, ok := cepObj.(*cilium_v2.CiliumEndpoint)
		if !ok {
			scopedLog.WithField(logfields.Object, cepObj).
				Errorf("Saw %T object while expecting *cilium_v2.CiliumEndpoint", cepObj)
			continue
		}
		cepFullName := cep.Namespace + "/" + cep.Name
		// Derive a fresh per-CEP logger rather than reassigning scopedLog.
		// Otherwise fields attached for one CEP (notably EndpointID below)
		// would leak into log lines emitted while processing the next one.
		cepLog := scopedLog.WithFields(logrus.Fields{
			logfields.K8sPodName: cepFullName,
		})

		// If we are running this function "once" it means that we will
		// delete all CEPs in the cluster regardless of the pod state,
		// therefore the pod store is not even watched in that mode.
		if !once {
			var (
				podObj     interface{}
				exists     bool
				podChecked bool
				err        error
			)
			for _, owner := range cep.ObjectMeta.OwnerReferences {
				switch owner.Kind {
				case "Pod":
					podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
					if err != nil {
						cepLog.WithError(err).Warn("Unable to get pod from store")
					}
					podChecked = true
				case "CiliumNode":
					podObj, exists, err = ciliumNodeStore.GetByKey(owner.Name)
					if err != nil {
						cepLog.WithError(err).Warn("Unable to get CiliumNode from store")
					}
				}
				// Stop looking when an existing owner has been found. This
				// break sits outside the switch, so it leaves the owner loop.
				if exists {
					break
				}
			}
			if !exists && !podChecked {
				// No existing owner was found and no Pod owner was looked up.
				// Fall back to a Pod with the CEP's name, which keeps the old
				// behavior even if OwnerReferences are missing.
				podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
				if err != nil {
					cepLog.WithError(err).Warn("Unable to get pod from store")
				}
			}
			if exists {
				switch pod := podObj.(type) {
				case *cilium_v2.CiliumNode:
					// Owned by a CiliumNode that still exists: keep it.
					continue
				case *slim_corev1.Pod:
					// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
					// is deleted. If the Job is never deleted, Cilium will never receive a Pod
					// delete event, causing the IP to be left in the ipcache.
					// For this reason we should delete the ipcache entries whenever the pod
					// status is either PodFailed or PodSucceeded as it means the IP address
					// is no longer in use.
					if k8sUtils.IsPodRunning(pod.Status) {
						continue
					}
				default:
					cepLog.WithField(logfields.Object, podObj).
						Errorf("Saw %T object while expecting *slim_corev1.Pod or *cilium_v2.CiliumNode", podObj)
					continue
				}
			}
		}
		// FIXME: this is fragile as we might have received the
		// CEP notification first but not the pod notification
		// so we need to have a similar mechanism that we have
		// for the keep alive of security identities.
		cepLog = cepLog.WithFields(logrus.Fields{
			logfields.EndpointID: cep.Status.ID,
		})
		cepLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
		err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
			ctx,
			cep.Name,
			meta_v1.DeleteOptions{
				PropagationPolicy: &propagationPolicy,
				// Only delete the exact object observed in the store. If the
				// CEP was recreated under the same name in the meantime, its
				// UID differs and the API server rejects the request. That
				// rejection is expected to surface as a Conflict, which is
				// treated as a no-op below.
				Preconditions: &meta_v1.Preconditions{
					UID: &cep.UID,
				},
			})
		switch {
		case err == nil:
			successfulEndpointObjectGC()
		case k8serrors.IsNotFound(err), k8serrors.IsConflict(err):
			// Already gone, or replaced by a different object: nothing to do.
		default:
			cepLog.WithError(err).Warning("Unable to delete orphaned CEP")
			failedEndpointObjectGC()
			return err
		}
	}
	// We have cleaned up all CEPs from Kubernetes so we can stop
	// the k8s watchers.
	if once {
		close(stopCh)
	}
	return nil
}

// successfulEndpointObjectGC increments metrics.EndpointGCObjects with the
// success outcome label. It does nothing when operator metrics are disabled.
func successfulEndpointObjectGC() {
	if !operatorOption.Config.EnableMetrics {
		return
	}
	counter := metrics.EndpointGCObjects.WithLabelValues(metrics.LabelValueOutcomeSuccess)
	counter.Inc()
}

// failedEndpointObjectGC increments metrics.EndpointGCObjects with the failure
// outcome label. It does nothing when operator metrics are disabled.
func failedEndpointObjectGC() {
	if !operatorOption.Config.EnableMetrics {
		return
	}
	counter := metrics.EndpointGCObjects.WithLabelValues(metrics.LabelValueOutcomeFail)
	counter.Inc()
}
back to top