// Copyright 2016-2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/cilium/cilium/common"
	"github.com/cilium/cilium/pkg/defaults"
	"github.com/cilium/cilium/pkg/endpoint"
	"github.com/cilium/cilium/pkg/endpointmanager"
	"github.com/cilium/cilium/pkg/identity/cache"
	"github.com/cilium/cilium/pkg/ipcache"
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/labels"
	"github.com/cilium/cilium/pkg/logging/logfields"
	"github.com/cilium/cilium/pkg/maps/ctmap"
	"github.com/cilium/cilium/pkg/maps/lxcmap"
	"github.com/cilium/cilium/pkg/option"
	"github.com/cilium/cilium/pkg/policy"
	"github.com/cilium/cilium/pkg/workloads"

	"github.com/sirupsen/logrus"
	"github.com/vishvananda/netlink"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

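// endpointRestoreState is the set of endpoints read back from disk at startup,
// split into endpoints that can be restored and endpoints that must be cleaned up.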
type endpointRestoreState struct {
	restored []*endpoint.Endpoint
	toClean  []*endpoint.Endpoint
}

// restoreOldEndpoints reads the list of existing endpoints previously managed
// by Cilium when it was last run and associates them with container workloads.
// This function performs the first step in restoring endpoint structure: it
// allocates each endpoint's existing IP out of the CIDR block and then inserts
// the endpoint into the endpoints list. It needs to be followed by a call to
// regenerateRestoredEndpoints() once the endpoint builder is ready.
//
// If clean is true, endpoints which cannot be associated with a container
// workload are deleted.
func (d *Daemon) restoreOldEndpoints(dir string, clean bool) (*endpointRestoreState, error) {
	failed := 0
	state := &endpointRestoreState{
		restored: []*endpoint.Endpoint{},
		toClean:  []*endpoint.Endpoint{},
	}

	if !option.Config.RestoreState {
		log.Info("Endpoint restore is disabled, skipping restore step")
		return state, nil
	}

	log.Info("Restoring endpoints...")

	var (
		existingEndpoints map[string]*lxcmap.EndpointInfo
		err               error
	)

	if !option.Config.DryMode {
		existingEndpoints, err = lxcmap.DumpToMap()
		if err != nil {
			log.WithError(err).Warning("Unable to open endpoint map while restoring. Skipping cleanup of endpoint map on startup")
		}
	}

	dirFiles, err := ioutil.ReadDir(dir)
	if err != nil {
		return state, err
	}
	eptsID := endpoint.FilterEPDir(dirFiles)

	possibleEPs := readEPsFromDirNames(dir, eptsID)

	if len(possibleEPs) == 0 {
		log.Info("No old endpoints found.")
		return state, nil
	}

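	// Validate each candidate endpoint before restoring it: its Kubernetes pod
	// (if any) must still exist, its datapath interface or ipvlan tail call map
	// must be present, and its workload must still be running.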
	for _, ep := range possibleEPs {
		scopedLog := log.WithField(logfields.EndpointID, ep.ID)
		if k8s.IsEnabled() {
			scopedLog = scopedLog.WithField("k8sPodName", ep.GetK8sNamespaceAndPodNameLocked())
		}

		skipRestore := false

		// On each restart, the health endpoint is supposed to be recreated.
		// Hence we need to clean up the health endpoint state unconditionally.
		if ep.HasLabels(labels.LabelHealth) {
			// Ignore the health endpoint and don't report it as not
			// restored. But we need to clean up the old state files,
			// so do this now.
			healthStateDir := ep.StateDirectoryPath()
			scopedLog.WithFields(logrus.Fields{
				logfields.Path: healthStateDir,
			}).Debug("Removing old health endpoint state directory")
			if err := os.RemoveAll(healthStateDir); err != nil {
				scopedLog.WithFields(logrus.Fields{
					logfields.Path: healthStateDir,
				}).Warning("Cannot clean up old health state directory")
			}
			continue
		} else {
			if ep.K8sPodName != "" && ep.K8sNamespace != "" && k8s.IsEnabled() {
				_, err := k8s.Client().CoreV1().Pods(ep.K8sNamespace).Get(ep.K8sPodName, meta_v1.GetOptions{})
				if err != nil && k8serrors.IsNotFound(err) {
					skipRestore = true
				}
			}

			if ep.HasIpvlanDataPath() {
				// FIXME: We cannot check whether ipvlan slave netdev exists,
				// because it requires entering container netns which is not
				// always accessible (e.g. in k8s case "/proc" has to be bind
				// mounted). Instead, we check whether the tail call map exists.
				if _, err := os.Stat(ep.BPFIpvlanMapPath()); err != nil {
					scopedLog.Warningf(
						"Ipvlan tail call map %s could not be found for endpoint being restored, ignoring",
						ep.BPFIpvlanMapPath())
					skipRestore = true
				}
			} else if _, err := netlink.LinkByName(ep.IfName); err != nil {
				scopedLog.Warningf("Interface %s could not be found for endpoint being restored, ignoring", ep.IfName)
				skipRestore = true
			}

			if !skipRestore && option.Config.WorkloadsEnabled() && !workloads.IsRunning(ep) {
				scopedLog.Warning("No workload could be associated with endpoint being restored, ignoring")
				skipRestore = true
			}
		}

		if clean && skipRestore {
			failed++
			state.toClean = append(state.toClean, ep)
			continue
		}

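		// The endpoint is not yet exposed to the endpoint manager or any other
		// subsystem, so taking the lock unconditionally cannot race with a
		// concurrent endpoint removal.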
		ep.UnconditionalLock()
		scopedLog.Debug("Restoring endpoint")
		ep.LogStatusOKLocked(endpoint.Other, "Restoring endpoint from previous cilium instance")

		if err := d.allocateIPsLocked(ep); err != nil {
			ep.Unlock()
			scopedLog.WithError(err).Error("Failed to re-allocate IP of endpoint. Not restoring endpoint.")
			state.toClean = append(state.toClean, ep)
			continue
		}

		if !option.Config.KeepConfig {
			ep.SetDefaultOpts(option.Config.Opts)
			alwaysEnforce := policy.GetPolicyEnabled() == option.AlwaysEnforce
			ep.SetDesiredIngressPolicyEnabledLocked(alwaysEnforce)
			ep.SetDesiredEgressPolicyEnabledLocked(alwaysEnforce)
		}

		ep.Unlock()

		ep.SkipStateClean()

		state.restored = append(state.restored, ep)

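		// Drop the restored endpoint's IPs from the dump of the BPF endpoint
		// map, so that only stale entries remain in existingEndpoints.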
		if existingEndpoints != nil {
			delete(existingEndpoints, ep.IPv4.String())
			delete(existingEndpoints, ep.IPv6.String())
		}
	}

	log.WithFields(logrus.Fields{
		"restored": len(state.restored),
		"failed":   failed,
	}).Info("Endpoints restored")

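	// Any entries remaining in the dumped endpoint map belong to endpoints that
	// were not restored; delete them so the BPF map does not carry stale state.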
	if existingEndpoints != nil {
		for hostIP, info := range existingEndpoints {
			if ip := net.ParseIP(hostIP); !info.IsHost() && ip != nil {
				if err := lxcmap.DeleteEntry(ip); err != nil {
					log.WithError(err).Warn("Unable to delete obsolete endpoint from BPF map")
				} else {
					log.Debugf("Removed outdated endpoint %d from endpoint map", info.LxcID)
				}
			}
		}
	}

	return state, nil
}

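// regenerateRestoredEndpoints exposes the restored endpoints to the endpoint
// manager and regenerates each of them asynchronously. Endpoints that could not
// be restored are deleted quietly before the function returns. The returned
// channel is closed once all restored endpoints have reported a regeneration
// result.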
func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState) (restoreComplete chan struct{}) {
	restoreComplete = make(chan struct{}, 0)

	log.Infof("Regenerating %d restored endpoints", len(state.restored))

	// Before regenerating, check whether the CT map has properties that
	// match this Cilium userspace instance. If not, it must be removed.
	ctmap.DeleteIfUpgradeNeeded(nil)

	// We need to signal when the endpoints have been regenerated, i.e., when
	// they have finished rebuilding after being restored.
	epRegenerated := make(chan bool, len(state.restored))

	// Insert all endpoints into the endpoint list first before starting
	// the regeneration. This is required to ensure that if an individual
	// regeneration causes an identity change of an endpoint, the new
	// identity will trigger a policy recalculation of all endpoints to
	// account for the new identity during the grace period. For this
	// purpose, all endpoints being restored must already be in the
	// endpoint list.
	for i := len(state.restored) - 1; i >= 0; i-- {
		ep := state.restored[i]
		// If the endpoint has local conntrack option enabled, then
		// check whether the CT map needs upgrading (and do so).
		if ep.Options.IsEnabled(option.ConntrackLocal) {
			ctmap.DeleteIfUpgradeNeeded(ep)
		}

		// Insert into endpoint manager so it can be regenerated when calls to
		// RegenerateAllEndpoints() are made. This must be done synchronously (i.e.,
		// not in a goroutine) because regenerateRestoredEndpoints must guarantee
		// upon returning that endpoints are exposed to other subsystems via
		// endpointmanager.
		if err := endpointmanager.Insert(ep); err != nil {
			log.WithError(err).Warning("Unable to restore endpoint")
			// remove endpoint from slice of endpoints to restore
			state.restored = append(state.restored[:i], state.restored[i+1:]...)
		}
	}

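	// Regenerate each restored endpoint in its own goroutine. Every goroutine
	// reports its result on epRegenerated, which is drained below to determine
	// when restoration has completed.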
	for _, ep := range state.restored {
		go func(ep *endpoint.Endpoint, epRegenerated chan<- bool) {
			if err := ep.RLockAlive(); err != nil {
				ep.LogDisconnectedMutexAction(err, "before filtering labels during regenerating restored endpoint")
				return
			}
			scopedLog := log.WithField(logfields.EndpointID, ep.ID)
			// Filter the restored labels with the new daemon's filter
			l, _ := labels.FilterLabels(ep.OpLabels.AllLabels())
			ep.RUnlock()

			identity, _, err := cache.AllocateIdentity(context.Background(), l)
			if err != nil {
				scopedLog.WithError(err).Warn("Unable to restore endpoint")
				epRegenerated <- false
				// identity is nil when AllocateIdentity fails, so bail
				// out instead of dereferencing it below.
				return
			}

			// Wait for initial identities and ipcache from the
			// kvstore before doing any policy calculation for
			// endpoints that don't have a fixed identity or are
			// not well known.
			if !identity.IsFixed() && !identity.IsWellKnown() {
				cache.WaitForInitialIdentities(context.Background())
				ipcache.WaitForInitialSync()
			}

			if err := ep.LockAlive(); err != nil {
				scopedLog.Warn("Endpoint to restore has been deleted")
				return
			}

			ep.SetStateLocked(endpoint.StateRestoring, "Synchronizing endpoint labels with KVStore")

			if ep.SecurityIdentity != nil {
				if oldSecID := ep.SecurityIdentity.ID; identity.ID != oldSecID {
					log.WithFields(logrus.Fields{
						logfields.EndpointID:              ep.ID,
						logfields.IdentityLabels + ".old": oldSecID,
						logfields.IdentityLabels + ".new": identity.ID,
					}).Info("Security identity for endpoint is different from the security identity restored for the endpoint")

					// The identity of the endpoint being
					// restored has changed. This can be
					// caused by two main reasons:
					//
					// 1) Cilium has been upgraded,
					// downgraded or the configuration has
					// changed and the new version or
					// configuration causes different
					// labels to be considered security
					// relevant for this endpoint.
					//
					// Immediately using the identity may
					// cause connectivity problems if this
					// is the first endpoint in the cluster
					// to use the new identity. All other
					// nodes will not have had a chance to
					// adjust the security policies for
					// their endpoints. Hence, apply a
					// grace period to allow for the
					// update. It is not required to check
					// any local endpoints for potential
					// outdated security rules, the
					// notification of the new security
					// identity will have been received and
					// will trigger the necessary
					// recalculation of all local
					// endpoints.
					//
					// 2) The identity is outdated as the
					// state in the kvstore has changed.
					// This reason would justify an
					// immediate use of the new identity
					// but given the current identity is
					// already in place, it is also correct
					// to continue using it for the
					// duration of a grace period.
					time.Sleep(defaults.IdentityChangeGracePeriod)
				}
			}
			// The identity of a freshly restored endpoint is incomplete because some
			// parts of the identity are not marshaled to JSON. Hence we must set
			// the identity even if it has not changed.
			ep.SetIdentity(identity)

			// We don't need to hold the policy repository mutex here because
			// the contents of the rules themselves are not being changed.
			d.policy.UpdateLocalConsumers([]policy.Endpoint{ep}).Wait()

			if ep.GetStateLocked() == endpoint.StateWaitingToRegenerate {
				ep.Unlock()
				// EP is already waiting to regenerate. This is no error so no logging.
				epRegenerated <- false
				return
			}

			ready := ep.SetStateLocked(endpoint.StateWaitingToRegenerate, "Triggering synchronous endpoint regeneration while syncing state to host")
			ep.Unlock()

			if !ready {
				scopedLog.WithField(logfields.EndpointState, ep.GetState()).Warn("Endpoint in inconsistent state")
				epRegenerated <- false
				return
			}
			regenerationMetadata := &endpoint.ExternalRegenerationMetadata{
				Reason: "syncing state to host",
			}
			if buildSuccess := <-ep.Regenerate(d, regenerationMetadata); !buildSuccess {
				scopedLog.Warn("Failed while regenerating endpoint")
				epRegenerated <- false
				return
			}

			// NOTE: UnconditionalRLock is used here because the lock is only needed to log the already restored endpoint
			ep.UnconditionalRLock()
			scopedLog.WithField(logfields.IPAddr, []string{ep.IPv4.String(), ep.IPv6.String()}).Info("Restored endpoint")
			ep.RUnlock()
			epRegenerated <- true
		}(ep, epRegenerated)
	}

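	// Delete the endpoints that could not be restored. The deletions run in
	// parallel and are waited for before this function returns.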
	var endpointCleanupCompleted sync.WaitGroup
	for _, ep := range state.toClean {
		endpointCleanupCompleted.Add(1)
		go func(ep *endpoint.Endpoint) {
			// The IP was not allocated yet, so it does not need to be freed.
			// The identity may be allocated in the kvstore, but we can't
			// release it easily as that would require blocking on kvstore
			// connectivity, which we can't do at this point. Let the lease
			// expire to release the identity.
			d.deleteEndpointQuiet(ep, endpoint.DeleteConfig{
				NoIdentityRelease: true,
				NoIPRelease:       true,
			})
			endpointCleanupCompleted.Done()
		}(ep)
	}
	endpointCleanupCompleted.Wait()

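	// Drain the per-endpoint regeneration results in the background and close
	// restoreComplete once every restored endpoint has reported back.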
	go func() {
		regenerated, total := 0, 0
		if len(state.restored) > 0 {
			for buildSuccess := range epRegenerated {
				if buildSuccess {
					regenerated++
				}
				total++
				if total >= len(state.restored) {
					break
				}
			}
		}
		close(epRegenerated)

		log.WithFields(logrus.Fields{
			"regenerated": regenerated,
			"total":       total,
		}).Info("Finished regenerating restored endpoints")
		close(restoreComplete)
	}()

	return
}

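// allocateIPsLocked re-allocates the restored endpoint's IPv4 and IPv6
// addresses from the IPAM pool. The caller must hold the endpoint lock.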
func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) error {
	var err error

	if option.Config.EnableIPv6 && ep.IPv6 != nil {
		err = d.ipam.AllocateIP(ep.IPv6.IP(), ep.HumanStringLocked()+" [restored]")
		if err != nil {
			return fmt.Errorf("unable to reallocate IPv6 address: %s", err)
		}

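		// Release the IPv6 address again if the IPv4 allocation below
		// fails, so a partially restored endpoint does not leak an address.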
		defer func() {
			if err != nil {
				d.ipam.ReleaseIP(ep.IPv6.IP())
			}
		}()
	}

	if option.Config.EnableIPv4 && ep.IPv4 != nil {
		if err = d.ipam.AllocateIP(ep.IPv4.IP(), ep.HumanStringLocked()+" [restored]"); err != nil {
			return fmt.Errorf("unable to reallocate IPv4 address: %s", err)
		}
	}

	return nil
}

// readEPsFromDirNames returns a mapping of endpoint ID to endpoint, built from
// a list of directory names that may possibly contain an endpoint.
func readEPsFromDirNames(basePath string, eptsDirNames []string) map[uint16]*endpoint.Endpoint {
	possibleEPs := map[uint16]*endpoint.Endpoint{}
	for _, epDirName := range eptsDirNames {
		epDir := filepath.Join(basePath, epDirName)
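		// readDir scans the endpoint directory for the C header file and
		// returns its path, or "" if the directory cannot be read or no
		// header is found.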
		readDir := func() string {
			scopedLog := log.WithFields(logrus.Fields{
				logfields.EndpointID: epDirName,
				logfields.Path:       filepath.Join(epDir, common.CHeaderFileName),
			})
			scopedLog.Debug("Reading directory")
			epFiles, err := ioutil.ReadDir(epDir)
			if err != nil {
				scopedLog.WithError(err).Warn("Error while reading directory. Ignoring it...")
				return ""
			}
			cHeaderFile := common.FindEPConfigCHeader(epDir, epFiles)
			if cHeaderFile == "" {
				return ""
			}
			return cHeaderFile
		}
		// There's an odd issue where the first directory read sometimes
		// doesn't work, so retry once if no C header file was found.
		cHeaderFile := readDir()
		if cHeaderFile == "" {
			cHeaderFile = readDir()
		}

		scopedLog := log.WithFields(logrus.Fields{
			logfields.EndpointID: epDirName,
			logfields.Path:       cHeaderFile,
		})

		if cHeaderFile == "" {
			scopedLog.Warning("C header file not found. Ignoring endpoint")
			continue
		}

		scopedLog.Debug("Found endpoint C header file")

		strEp, err := common.GetCiliumVersionString(cHeaderFile)
		if err != nil {
			scopedLog.WithError(err).Warn("Unable to read the C header file")
			continue
		}
		ep, err := endpoint.ParseEndpoint(strEp)
		if err != nil {
			scopedLog.WithError(err).Warn("Unable to parse the C header file")
			continue
		}
		if _, ok := possibleEPs[ep.ID]; ok {
			// If the endpoint already exists, give priority to the directory
			// that contains an endpoint that didn't fail to be built.
			if strings.HasSuffix(ep.DirectoryPath(), epDirName) {
				possibleEPs[ep.ID] = ep
			}
		} else {
			possibleEPs[ep.ID] = ep
		}
	}
	return possibleEPs
}