// k8s_cep_gc.go from https://github.com/cilium/cilium (release 1.4.5, revision d37240f)
// Copyright 2016-2018 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"time"

	"github.com/cilium/cilium/pkg/controller"
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/logging/logfields"
	"github.com/cilium/cilium/pkg/versioncheck"

	"github.com/sirupsen/logrus"
	core_v1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ciliumEndpointGCInterval is the interval between attempts of the CEP GC
// controller.
// Note that only one node per cluster should run this, and most iterations
// will simply return.
const ciliumEndpointGCInterval = 30 * time.Minute

var (
	// ciliumEPControllerLimit is the range of k8s versions with which we are
	// willing to run the EndpointCRD controllers
	ciliumEPControllerLimit = versioncheck.MustCompile("> 1.6")
)

// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
//   - get list of nodes
//   - only run with probability 1/nodes
//   - get list of CEPs
//   - for each CEP
//       delete CEP if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent
func enableCiliumEndpointSyncGC() {
	var (
		controllerName = "to-k8s-ciliumendpoint-gc"
		scopedLog      = log.WithField("controller", controllerName)
	)

	sv, err := k8s.GetServerVersion()
	if err != nil {
		scopedLog.WithError(err).Error("unable to retrieve Kubernetes server version")
		return
	}
	if !ciliumEPControllerLimit.Check(sv) {
		scopedLog.WithFields(logrus.Fields{
			"expected": sv,
			"found":    ciliumEPControllerLimit,
		}).Warn("cannot run with this k8s version")
		return
	}

	ciliumClient := ciliumK8sClient.CiliumV2()

	// this dummy manager is needed only to add this controller to the global list
	controller.NewManager().UpdateController(controllerName,
		controller.ControllerParams{
			RunInterval: ciliumEndpointGCInterval,
			DoFunc: func() error {
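				// One GC pass: snapshot the pods in the cluster, then walk the
				// CEP list page by page and delete any CEP whose pod no longer
				// exists. loopStop below bounds a single pass to one interval.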
				var (
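					// Limit forces the apiserver to return CEPs in small pages;
					// the Continue token from each response drives the next fetch.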
					listOpts = meta_v1.ListOptions{Limit: 10}
					loopStop = time.Now().Add(ciliumEndpointGCInterval)
				)

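				// Take a one-shot snapshot of every pod in the cluster. CEPs whose
				// pod does not appear in this snapshot are treated as orphaned.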
				pods, err := k8s.Client().CoreV1().Pods("").List(meta_v1.ListOptions{})
				if err != nil {
					return err
				}

				podsCache := map[string]*core_v1.Pod{}
				for _, pod := range pods.Items {
					pod := pod // copy the loop variable so each map entry points at a distinct Pod
					podsCache[pod.Namespace+"/"+pod.Name] = &pod
				}

			perCEPFetch:
				for time.Now().Before(loopStop) { // Guard against no-break bugs
					time.Sleep(time.Second) // Throttle lookups in case of a busy loop

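					// Fetch the next page of CEPs across all namespaces; listOpts.Continue
					// carries the pagination cursor from the previous response.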
					ceps, err := ciliumClient.CiliumEndpoints(meta_v1.NamespaceAll).List(listOpts)
					switch {
					case err != nil && k8serrors.IsResourceExpired(err) && ceps.Continue != "":
						// This combination means we saw a 410 ResourceExpired error but we
						// can iterate on the now-current snapshot. We need to refetch,
						// however.
						// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/types.go#L350-L381
						// or the docs for k8s.io/apimachinery/pkg/apis/meta/v1.ListOptions
						// vendored into this repo.
						listOpts.Continue = ceps.Continue
						continue perCEPFetch

					case err != nil:
						scopedLog.WithError(err).Debug("Cannot list CEPs")
						return err
					}

					// setup listOpts for the next iteration
					listOpts.Continue = ceps.Continue

					// For each CEP we fetched, check if we know about it
					for _, cep := range ceps.Items {
						cepFullName := cep.Namespace + "/" + cep.Name
						_, exists := podsCache[cepFullName]
						if !exists {
							// delete
							// Use a per-CEP logger so the controller-scoped logger does not
							// permanently pick up fields from earlier iterations.
							cepLog := scopedLog.WithFields(logrus.Fields{
								logfields.EndpointID: cep.Status.ID,
								logfields.K8sPodName: cepFullName,
							})
							cepLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
							// The API wants a pointer, so take the address of a local copy of the
							// constant. Background propagation lets the apiserver clean up any
							// dependents asynchronously after the CEP itself is deleted.
							propagationPolicy := meta_v1.DeletePropagationBackground
							if err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(cep.Name, &meta_v1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
								cepLog.WithError(err).Debug("Unable to delete orphaned CEP")
								return err
							}
						}
					}
					if ceps.Continue != "" {
						// there is more data, continue
						continue perCEPFetch
					}
					break perCEPFetch // break out as a safe default to avoid spammy loops
				}
				return nil
			},
		})
}