resources.go
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package envoy
import (
"net"
"sort"
"sync"
envoyAPI "github.com/cilium/proxy/go/cilium/api"
"github.com/sirupsen/logrus"
"github.com/cilium/cilium/pkg/envoy/xds"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/logging/logfields"
)
const (
// ListenerTypeURL is the type URL of Listener resources.
ListenerTypeURL = "type.googleapis.com/envoy.config.listener.v3.Listener"
// RouteTypeURL is the type URL of HTTP Route resources.
RouteTypeURL = "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
// ClusterTypeURL is the type URL of Cluster resources.
ClusterTypeURL = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
// HttpConnectionManagerTypeURL is the type URL of HttpConnectionManager resources.
HttpConnectionManagerTypeURL = "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"
// EndpointTypeURL is the type URL of Endpoint resources.
EndpointTypeURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
// SecretTypeURL is the type URL of Endpoint resources.
SecretTypeURL = "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret"
// NetworkPolicyTypeURL is the type URL of NetworkPolicy resources.
NetworkPolicyTypeURL = "type.googleapis.com/cilium.NetworkPolicy"
// NetworkPolicyHostsTypeURL is the type URL of NetworkPolicyHosts resources.
NetworkPolicyHostsTypeURL = "type.googleapis.com/cilium.NetworkPolicyHosts"
// DownstreamTlsContextURL is the type URL of DownstreamTlsContext
DownstreamTlsContextURL = "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext"
)
// NPHDSCache is a cache of resources in the Network Policy Hosts Discovery
// Service.
//
// NetworkPolicyHostsCache is the global cache of resources of type
// NetworkPolicyHosts. Resources in this cache must have the
// NetworkPolicyHostsTypeURL type URL.
type NPHDSCache struct {
*xds.Cache
ipcache *ipcache.IPCache
}
func newNPHDSCache(ipcache *ipcache.IPCache) NPHDSCache {
return NPHDSCache{Cache: xds.NewCache(), ipcache: ipcache}
}
var (
observerOnce = sync.Once{}
)
// HandleResourceVersionAck is required to implement ResourceVersionAckObserver.
// We use this to start the IP Cache listener on the first ACK so that we only
// start the IP Cache listener if there is an Envoy node that uses NPHDS (e.g.,
// Istio node, or host proxy running on kernel w/o LPM bpf map support).
func (cache *NPHDSCache) HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, typeURL string, detail string) {
// Start caching for IP/ID mappings on the first indication someone wants them
observerOnce.Do(func() {
cache.ipcache.AddListener(cache)
})
}
// OnIPIdentityCacheGC is required to implement IPIdentityMappingListener.
func (cache *NPHDSCache) OnIPIdentityCacheGC() {
// We don't have anything to synchronize in this case.
}
// OnIPIdentityCacheChange pushes modifications to the IP<->Identity mapping
// into the Network Policy Host Discovery Service (NPHDS).
func (cache *NPHDSCache) OnIPIdentityCacheChange(modType ipcache.CacheModification, cidr net.IPNet,
oldHostIP, newHostIP net.IP, oldID *ipcache.Identity, newID ipcache.Identity,
encryptKey uint8, k8sMeta *ipcache.K8sMetadata) {
// An upsert where an existing pair exists should translate into a
// delete (for the old Identity) followed by an upsert (for the new).
if oldID != nil && modType == ipcache.Upsert {
// Skip update if identity is identical
if oldID.ID == newID.ID {
return
}
cache.OnIPIdentityCacheChange(ipcache.Delete, cidr, nil, nil, nil, *oldID, encryptKey, k8sMeta)
}
cidrStr := cidr.String()
scopedLog := log.WithFields(logrus.Fields{
logfields.IPAddr: cidrStr,
logfields.Identity: newID,
logfields.Modification: modType,
})
// Look up the current resources for the specified Identity.
resourceName := newID.ID.StringID()
msg, err := cache.Lookup(NetworkPolicyHostsTypeURL, resourceName)
if err != nil {
scopedLog.WithError(err).Warning("Can't lookup NPHDS cache")
return
}
switch modType {
case ipcache.Upsert:
var hostAddresses []string
if msg == nil {
hostAddresses = make([]string, 0, 1)
} else {
// If the resource already exists, create a copy of it and insert
// the new IP address into its HostAddresses list.
npHost := msg.(*envoyAPI.NetworkPolicyHosts)
hostAddresses = make([]string, 0, len(npHost.HostAddresses)+1)
hostAddresses = append(hostAddresses, npHost.HostAddresses...)
}
hostAddresses = append(hostAddresses, cidrStr)
sort.Strings(hostAddresses)
newNpHost := envoyAPI.NetworkPolicyHosts{
Policy: uint64(newID.ID),
HostAddresses: hostAddresses,
}
if err := newNpHost.Validate(); err != nil {
scopedLog.WithError(err).WithFields(logrus.Fields{
logfields.XDSResource: newNpHost.String(),
}).Warning("Could not validate NPHDS resource update on upsert")
return
}
cache.Upsert(NetworkPolicyHostsTypeURL, resourceName, &newNpHost)
case ipcache.Delete:
if msg == nil {
// Doesn't exist; already deleted.
return
}
cache.handleIPDelete(msg.(*envoyAPI.NetworkPolicyHosts), resourceName, cidrStr)
}
}
// handleIPUpsert deletes elements from the NPHDS cache with the specified peer IP->ID mapping.
func (cache *NPHDSCache) handleIPDelete(npHost *envoyAPI.NetworkPolicyHosts, peerIdentity, peerIP string) {
targetIndex := -1
scopedLog := log.WithFields(logrus.Fields{
logfields.IPAddr: peerIP,
logfields.Identity: peerIdentity,
logfields.Modification: ipcache.Delete,
})
for i, endpointIP := range npHost.HostAddresses {
if endpointIP == peerIP {
targetIndex = i
break
}
}
if targetIndex < 0 {
scopedLog.Warning("Can't find IP in NPHDS cache")
return
}
// If removing this host would result in empty list, delete it.
// Otherwise, update to a list that doesn't contain the target IP
if len(npHost.HostAddresses) <= 1 {
cache.Delete(NetworkPolicyHostsTypeURL, peerIdentity)
} else {
// If the resource is to be updated, create a copy of it before
// removing the IP address from its HostAddresses list.
hostAddresses := make([]string, 0, len(npHost.HostAddresses)-1)
if len(npHost.HostAddresses) == targetIndex {
hostAddresses = append(hostAddresses, npHost.HostAddresses[0:targetIndex]...)
} else {
hostAddresses = append(hostAddresses, npHost.HostAddresses[0:targetIndex]...)
hostAddresses = append(hostAddresses, npHost.HostAddresses[targetIndex+1:]...)
}
newNpHost := envoyAPI.NetworkPolicyHosts{
Policy: uint64(npHost.Policy),
HostAddresses: hostAddresses,
}
if err := newNpHost.Validate(); err != nil {
scopedLog.WithError(err).Warning("Could not validate NPHDS resource update on delete")
return
}
cache.Upsert(NetworkPolicyHostsTypeURL, peerIdentity, &newNpHost)
}
}