https://github.com/cilium/cilium
Tip revision: 46ce2a66fabd94d240a30a4f7b6616f19de7c2ae authored by Joe Stringer on 15 December 2022, 23:56:23 UTC
Prepare for release v1.10.18
Prepare for release v1.10.18
Tip revision: 46ce2a6
k8s_cep_gc.go
// Copyright 2016-2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"time"
"github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/cilium/cilium/operator/metrics"
operatorOption "github.com/cilium/cilium/operator/option"
"github.com/cilium/cilium/operator/watchers"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/logging/logfields"
)
// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
// - list all CEPs in the cluster
// - for each CEP
// delete CEP if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent
// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
//   - list all CEPs in the cluster
//   - for each CEP
//       delete CEP if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent.
//
// When once is true, a single GC pass is performed that removes every CEP
// regardless of pod state; otherwise the GC runs periodically at the
// configured EndpointGCInterval.
func enableCiliumEndpointSyncGC(once bool) {
	const controllerName = "to-k8s-ciliumendpoint-gc"

	scopedLog := log.WithField("controller", controllerName)
	stopCh := make(chan struct{})
	ciliumClient := ciliumK8sClient.CiliumV2()

	// A zero RunInterval makes the controller fire exactly once.
	var gcInterval time.Duration
	if once {
		log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources")
		gcInterval = 0
	} else {
		log.Info("Starting to garbage collect stale CiliumEndpoint custom resources")
		gcInterval = operatorOption.Config.EndpointGCInterval
	}

	// Blocks until the CEP resources are synced with k8s.
	watchers.CiliumEndpointsInit(ciliumClient, stopCh)

	// The pod store is only needed for the periodic mode: in "once" mode all
	// CEPs are deleted regardless of pod state, so pods are never consulted.
	if !once {
		watchers.PodsInit(k8s.WatcherClient(), stopCh)
	}

	// Wait for the CiliumNode cache before the first GC pass.
	<-k8sCiliumNodesCacheSynced

	// This dummy manager is needed only to add this controller to the global list.
	controller.NewManager().UpdateController(controllerName,
		controller.ControllerParams{
			RunInterval: gcInterval,
			DoFunc: func(ctx context.Context) error {
				return doCiliumEndpointSyncGC(ctx, once, stopCh, scopedLog)
			},
		})
}
// doCiliumEndpointSyncGC performs one garbage-collection pass over all
// CiliumEndpoint (CEP) objects in the local store, deleting every CEP whose
// owner (Pod or CiliumNode) no longer exists. When once is true, pod/owner
// state is ignored and every CEP is deleted; stopCh is closed afterwards to
// shut down the k8s watchers.
//
// Returns a non-nil error only for unexpected API delete failures, so the
// controller retries the whole pass.
func doCiliumEndpointSyncGC(ctx context.Context, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error {
	ciliumClient := ciliumK8sClient.CiliumV2()
	// For each CEP we fetched, check if we know about it
	for _, cepObj := range watchers.CiliumEndpointStore.List() {
		cep, ok := cepObj.(*cilium_v2.CiliumEndpoint)
		if !ok {
			log.WithField(logfields.Object, cepObj).
				Errorf("Saw %T object while expecting *cilium_v2.CiliumEndpoint", cepObj)
			continue
		}
		cepFullName := cep.Namespace + "/" + cep.Name
		// Derive a per-CEP logger instead of reassigning scopedLog: the
		// previous reassignment pattern leaked fields (e.g. EndpointID)
		// added for an earlier CEP into log lines for later CEPs that
		// continue before re-setting them.
		cepLog := scopedLog.WithFields(logrus.Fields{
			logfields.K8sPodName: cepFullName,
		})
		// If we are running this function "once" it means that we
		// will delete all CEPs in the cluster regardless of the pod
		// state therefore we won't even watch for the pod store.
		if !once {
			var podObj interface{}
			var err error
			exists := false
			podChecked := false
			for _, owner := range cep.ObjectMeta.OwnerReferences {
				switch owner.Kind {
				case "Pod":
					// CEPs are named after the pod they represent, so the
					// CEP's namespaced name doubles as the pod store key.
					podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
					if err != nil {
						cepLog.WithError(err).Warn("Unable to get pod from store")
					}
					podChecked = true
				case "CiliumNode":
					podObj, exists, err = ciliumNodeStore.GetByKey(owner.Name)
					if err != nil {
						cepLog.WithError(err).Warn("Unable to get CiliumNode from store")
					}
				}
				// Stop looking when an existing owner has been found
				if exists {
					break
				}
			}
			if !exists && !podChecked {
				// Check for a Pod in case none of the owners existed
				// This keeps the old behavior even if OwnerReferences are missing
				podObj, exists, err = watchers.PodStore.GetByKey(cepFullName)
				if err != nil {
					cepLog.WithError(err).Warn("Unable to get pod from store")
				}
			}
			if exists {
				switch pod := podObj.(type) {
				case *cilium_v2.CiliumNode:
					// A live CiliumNode owner keeps the CEP alive.
					continue
				case *slim_corev1.Pod:
					// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
					// is deleted. If the Job is never deleted, Cilium will never receive a Pod
					// delete event, causing the IP to be left in the ipcache.
					// For this reason we should delete the ipcache entries whenever the pod
					// status is either PodFailed or PodSucceeded as it means the IP address
					// is no longer in use.
					if k8sUtils.IsPodRunning(pod.Status) {
						continue
					}
				default:
					log.WithField(logfields.Object, podObj).
						Errorf("Saw %T object while expecting *slim_corev1.Pod or *cilium_v2.CiliumNode", podObj)
					continue
				}
			}
		}
		// FIXME: this is fragile as we might have received the
		// CEP notification first but not the pod notification
		// so we need to have a similar mechanism that we have
		// for the keep alive of security identities.
		cepLog = cepLog.WithFields(logrus.Fields{
			logfields.EndpointID: cep.Status.ID,
		})
		cepLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
		// The API wants a pointer, so a local copy of the const string is needed.
		propagationPolicy := meta_v1.DeletePropagationBackground
		err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
			ctx,
			cep.Name,
			meta_v1.DeleteOptions{
				PropagationPolicy: &propagationPolicy,
				// Set precondition to ensure we are only deleting CEPs owned by
				// this agent.
				Preconditions: &meta_v1.Preconditions{
					UID: &cep.UID,
				},
			})
		switch {
		case err == nil:
			successfulEndpointObjectGC()
		case k8serrors.IsNotFound(err), k8serrors.IsConflict(err):
			// The CEP is already gone or was changed underneath us; the next
			// pass will pick it up again if needed.
			cepLog.WithError(err).Debug("Unable to delete CEP, will retry again")
		default:
			cepLog.WithError(err).Warning("Unable to delete orphaned CEP")
			failedEndpointObjectGC()
			return err
		}
	}
	// We have cleaned up all CEPs from Kubernetes so we can stop
	// the k8s watchers.
	if once {
		close(stopCh)
	}
	return nil
}
// successfulEndpointObjectGC bumps the endpoint-GC success counter, but only
// when operator metrics collection is enabled.
func successfulEndpointObjectGC() {
	if !operatorOption.Config.EnableMetrics {
		return
	}
	metrics.EndpointGCObjects.WithLabelValues(metrics.LabelValueOutcomeSuccess).Inc()
}
// failedEndpointObjectGC bumps the endpoint-GC failure counter, but only
// when operator metrics collection is enabled.
func failedEndpointObjectGC() {
	if !operatorOption.Config.EnableMetrics {
		return
	}
	metrics.EndpointGCObjects.WithLabelValues(metrics.LabelValueOutcomeFail).Inc()
}