cilium_node.go
// SPDX-License-Identifier: Apache-2.0
// Copyright 2019-2021 Authors of Cilium
package main
import (
"context"
"encoding/json"
"strings"
"sync"
operatorOption "github.com/cilium/cilium/operator/option"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/ipam/allocator"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
v2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/informer"
v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/kvstore/store"
nodeStore "github.com/cilium/cilium/pkg/node/store"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"golang.org/x/time/rate"
core_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// ciliumNodeName is only used to implement the NamedKey interface.
type ciliumNodeName struct {
cluster string
name string
}
func (c *ciliumNodeName) GetKeyName() string {
return nodeTypes.GetKeyNodeName(c.cluster, c.name)
}
var (
// ciliumNodeStore contains all CiliumNodes present in k8s.
ciliumNodeStore cache.Store
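// k8sCiliumNodesCacheSynced is closed once the CiliumNode informer has
// synced its cache with kube-apiserver.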
k8sCiliumNodesCacheSynced = make(chan struct{})
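// ciliumNodeManagerQueueSynced is closed once the node manager queue has
// processed every CiliumNode event that was queued before the cache sync.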
ciliumNodeManagerQueueSynced = make(chan struct{})
)
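// ciliumNodeManagerQueueSyncedKey is a sentinel work item: once it is popped
// from ciliumNodeManagerQueue, all events queued before the informer cache
// sync have been handled and ciliumNodeManagerQueueSynced can be closed.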
type ciliumNodeManagerQueueSyncedKey struct{}
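// startSynchronizingCiliumNodes starts the CiliumNode informer and, depending
// on its arguments, wires the resulting events into the IPAM node manager
// and/or into the KVStore. It returns immediately; the actual synchronization
// happens in background goroutines.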
func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.NodeEventHandler, withKVStore bool) error {
var (
ciliumNodeKVStore *store.SharedStore
err error
nodeManagerSyncHandler func(key string) error
kvStoreSyncHandler func(key string) error
connectedToKVStore = make(chan struct{})
resourceEventHandler = cache.ResourceEventHandlerFuncs{}
ciliumNodeConvertFunc = k8s.ConvertToCiliumNode
ciliumNodeManagerQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
kvStoreQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
)
// The KVStore is enabled, so run the event handler that syncs objects into
// the KVStore.
if withKVStore {
// Connect to the KVStore asynchronously so that we are able to start
// the operator without relying on the KVStore to be up.
// Start a go routine to GC all CiliumNodes from the KVStore that are
// no longer running.
go func() {
log.Info("Starting to synchronize CiliumNode custom resources to KVStore")
ciliumNodeKVStore, err = store.JoinSharedStore(store.Configuration{
Prefix: nodeStore.NodeStorePrefix,
KeyCreator: nodeStore.KeyCreator,
})
if err != nil {
log.WithError(err).Fatal("Unable to setup node watcher")
}
close(connectedToKVStore)
<-k8sCiliumNodesCacheSynced
// Since we processed all events received from k8s we know that
// at this point the list in ciliumNodeStore should be the source of
// truth and we need to delete all nodes in the KVStore that are
// *not* present in the ciliumNodeStore.
listOfCiliumNodes := ciliumNodeStore.ListKeys()
kvStoreNodes := ciliumNodeKVStore.SharedKeysMap()
// Remove from kvStoreNodes every node that is still known to
// kube-apiserver; whatever remains afterwards are leftovers that need
// to be GCed.
for _, ciliumNode := range listOfCiliumNodes {
kvStoreNodeName := nodeTypes.GetKeyNodeName(option.Config.ClusterName, ciliumNode)
delete(kvStoreNodes, kvStoreNodeName)
}
if len(listOfCiliumNodes) == 0 && len(kvStoreNodes) != 0 {
log.Warn("Preventing GC of nodes in the KVStore due the nonexistence of any CiliumNodes in kube-apiserver")
return
}
for _, kvStoreNode := range kvStoreNodes {
// Only delete the nodes that belong to our cluster
if strings.HasPrefix(kvStoreNode.GetKeyName(), option.Config.ClusterName) {
ciliumNodeKVStore.DeleteLocalKey(ctx, kvStoreNode)
}
}
}()
} else {
log.Info("Starting to synchronize CiliumNode custom resources")
}
if nodeManager != nil {
nodeManagerSyncHandler = syncHandlerConstructor(
func(name string) {
nodeManager.Delete(name)
},
func(node *cilium_v2.CiliumNode) {
// node is deep copied before it is stored in pkg/aws/eni
nodeManager.Update(node)
})
}
if withKVStore {
kvStoreSyncHandler = syncHandlerConstructor(
func(name string) {
nodeDel := ciliumNodeName{
cluster: option.Config.ClusterName,
name: name,
}
ciliumNodeKVStore.DeleteLocalKey(ctx, &nodeDel)
},
func(node *cilium_v2.CiliumNode) {
nodeNew := nodeTypes.ParseCiliumNode(node)
ciliumNodeKVStore.UpdateKeySync(ctx, &nodeNew, false)
})
}
// If the KVStore is disabled and there is no nodeManager, we don't need
// to handle any watcher events, but we still need to keep all CiliumNodes
// in memory because 'ciliumNodeStore' is used across the operator
// to get the latest state of a CiliumNode.
if withKVStore || nodeManager != nil {
resourceEventHandler = cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.WithError(err).Warning("Unable to process CiliumNode Add event")
return
}
if nodeManager != nil {
ciliumNodeManagerQueue.Add(key)
}
if withKVStore {
kvStoreQueue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if oldNode := k8s.ObjToCiliumNode(oldObj); oldNode != nil {
if newNode := k8s.ObjToCiliumNode(newObj); newNode != nil {
if oldNode.DeepEqual(newNode) {
return
}
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
if err != nil {
log.WithError(err).Warning("Unable to process CiliumNode Update event")
return
}
if nodeManager != nil {
ciliumNodeManagerQueue.Add(key)
}
if withKVStore {
kvStoreQueue.Add(key)
}
} else {
log.Warningf("Unknown CiliumNode object type %T received: %+v", newObj, newObj)
}
} else {
log.Warningf("Unknown CiliumNode object type %T received: %+v", oldObj, oldObj)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.WithError(err).Warning("Unable to process CiliumNode Delete event")
return
}
if nodeManager != nil {
ciliumNodeManagerQueue.Add(key)
}
if withKVStore {
kvStoreQueue.Add(key)
}
},
}
} else {
// Since we won't be handling any events we don't need to convert
// objects.
ciliumNodeConvertFunc = nil
}
// TODO: The operator is currently storing a full copy of the
// CiliumNode resource, as the resource grows, we may want to consider
// introducing a slim version of it.
var ciliumNodeInformer cache.Controller
ciliumNodeStore, ciliumNodeInformer = informer.NewInformer(
cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
cilium_v2.CNPluralName, v1.NamespaceAll, fields.Everything()),
&cilium_v2.CiliumNode{},
0,
resourceEventHandler,
ciliumNodeConvertFunc,
)
go func() {
cache.WaitForCacheSync(wait.NeverStop, ciliumNodeInformer.HasSynced)
close(k8sCiliumNodesCacheSynced)
ciliumNodeManagerQueue.Add(ciliumNodeManagerQueueSyncedKey{})
log.Info("CiliumNodes caches synced with Kubernetes")
// Only handle events if nodeManagerSyncHandler is not nil. If it is nil
// then there isn't any event handler set for CiliumNode events.
if nodeManagerSyncHandler != nil {
go func() {
// Infinite loop; run it in a goroutine so that it does not block the
// KVStore handling below.
for processNextWorkItem(ciliumNodeManagerQueue, nodeManagerSyncHandler) {
}
}()
}
// Start handling events for the KVStore **after** nodeManagerSyncHandler,
// otherwise Cilium Operator will block until the KVStore is available.
// This might be problematic in clusters that run etcd-operator with the
// cluster-pool IPAM mode because they depend on Cilium Operator to be
// running and handling IP addresses with nodeManagerSyncHandler.
// Only handle events if kvStoreSyncHandler is not nil. If it is nil
// then there isn't any event handler set for CiliumNode events.
if withKVStore && kvStoreSyncHandler != nil {
<-connectedToKVStore
log.Info("Connected to the KVStore, syncing CiliumNodes to the KVStore")
// Infinite loop; this blocks the remainder of this goroutine.
for processNextWorkItem(kvStoreQueue, kvStoreSyncHandler) {
}
}
}()
go ciliumNodeInformer.Run(wait.NeverStop)
return nil
}
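// syncHandlerConstructor returns a workqueue sync handler that looks up the
// CiliumNode for the given key in ciliumNodeStore and invokes foundHandler
// with it, or notFoundHandler with the node name if it no longer exists.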
func syncHandlerConstructor(notFoundHandler func(name string), foundHandler func(node *cilium_v2.CiliumNode)) func(key string) error {
return func(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.WithError(err).Error("Unable to process CiliumNode event")
return err
}
obj, exists, err := ciliumNodeStore.GetByKey(name)
// Delete handling
if !exists || errors.IsNotFound(err) {
notFoundHandler(name)
return nil
}
if err != nil {
log.WithError(err).Warning("Unable to retrieve CiliumNode from watcher store")
return err
}
cn, ok := obj.(*cilium_v2.CiliumNode)
if !ok {
log.Errorf("Object stored in store is not *cilium_v2.CiliumNode but %T", obj)
return nil
}
foundHandler(cn)
return nil
}
}
// processNextWorkItem processes the next item from the given workqueue. It
// returns false once the queue has been shut down.
func processNextWorkItem(queue workqueue.RateLimitingInterface, syncHandler func(key string) error) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)
if _, ok := key.(ciliumNodeManagerQueueSyncedKey); ok {
close(ciliumNodeManagerQueueSynced)
return true
}
err := syncHandler(key.(string))
if err == nil {
// If err is nil we can forget the key from the queue; if it is not nil
// the key is re-added below and retried until it succeeds.
queue.Forget(key)
return true
}
log.WithError(err).Errorf("sync %q failed with %v", key, err)
queue.AddRateLimited(key)
return true
}
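// ciliumNodeUpdateImplementation provides the create/get/update/delete
// operations on CiliumNode objects used by the rest of the operator (for
// example by the IPAM node manager) via ciliumK8sClient.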
type ciliumNodeUpdateImplementation struct{}
func (c *ciliumNodeUpdateImplementation) Create(node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{})
}
func (c *ciliumNodeUpdateImplementation) Get(node string) (*cilium_v2.CiliumNode, error) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Get(context.TODO(), node, meta_v1.GetOptions{})
}
func (c *ciliumNodeUpdateImplementation) UpdateStatus(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
if origNode == nil || !origNode.Status.DeepEqual(&node.Status) {
return ciliumK8sClient.CiliumV2().CiliumNodes().UpdateStatus(context.TODO(), node, meta_v1.UpdateOptions{})
}
return nil, nil
}
func (c *ciliumNodeUpdateImplementation) Update(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
if origNode == nil || !origNode.Spec.DeepEqual(&node.Spec) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{})
}
return nil, nil
}
func (c *ciliumNodeUpdateImplementation) Delete(nodeName string) error {
return ciliumK8sClient.CiliumV2().CiliumNodes().Delete(context.TODO(), nodeName, meta_v1.DeleteOptions{})
}
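// RunCNPNodeStatusGC starts one node status garbage collector for
// CiliumNetworkPolicies and one for CiliumClusterwideNetworkPolicies.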
func RunCNPNodeStatusGC(nodeStore cache.Store) {
go runCNPNodeStatusGC("cnp-node-gc", false, nodeStore)
go runCNPNodeStatusGC("ccnp-node-gc", true, nodeStore)
}
// runCNPNodeStatusGC runs the node status garbage collector for cilium network
// policies. The policy corresponds to CiliumClusterwideNetworkPolicy if the clusterwide
// parameter is true and CiliumNetworkPolicy otherwise.
func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) {
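// Bound the number of concurrent status updates: a fixed pool of workers
// drains a buffered channel of closures so that only a few of the PATCH
// requests issued below are in flight at any time.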
parallelRequests := 4
removeNodeFromCNP := make(chan func(), 50)
for i := 0; i < parallelRequests; i++ {
go func() {
for f := range removeNodeFromCNP {
f()
}
}()
}
controller.NewManager().UpdateController(name,
controller.ControllerParams{
RunInterval: operatorOption.Config.CNPNodeStatusGCInterval,
DoFunc: func(ctx context.Context) error {
lastRun := v1.NewTime(v1.Now().Add(-operatorOption.Config.CNPNodeStatusGCInterval))
continueID := ""
wg := sync.WaitGroup{}
defer wg.Wait()
for {
var cnpItemsList []cilium_v2.CiliumNetworkPolicy
if clusterwide {
ccnpList, err := ciliumK8sClient.CiliumV2().CiliumClusterwideNetworkPolicies().List(ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
})
if err != nil {
return err
}
cnpItemsList = make([]cilium_v2.CiliumNetworkPolicy, 0, len(ccnpList.Items))
for _, ccnp := range ccnpList.Items {
cnpItemsList = append(cnpItemsList, cilium_v2.CiliumNetworkPolicy{
ObjectMeta: meta_v1.ObjectMeta{
Name: ccnp.Name,
},
Status: ccnp.Status,
})
}
continueID = ccnpList.Continue
} else {
cnpList, err := ciliumK8sClient.CiliumV2().CiliumNetworkPolicies(core_v1.NamespaceAll).List(ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
})
if err != nil {
return err
}
cnpItemsList = cnpList.Items
continueID = cnpList.Continue
}
for _, cnp := range cnpItemsList {
needsUpdate := false
nodesToDelete := map[string]v1.Time{}
for n, status := range cnp.Status.Nodes {
if _, exists, err := nodeStore.GetByKey(n); !exists && err == nil {
// To avoid concurrency issues where a node is
// created and adds its CNP Status before the operator
// node watcher receives an event that the node
// was created, we will only delete the node
// from the CNP Status if the last time it was
// updated was before the lastRun.
if status.LastUpdated.Before(&lastRun) {
nodesToDelete[n] = status.LastUpdated
delete(cnp.Status.Nodes, n)
needsUpdate = true
}
}
}
if needsUpdate {
wg.Add(1)
cnpCpy := cnp.DeepCopy()
removeNodeFromCNP <- func() {
updateCNP(ctx, ciliumK8sClient.CiliumV2(), cnpCpy, nodesToDelete)
wg.Done()
}
}
}
// Nothing to continue, break from the loop here
if continueID == "" {
break
}
}
return nil
},
})
}
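// updateCNP removes the given nodes from the status of a single
// CiliumNetworkPolicy (or CiliumClusterwideNetworkPolicy, when the policy has
// no namespace) by issuing JSON patches against the status subresource,
// chunked to at most k8s.MaxJSONPatchOperations operations per request.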
func updateCNP(ctx context.Context, ciliumClient v2.CiliumV2Interface, cnp *cilium_v2.CiliumNetworkPolicy, nodesToDelete map[string]v1.Time) {
if len(nodesToDelete) == 0 {
return
}
ns := utils.ExtractNamespace(&cnp.ObjectMeta)
var removeStatusNode, remainingStatusNode []k8s.JSONPatch
for nodeToDelete, timeStamp := range nodesToDelete {
removeStatusNode = append(removeStatusNode,
// It is really unlikely to happen, but if a node reappears
// with the same name and updates the CNP Status, we perform
// a test to verify that the lastUpdated timestamp is still the
// same, to avoid accidentally deleting that node.
// If any of the nodes fails this test, *none* of the JSON patch
// operations are executed.
k8s.JSONPatch{
OP: "test",
Path: "/status/nodes/" + nodeToDelete + "/lastUpdated",
Value: timeStamp,
},
k8s.JSONPatch{
OP: "remove",
Path: "/status/nodes/" + nodeToDelete,
},
)
}
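// As a sketch, the patch built above for a single node (illustrative name
// and timestamp) would look roughly like:
//   [{"op": "test", "path": "/status/nodes/node-1/lastUpdated", "value": "<timestamp>"},
//    {"op": "remove", "path": "/status/nodes/node-1"}]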
for {
if len(removeStatusNode) > k8s.MaxJSONPatchOperations {
remainingStatusNode = removeStatusNode[k8s.MaxJSONPatchOperations:]
removeStatusNode = removeStatusNode[:k8s.MaxJSONPatchOperations]
} else {
// Reset the remainder so that the loop terminates once this final
// chunk has been patched.
remainingStatusNode = nil
}
removeStatusNodeJSON, err := json.Marshal(removeStatusNode)
if err != nil {
break
}
// If the namespace is empty the policy is the clusterwide policy
// and not the namespaced CiliumNetworkPolicy.
if ns == "" {
_, err = ciliumClient.CiliumClusterwideNetworkPolicies().Patch(ctx,
cnp.GetName(), types.JSONPatchType, removeStatusNodeJSON, meta_v1.PatchOptions{}, "status")
} else {
_, err = ciliumClient.CiliumNetworkPolicies(ns).Patch(ctx,
cnp.GetName(), types.JSONPatchType, removeStatusNodeJSON, meta_v1.PatchOptions{}, "status")
}
if err != nil {
// We can leave the errors at debug level since the GC happens on a best-effort basis.
log.WithError(err).Debug("Unable to PATCH")
}
removeStatusNode = remainingStatusNode
if len(remainingStatusNode) == 0 {
return
}
}
}
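// RunCNPStatusNodesCleaner spawns one goroutine that drops the entire
// status/nodes field from every CiliumNetworkPolicy and one that does the
// same for every CiliumClusterwideNetworkPolicy, using the given rate
// limiter for all List and Patch requests.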
func RunCNPStatusNodesCleaner(ctx context.Context, ciliumClient v2.CiliumV2Interface, rateLimit *rate.Limiter) {
go clearCNPStatusNodes(ctx, false, ciliumClient, rateLimit)
go clearCNPStatusNodes(ctx, true, ciliumClient, rateLimit)
}
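// clearCNPStatusNodes pages through all policies (clusterwide or namespaced,
// depending on the clusterwide flag) and removes the /status/nodes field from
// every policy that still carries node status entries.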
func clearCNPStatusNodes(ctx context.Context, clusterwide bool, ciliumClient v2.CiliumV2Interface, rateLimit *rate.Limiter) {
body, err := json.Marshal([]k8s.JSONPatch{
{
OP: "remove",
Path: "/status/nodes",
},
})
if err != nil {
log.WithError(err).Debug("Unable to json marshal")
return
}
continueID := ""
nCNPs, nGcCNPs := 0, 0
for {
if clusterwide {
if err := rateLimit.Wait(ctx); err != nil {
log.WithError(err).Debug("Error while rate limiting CCNP List requests")
return
}
ccnpList, err := ciliumClient.CiliumClusterwideNetworkPolicies().List(
ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
})
if err != nil {
log.WithError(err).Debug("Unable to list CCNPs")
return
}
nCNPs += len(ccnpList.Items)
continueID = ccnpList.Continue
for _, cnp := range ccnpList.Items {
if len(cnp.Status.Nodes) == 0 {
continue
}
if err := rateLimit.Wait(ctx); err != nil {
log.WithError(err).Debug("Error while rate limiting CCNP PATCH requests")
return
}
_, err = ciliumClient.CiliumClusterwideNetworkPolicies().Patch(ctx,
cnp.Name, types.JSONPatchType, body, meta_v1.PatchOptions{}, "status")
if err != nil {
if errors.IsInvalid(err) {
// An "Invalid" error may be returned if /status/nodes path does not exist.
// In that case, we simply ignore it, since there are no updates to clean up.
continue
}
log.WithError(err).Debug("Unable to PATCH while clearing status nodes in CCNP")
}
nGcCNPs++
}
} else {
if err := rateLimit.Wait(ctx); err != nil {
log.WithError(err).Debug("Error while rate limiting CNP List requests")
return
}
cnpList, err := ciliumClient.CiliumNetworkPolicies(core_v1.NamespaceAll).List(
ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
})
if err != nil {
log.WithError(err).Debug("Unable to list CNPs")
return
}
nCNPs += len(cnpList.Items)
continueID = cnpList.Continue
for _, cnp := range cnpList.Items {
if len(cnp.Status.Nodes) == 0 {
continue
}
namespace := utils.ExtractNamespace(&cnp.ObjectMeta)
if err := rateLimit.Wait(ctx); err != nil {
log.WithError(err).Debug("Error while rate limiting CNP PATCH requests")
return
}
_, err = ciliumClient.CiliumNetworkPolicies(namespace).Patch(ctx,
cnp.Name, types.JSONPatchType, body, meta_v1.PatchOptions{}, "status")
if err != nil {
if errors.IsInvalid(err) {
// An "Invalid" error may be returned if /status/nodes path does not exist.
// In that case, we simply ignore it, since there are no updates to clean up.
continue
}
log.WithError(err).Debug("Unable to PATCH while clearing status nodes in CNP")
}
nGcCNPs++
}
}
if continueID == "" {
break
}
}
if clusterwide {
log.Infof("Garbage collected status/nodes in Cilium Clusterwide Network Policies found=%d, gc=%d", nCNPs, nGcCNPs)
} else {
log.Infof("Garbage collected status/nodes in Cilium Network Policies found=%d, gc=%d", nCNPs, nGcCNPs)
}
}