https://github.com/cilium/cilium
Tip revision: 595549347fff37d22982a587003d05f8862b30da authored by Daniel Borkmann on 05 October 2020, 15:23:12 UTC
k8s_cep_gc.go
// Copyright 2016-2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"time"

	operatorOption "github.com/cilium/cilium/operator/option"
	"github.com/cilium/cilium/operator/watchers"
	"github.com/cilium/cilium/pkg/controller"
	"github.com/cilium/cilium/pkg/k8s"
	cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
	k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
	"github.com/cilium/cilium/pkg/logging/logfields"

	"github.com/sirupsen/logrus"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
//   - list all CEPs in the cluster
//   - for each CEP, delete it if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent.
func enableCiliumEndpointSyncGC(once bool) {
	var (
		controllerName = "to-k8s-ciliumendpoint-gc"
		scopedLog      = log.WithField("controller", controllerName)
		gcInterval     time.Duration
		stopCh         = make(chan struct{})
	)

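	// ciliumClient is the CiliumV2 clientset used below to delete orphaned CiliumEndpoints.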
	ciliumClient := ciliumK8sClient.CiliumV2()

	if once {
		log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources...")
		gcInterval = 0
	} else {
		log.Info("Starting to garbage collect stale CiliumEndpoint custom resources...")
		gcInterval = operatorOption.Config.EndpointGCInterval
	}

	// These init functions will block until the resources are synced with k8s.
	watchers.CiliumEndpointsInit(ciliumClient, stopCh)
	if !once {
		// In "once" mode every CEP in the cluster is deleted regardless of
		// pod state, so the pod watcher is only needed for periodic runs.
		watchers.PodsInit(k8s.WatcherCli(), stopCh)
	}

	// this dummy manager is needed only to add this controller to the global list
	controller.NewManager().UpdateController(controllerName,
		controller.ControllerParams{
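			// gcInterval is zero in "once" mode, in which case the controller
			// runs DoFunc a single time rather than on a timer.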
			RunInterval: gcInterval,
			DoFunc: func(ctx context.Context) error {
				// For each CEP in the store, check whether the owning pod still exists.
				for _, cepObj := range watchers.CiliumEndpointStore.List() {
					cep, ok := cepObj.(*cilium_v2.CiliumEndpoint)
					if !ok {
						log.WithField(logfields.Object, cepObj).
							Errorf("Saw %T object while expecting *cilium_v2.CiliumEndpoint", cepObj)
						continue
					}
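					// CEPs are named after the pod they represent, so this
					// namespace/name key also identifies the owning pod.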
					cepFullName := cep.Namespace + "/" + cep.Name
					scopedLog := scopedLog.WithFields(logrus.Fields{
						logfields.K8sPodName: cepFullName,
					})

					// In "once" mode every CEP is deleted regardless of pod
					// state, so the pod store is not consulted (it was never
					// initialized above).
					if !once {
						podObj, exists, err := watchers.PodStore.GetByKey(cepFullName)
						if err != nil {
							scopedLog.WithError(err).Warn("Unable to get pod from store")
							continue
						}
						if exists {
							pod, ok := podObj.(*slim_corev1.Pod)
							if !ok {
								log.WithField(logfields.Object, podObj).
									Errorf("Saw %T object while expecting *slim_corev1.Pod", podObj)
								continue
							}
							// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
							// is deleted. If the Job is never deleted, Cilium will never receive a Pod
							// delete event, causing the IP to be left in the ipcache.
							// For this reason we should delete the ipcache entries whenever the pod
							// status is either PodFailed or PodSucceeded as it means the IP address
							// is no longer in use.
							if k8sUtils.IsPodRunning(pod.Status) {
								continue
							}
						}
					}
					// FIXME: this is fragile as we might have received the
					// CEP notification but not yet the pod notification, so we
					// need a mechanism similar to the keep-alive used for
					// security identities.
					scopedLog = scopedLog.WithFields(logrus.Fields{
						logfields.EndpointID: cep.Status.ID,
					})
					scopedLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
					PropagationPolicy := meta_v1.DeletePropagationBackground // because these are const strings but the API wants pointers
					err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
						ctx,
						cep.Name,
						meta_v1.DeleteOptions{PropagationPolicy: &PropagationPolicy})
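					// A NotFound error means the CEP was already deleted;
					// that is not a failure.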
					if err != nil && !k8serrors.IsNotFound(err) {
						scopedLog.WithError(err).Warning("Unable to delete orphaned CEP")
						return err
					}
				}
				// We have cleaned up all CEPs from Kubernetes so we can stop
				// the k8s watchers.
				if once {
					close(stopCh)
				}
				return nil
			},
		})
}
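
// A minimal sketch (not part of this file) of how the operator's startup path
// is expected to invoke this function; the real call site lives elsewhere in
// the operator and may differ:
//
//	if operatorOption.Config.EndpointGCInterval != 0 {
//		// Periodic garbage collection of orphaned CiliumEndpoints.
//		enableCiliumEndpointSyncGC(false)
//	} else {
//		// One-shot cleanup of leftover CiliumEndpoints.
//		enableCiliumEndpointSyncGC(true)
//	}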