Skip to content
234 changes: 7 additions & 227 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package destination
import (
"fmt"
"net/netip"
"reflect"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
Expand Down Expand Up @@ -37,24 +36,19 @@ type (
controllerNS string
identityTrustDomain string
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}

forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,

extEndpointZoneWeights bool

meshedHTTP2ClientParams *pb.Http2ClientParams

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter

updates chan interface{}
stop chan struct{}
Expand All @@ -67,10 +61,6 @@ type (
removeUpdate struct {
set watcher.AddressSet
}

noEndpointsUpdate struct {
exists bool
}
)

var updatesQueueOverflowCounter = promauto.NewCounterVec(
Expand All @@ -88,8 +78,6 @@ func newEndpointTranslator(
identityTrustDomain string,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights bool,
meshedHTTP2ClientParams *pb.Http2ClientParams,
service string,
Expand All @@ -110,9 +98,6 @@ func newEndpointTranslator(
if err != nil {
log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
}
availableEndpoints := newEmptyAddressSet()

filteredSnapshot := newEmptyAddressSet()

counter, err := updatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"service": service})
if err != nil {
Expand All @@ -123,17 +108,12 @@ func newEndpointTranslator(
controllerNS,
identityTrustDomain,
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights,
meshedHTTP2ClientParams,

availableEndpoints,
filteredSnapshot,
stream,
endStream,
log,
Expand All @@ -151,11 +131,7 @@ func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.enqueueUpdate(&removeUpdate{set})
}

// NoEndpoints wraps exists in a noEndpointsUpdate and hands it to
// enqueueUpdate rather than acting on it directly.
func (et *endpointTranslator) NoEndpoints(exists bool) {
	update := &noEndpointsUpdate{exists: exists}
	et.enqueueUpdate(update)
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// Add and Remove are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asynchronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
Expand Down Expand Up @@ -214,201 +190,12 @@ func (et *endpointTranslator) DrainAndStop() {
func (et *endpointTranslator) processUpdate(update interface{}) {
switch update := update.(type) {
case *addUpdate:
et.add(update.set)
et.sendClientAdd(update.set)
case *removeUpdate:
et.remove(update.set)
case *noEndpointsUpdate:
et.noEndpoints(update.exists)
}
}

// add merges every address in set into et.availableEndpoints, overwrites the
// stored labels and local-traffic-policy flag with the ones carried by set,
// and then calls sendFilteredUpdate.
func (et *endpointTranslator) add(set watcher.AddressSet) {
	// The set-level metadata always reflects the most recent add.
	et.availableEndpoints.LocalTrafficPolicy = set.LocalTrafficPolicy
	et.availableEndpoints.Labels = set.Labels

	known := et.availableEndpoints.Addresses
	for key, addr := range set.Addresses {
		known[key] = addr
	}

	et.sendFilteredUpdate()
}

// remove deletes every address ID present in set from
// et.availableEndpoints and then calls sendFilteredUpdate.
func (et *endpointTranslator) remove(set watcher.AddressSet) {
	known := et.availableEndpoints.Addresses
	for key := range set.Addresses {
		delete(known, key)
	}

	et.sendFilteredUpdate()
}

// noEndpoints replaces the available address map with a fresh empty one and
// then calls sendFilteredUpdate. The exists argument is only written to the
// debug log here.
func (et *endpointTranslator) noEndpoints(exists bool) {
	et.log.Debugf("NoEndpoints(%+v)", exists)

	// Swap in a new map instead of clearing the old one in place.
	et.availableEndpoints.Addresses = make(map[watcher.ID]watcher.Address)

	et.sendFilteredUpdate()
}

// sendFilteredUpdate recomputes the filtered view of the available endpoints
// (filterAddresses followed by selectAddressFamily), diffs it against
// et.filteredSnapshot, forwards any non-empty add/remove sets via
// sendClientAdd / sendClientRemove, and finally stores the new view as the
// snapshot.
func (et *endpointTranslator) sendFilteredUpdate() {
	current := et.selectAddressFamily(et.filterAddresses())
	toAdd, toRemove := et.diffEndpoints(current)

	// Adds are sent before removes; empty sets are not sent at all.
	if len(toAdd.Addresses) != 0 {
		et.sendClientAdd(toAdd)
	}
	if len(toRemove.Addresses) != 0 {
		et.sendClientRemove(toRemove)
	}

	et.filteredSnapshot = current
}

func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
filtered := make(map[watcher.ID]watcher.Address)
for id, addr := range addresses.Addresses {
if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
continue
}

if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
// Only consider IPv4 address for which there's not already an IPv6
// alternative
altID := id
altID.IPFamily = corev1.IPv6Protocol
if _, ok := addresses.Addresses[altID]; ok {
continue
}
}

filtered[id] = addr
}

return watcher.AddressSet{
Addresses: filtered,
Labels: addresses.Labels,
LocalTrafficPolicy: addresses.LocalTrafficPolicy,
et.sendClientRemove(update.set)
}
}

// filterAddresses builds the subset of et.availableEndpoints to consider,
// always returning a freshly allocated address map. The decision order is:
//
//  1. Endpoint filtering disabled: every available address is returned
//     (the returned set carries Labels but not LocalTrafficPolicy).
//  2. LocalTrafficPolicy set on the available endpoints: only addresses
//     whose Pod is scheduled on et.nodeName are returned; zone hints are
//     not consulted.
//  3. At least one address has no ForZones hints: hints are ignored and
//     every available address is returned.
//  4. Otherwise: addresses with a ForZones entry naming et.nodeTopologyZone
//     are returned, or every available address if none match.
func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
	all := et.availableEndpoints.Addresses
	labels := et.availableEndpoints.Labels
	localOnly := et.availableEndpoints.LocalTrafficPolicy

	// cloneAll returns a new map holding every available address.
	cloneAll := func() map[watcher.ID]watcher.Address {
		out := make(map[watcher.ID]watcher.Address)
		for id, addr := range all {
			out[id] = addr
		}
		return out
	}
	// result wraps addrs with the labels and traffic policy of the
	// available endpoints.
	result := func(addrs map[watcher.ID]watcher.Address) watcher.AddressSet {
		return watcher.AddressSet{
			Addresses:          addrs,
			Labels:             labels,
			LocalTrafficPolicy: localOnly,
		}
	}

	// Filtering switched off: hand back everything.
	if !et.enableEndpointFiltering {
		return watcher.AddressSet{
			Addresses: cloneAll(),
			Labels:    labels,
		}
	}

	// service.spec.internalTrafficPolicy is local: keep only addresses
	// backed by a pod on this node.
	if localOnly {
		et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
		local := make(map[watcher.ID]watcher.Address)
		for id, addr := range all {
			if addr.Pod == nil || addr.Pod.Spec.NodeName != et.nodeName {
				continue
			}
			local[id] = addr
		}
		et.log.Debugf("Filtered from %d to %d addresses", len(all), len(local))
		return result(local)
	}

	// A single address without a hint disables hint-based filtering for the
	// whole set. This replicates kube-proxy behavior documented in the KEP:
	// https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/2433-topology-aware-hints/README.md#kube-proxy
	for _, addr := range all {
		if len(addr.ForZones) != 0 {
			continue
		}
		everything := cloneAll()
		et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
		return result(everything)
	}

	// Collect the addresses hinted for this node's zone.
	et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
	zoned := make(map[watcher.ID]watcher.Address)
	for id, addr := range all {
		for _, zone := range addr.ForZones {
			if zone.Name == et.nodeTopologyZone {
				zoned[id] = addr
			}
		}
	}

	// Nothing is hinted for this zone: fall back to endpoints from all zones.
	if len(zoned) == 0 {
		return result(cloneAll())
	}

	et.log.Debugf("Filtered from %d to %d addresses", len(all), len(zoned))
	return result(zoned)
}

// diffEndpoints compares filtered against et.filteredSnapshot and returns two
// address sets, in order:
//
//   - the addresses to add: IDs that are absent from the snapshot, or whose
//     value is not reflect.DeepEqual to the snapshot's value for that ID;
//   - the addresses to remove: IDs in the snapshot that are absent from
//     filtered, paired with their snapshot values.
//
// Both returned sets carry filtered.Labels.
func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
	previous := et.filteredSnapshot.Addresses

	toAdd := make(map[watcher.ID]watcher.Address)
	for id, current := range filtered.Addresses {
		// New and changed addresses are both reported as adds.
		if prior, seen := previous[id]; !seen || !reflect.DeepEqual(prior, current) {
			toAdd[id] = current
		}
	}

	toRemove := make(map[watcher.ID]watcher.Address)
	for id, prior := range previous {
		if _, kept := filtered.Addresses[id]; kept {
			continue
		}
		toRemove[id] = prior
	}

	added := watcher.AddressSet{
		Addresses: toAdd,
		Labels:    filtered.Labels,
	}
	removed := watcher.AddressSet{
		Addresses: toRemove,
		Labels:    filtered.Labels,
	}
	return added, removed
}

func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
addrs := []*pb.WeightedAddr{}
for _, address := range set.Addresses {
Expand Down Expand Up @@ -724,13 +511,6 @@ func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error
return "", nil
}

// newEmptyAddressSet returns an AddressSet whose Addresses and Labels maps
// are allocated but empty, so callers can write into them immediately.
func newEmptyAddressSet() watcher.AddressSet {
	var empty watcher.AddressSet
	empty.Addresses = map[watcher.ID]watcher.Address{}
	empty.Labels = map[string]string{}
	return empty
}

// getInboundPort gets the inbound port from the proxy container's environment
// variable.
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
Expand Down
Loading
Loading