diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go
index 7fd292574cfd5..753623077952d 100644
--- a/controller/api/destination/endpoint_translator.go
+++ b/controller/api/destination/endpoint_translator.go
@@ -3,7 +3,6 @@ package destination
 import (
 	"fmt"
 	"net/netip"
-	"reflect"
 
 	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
 	"github.com/linkerd/linkerd2-proxy-api/go/net"
@@ -37,24 +36,19 @@ type (
 		controllerNS        string
 		identityTrustDomain string
 		nodeTopologyZone    string
-		nodeName            string
 		defaultOpaquePorts  map[uint32]struct{}
 
 		forceOpaqueTransport,
 		enableH2Upgrade,
-		enableEndpointFiltering,
-		enableIPv6,
 		extEndpointZoneWeights bool
 
 		meshedHTTP2ClientParams *pb.Http2ClientParams
 
-		availableEndpoints watcher.AddressSet
-		filteredSnapshot   watcher.AddressSet
-		stream             pb.Destination_GetServer
-		endStream          chan struct{}
-		log                *logging.Entry
-		overflowCounter    prometheus.Counter
+		stream          pb.Destination_GetServer
+		endStream       chan struct{}
+		log             *logging.Entry
+		overflowCounter prometheus.Counter
 
 		updates chan interface{}
 		stop    chan struct{}
@@ -67,10 +61,6 @@ type (
 	removeUpdate struct {
 		set watcher.AddressSet
 	}
-
-	noEndpointsUpdate struct {
-		exists bool
-	}
 )
 
 var updatesQueueOverflowCounter = promauto.NewCounterVec(
@@ -88,8 +78,6 @@ func newEndpointTranslator(
 	identityTrustDomain string,
 	forceOpaqueTransport,
 	enableH2Upgrade,
-	enableEndpointFiltering,
-	enableIPv6,
 	extEndpointZoneWeights bool,
 	meshedHTTP2ClientParams *pb.Http2ClientParams,
 	service string,
@@ -110,9 +98,6 @@ func newEndpointTranslator(
 	if err != nil {
 		log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
 	}
-	availableEndpoints := newEmptyAddressSet()
-
-	filteredSnapshot := newEmptyAddressSet()
 
 	counter, err := updatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"service": service})
 	if err != nil {
@@ -123,17 +108,12 @@
 		controllerNS,
 		identityTrustDomain,
 		nodeTopologyZone,
-		srcNodeName,
 		defaultOpaquePorts,
 		forceOpaqueTransport,
 		enableH2Upgrade,
-		enableEndpointFiltering,
-		enableIPv6,
 		extEndpointZoneWeights,
 		meshedHTTP2ClientParams,
-		availableEndpoints,
-		filteredSnapshot,
 		stream,
 		endStream,
 		log,
@@ -151,11 +131,7 @@ func (et *endpointTranslator) Remove(set watcher.AddressSet) {
 	et.enqueueUpdate(&removeUpdate{set})
 }
 
-func (et *endpointTranslator) NoEndpoints(exists bool) {
-	et.enqueueUpdate(&noEndpointsUpdate{exists})
-}
-
-// Add, Remove, and NoEndpoints are called from a client-go informer callback
+// Add and Remove are called from a client-go informer callback
 // and therefore must not block. For each of these, we enqueue an update in
 // a channel so that it can be processed asynchronously. To ensure that enqueuing
 // does not block, we first check to see if there is capacity in the buffered
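Editor's note: the non-blocking enqueue that this comment describes follows the usual select/default pattern. A minimal sketch of the mechanism (field names match the struct above, but this is illustrative, not the exact implementation in the repo):

```go
func (et *endpointTranslator) enqueueUpdate(update interface{}) {
	select {
	case et.updates <- update:
		// There was capacity in the buffered channel; the stream goroutine
		// will pick the update up.
	default:
		// The queue is full. Dropping the update and counting the drop is
		// preferable to blocking the shared informer callback.
		et.overflowCounter.Inc()
		et.log.Error("endpoint update queue full; dropping update")
	}
}
```
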
@@ -214,201 +190,12 @@ func (et *endpointTranslator) DrainAndStop() {
 func (et *endpointTranslator) processUpdate(update interface{}) {
 	switch update := update.(type) {
 	case *addUpdate:
-		et.add(update.set)
+		et.sendClientAdd(update.set)
 	case *removeUpdate:
-		et.remove(update.set)
-	case *noEndpointsUpdate:
-		et.noEndpoints(update.exists)
-	}
-}
-
-func (et *endpointTranslator) add(set watcher.AddressSet) {
-	for id, address := range set.Addresses {
-		et.availableEndpoints.Addresses[id] = address
-	}
-
-	et.availableEndpoints.Labels = set.Labels
-	et.availableEndpoints.LocalTrafficPolicy = set.LocalTrafficPolicy
-
-	et.sendFilteredUpdate()
-}
-
-func (et *endpointTranslator) remove(set watcher.AddressSet) {
-	for id := range set.Addresses {
-		delete(et.availableEndpoints.Addresses, id)
-	}
-
-	et.sendFilteredUpdate()
-}
-
-func (et *endpointTranslator) noEndpoints(exists bool) {
-	et.log.Debugf("NoEndpoints(%+v)", exists)
-
-	et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
-
-	et.sendFilteredUpdate()
-}
-
-func (et *endpointTranslator) sendFilteredUpdate() {
-	filtered := et.filterAddresses()
-	filtered = et.selectAddressFamily(filtered)
-	diffAdd, diffRemove := et.diffEndpoints(filtered)
-
-	if len(diffAdd.Addresses) > 0 {
-		et.sendClientAdd(diffAdd)
-	}
-	if len(diffRemove.Addresses) > 0 {
-		et.sendClientRemove(diffRemove)
-	}
-
-	et.filteredSnapshot = filtered
-}
-
-func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
-	filtered := make(map[watcher.ID]watcher.Address)
-	for id, addr := range addresses.Addresses {
-		if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
-			continue
-		}
-
-		if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
-			// Only consider IPv4 address for which there's not already an IPv6
-			// alternative
-			altID := id
-			altID.IPFamily = corev1.IPv6Protocol
-			if _, ok := addresses.Addresses[altID]; ok {
-				continue
-			}
-		}
-
-		filtered[id] = addr
-	}
-
-	return watcher.AddressSet{
-		Addresses:          filtered,
-		Labels:             addresses.Labels,
-		LocalTrafficPolicy: addresses.LocalTrafficPolicy,
+		et.sendClientRemove(update.set)
 	}
 }
 
-// filterAddresses is responsible for filtering endpoints based on the node's
-// topology zone. The client will only receive endpoints with the same
-// consumption zone as the node. An endpoints consumption zone is set
-// by its Hints field and can be different than its actual Topology zone.
-// when service.spec.internalTrafficPolicy is set to local, Topology Aware
-// Hints are not used.
-func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
-	filtered := make(map[watcher.ID]watcher.Address)
-
-	// If endpoint filtering is disabled, return all available addresses.
-	if !et.enableEndpointFiltering {
-		for k, v := range et.availableEndpoints.Addresses {
-			filtered[k] = v
-		}
-		return watcher.AddressSet{
-			Addresses: filtered,
-			Labels:    et.availableEndpoints.Labels,
-		}
-	}
-
-	// If service.spec.internalTrafficPolicy is set to local, filter and return the addresses
-	// for local node only
-	if et.availableEndpoints.LocalTrafficPolicy {
-		et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
-		for id, address := range et.availableEndpoints.Addresses {
-			if address.Pod != nil && address.Pod.Spec.NodeName == et.nodeName {
-				filtered[id] = address
-			}
-		}
-		et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
-		return watcher.AddressSet{
-			Addresses:          filtered,
-			Labels:             et.availableEndpoints.Labels,
-			LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
-		}
-	}
-	// If any address does not have a hint, then all hints are ignored and all
-	// available addresses are returned. This replicates kube-proxy behavior
-	// documented in the KEP: https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/2433-topology-aware-hints/README.md#kube-proxy
-	for _, address := range et.availableEndpoints.Addresses {
-		if len(address.ForZones) == 0 {
-			for k, v := range et.availableEndpoints.Addresses {
-				filtered[k] = v
-			}
-			et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
-			return watcher.AddressSet{
-				Addresses:          filtered,
-				Labels:             et.availableEndpoints.Labels,
-				LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
-			}
-		}
-	}
-
-	// Each address that has a hint matching the node's zone should be added
-	// to the set of addresses that will be returned.
-	et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
-	for id, address := range et.availableEndpoints.Addresses {
-		for _, zone := range address.ForZones {
-			if zone.Name == et.nodeTopologyZone {
-				filtered[id] = address
-			}
-		}
-	}
-	if len(filtered) > 0 {
-		et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
-		return watcher.AddressSet{
-			Addresses:          filtered,
-			Labels:             et.availableEndpoints.Labels,
-			LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
-		}
-	}
-
-	// If there were no filtered addresses, then fall to using endpoints from
-	// all zones.
-	for k, v := range et.availableEndpoints.Addresses {
-		filtered[k] = v
-	}
-	return watcher.AddressSet{
-		Addresses:          filtered,
-		Labels:             et.availableEndpoints.Labels,
-		LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
-	}
-}
-
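Editor's note: the zone-filtering rule deleted above (and reintroduced on the watcher side by this PR) reduces to: keep only addresses hinted for the node's zone, and fall back to every address when hints are absent or nothing matches. A condensed sketch of that rule, using the watcher types from this diff:

```go
// filterByZone keeps addresses whose topology hints include zone. If any
// endpoint is unhinted, or no endpoint matches the zone, it falls back to
// all addresses, mirroring kube-proxy's handling of topology-aware hints.
func filterByZone(addrs map[watcher.ID]watcher.Address, zone string) map[watcher.ID]watcher.Address {
	filtered := make(map[watcher.ID]watcher.Address)
	for id, addr := range addrs {
		if len(addr.ForZones) == 0 {
			return addrs // any unhinted endpoint disables zone filtering
		}
		for _, z := range addr.ForZones {
			if z.Name == zone {
				filtered[id] = addr
			}
		}
	}
	if len(filtered) == 0 {
		return addrs // no endpoint is hinted for this zone: use all of them
	}
	return filtered
}
```
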
-// diffEndpoints calculates the difference between the filtered set of
-// endpoints in the current (Add/Remove) operation and the snapshot of
-// previously filtered endpoints. This diff allows the client to receive only
-// the endpoints that match the topological zone, by adding new endpoints and
-// removing stale ones.
-func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
-	add := make(map[watcher.ID]watcher.Address)
-	remove := make(map[watcher.ID]watcher.Address)
-
-	for id, new := range filtered.Addresses {
-		old, ok := et.filteredSnapshot.Addresses[id]
-		if !ok {
-			add[id] = new
-		} else if !reflect.DeepEqual(old, new) {
-			add[id] = new
-		}
-	}
-
-	for id, address := range et.filteredSnapshot.Addresses {
-		if _, ok := filtered.Addresses[id]; !ok {
-			remove[id] = address
-		}
-	}
-
-	return watcher.AddressSet{
-			Addresses: add,
-			Labels:    filtered.Labels,
-		},
-		watcher.AddressSet{
-			Addresses: remove,
-			Labels:    filtered.Labels,
-		}
-}
-
 func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
 	addrs := []*pb.WeightedAddr{}
 	for _, address := range set.Addresses {
@@ -724,13 +511,6 @@ func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error
 	return "", nil
 }
 
-func newEmptyAddressSet() watcher.AddressSet {
-	return watcher.AddressSet{
-		Addresses: make(map[watcher.ID]watcher.Address),
-		Labels:    make(map[string]string),
-	}
-}
-
 // getInboundPort gets the inbound port from the proxy container's environment
 // variable.
 func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
diff --git a/controller/api/destination/endpoint_translator_test.go b/controller/api/destination/endpoint_translator_test.go
index 43fe66fbbe21f..363514306b417 100644
--- a/controller/api/destination/endpoint_translator_test.go
+++ b/controller/api/destination/endpoint_translator_test.go
@@ -61,45 +61,6 @@ var (
 		OwnerName: "rc-name",
 	}
 
-	pod1IPv6 = watcher.Address{
-		IP:   "2001:0db8:85a3:0000:0000:8a2e:0370:7333",
-		Port: 1,
-		Pod: &corev1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "pod1",
-				Namespace: "ns",
-				Labels: map[string]string{
-					k8s.ControllerNSLabel:    "linkerd",
-					k8s.ProxyDeploymentLabel: "deployment-name",
-				},
-			},
-			Spec: corev1.PodSpec{
-				ServiceAccountName: "serviceaccount-name",
-				Containers: []corev1.Container{
-					{
-						Name: k8s.ProxyContainerName,
-						Env: []corev1.EnvVar{
-							{
-								Name:  envInboundListenAddr,
-								Value: "[::]:4143",
-							},
-							{
-								Name:  envAdminListenAddr,
-								Value: "[::]:4191",
-							},
-							{
-								Name:  envControlListenAddr,
-								Value: "[::]:4190",
-							},
-						},
-					},
-				},
-			},
-		},
-		OwnerKind: "replicationcontroller",
-		OwnerName: "rc-name",
-	}
-
 	pod2 = watcher.Address{
 		IP:   "1.1.1.2",
 		Port: 2,
@@ -826,56 +787,6 @@ func TestEndpointTranslatorForPods(t *testing.T) {
 			t.Fatalf("ProtocolHint: %v", diff)
 		}
 	})
-
-	t.Run("Sends IPv6 only when pod has both IPv4 and IPv6", func(t *testing.T) {
-		mockGetServer, translator := makeEndpointTranslator(t)
-		translator.Start()
-		defer translator.Stop()
-
-		translator.Add(mkAddressSetForPods(t, pod1, pod1IPv6))
-
-		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
-		if len(addrs) != 1 {
-			t.Fatalf("Expected [1] address returned, got %v", addrs)
-		}
-		if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "[2001:db8:85a3::8a2e:370:7333]:1" {
-			t.Fatalf("Expected address to be [%s], got [%s]", "[2001:db8:85a3::8a2e:370:7333]:1", ipPort)
-		}
-
-		if updates := len(mockGetServer.updatesReceived); updates > 0 {
-			t.Fatalf("Expected to receive no more messages, received [%d]", updates)
-		}
-	})
-
-	t.Run("Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone ", func(t *testing.T) {
-		mockGetServer, translator := makeEndpointTranslator(t)
-		translator.Start()
-		defer translator.Stop()
-
-		pod1West1a := pod1
-		pod1West1a.ForZones = []v1.ForZone{
-			{Name: "west-1a"},
-		}
-
-		pod1IPv6West1b := pod1IPv6
-		pod1IPv6West1b.ForZones = []v1.ForZone{
-			{Name: "west-1b"},
-		}
-
-		translator.Add(mkAddressSetForPods(t, pod1West1a, pod1IPv6West1b))
-
-		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
-		if len(addrs) != 1 {
-			t.Fatalf("Expected [1] address returned, got %v", addrs)
-		}
-		if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "1.1.1.1:1" {
-			t.Fatalf("Expected address to be [%s], got [%s]", "1.1.1.1:1", ipPort)
-		}
-
-		if updates := len(mockGetServer.updatesReceived); updates > 0 {
-			t.Fatalf("Expected to receive no more messages, received [%d]", updates)
-		}
-	})
 }
 
 func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
@@ -1063,27 +974,6 @@
 	})
 }
 
-func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
-	t.Run("Sends one update for add and none for remove", func(t *testing.T) {
-		mockGetServer, translator := makeEndpointTranslator(t)
-		translator.Start()
-		defer translator.Stop()
-
-		translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
-		translator.Remove(mkAddressSetForServices(west1bAddress))
-
-		// Only the address meant for west-1a should be added, which means
-		// that when we try to remove the address meant for west-1b there
-		// should be no remove update.
-		expectedNumUpdates := 1
-		<-mockGetServer.updatesReceived // Add
-
-		if len(mockGetServer.updatesReceived) != 0 {
-			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
-		}
-	})
-}
-
 func TestEndpointTranslatorExperimentalZoneWeights(t *testing.T) {
 	zoneA := "west-1a"
 	zoneB := "west-1b"
@@ -1137,53 +1027,6 @@
 	})
 }
 
-func TestEndpointTranslatorForLocalTrafficPolicy(t *testing.T) {
-	t.Run("Sends one update for add and none for remove", func(t *testing.T) {
-		mockGetServer, translator := makeEndpointTranslator(t)
-		translator.Start()
-		defer translator.Stop()
-		addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
-		addressSet.LocalTrafficPolicy = true
-		translator.Add(addressSet)
-		translator.Remove(mkAddressSetForServices(AddressNotOnTest123Node))
-
-		// Only the address meant for AddressOnTest123Node should be added, which means
-		// that when we try to remove the address meant for AddressNotOnTest123Node there
-		// should be no remove update.
-		expectedNumUpdates := 1
-		<-mockGetServer.updatesReceived // Add
-
-		if len(mockGetServer.updatesReceived) != 0 {
-			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
-		}
-	})
-
-	t.Run("Removes cannot change LocalTrafficPolicy", func(t *testing.T) {
-		mockGetServer, translator := makeEndpointTranslator(t)
-		translator.Start()
-		defer translator.Stop()
-		addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
-		addressSet.LocalTrafficPolicy = true
-		translator.Add(addressSet)
-		set := watcher.AddressSet{
-			Addresses:          make(map[watcher.ServiceID]watcher.Address),
-			Labels:             map[string]string{"service": "service-name", "namespace": "service-ns"},
-			LocalTrafficPolicy: false,
-		}
-		translator.Remove(set)
-
-		// Only the address meant for AddressOnTest123Node should be added.
-		// The remove with no addresses should not change the LocalTrafficPolicy
-		// and should be a noop that does not send an update.
-		expectedNumUpdates := 1
-		<-mockGetServer.updatesReceived // Add
-
-		if len(mockGetServer.updatesReceived) != 0 {
-			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
-		}
-	})
-}
-
 // TestConcurrency, to be triggered with `go test -race`, shouldn't report a race condition
 func TestConcurrency(t *testing.T) {
 	_, translator := makeEndpointTranslator(t)
@@ -1238,12 +1081,10 @@ func TestGetInboundPort(t *testing.T) {
 
 func mkAddressSetForServices(gatewayAddresses ...watcher.Address) watcher.AddressSet {
 	set := watcher.AddressSet{
-		Addresses: make(map[watcher.ServiceID]watcher.Address),
+		Addresses: make(map[watcher.ID]watcher.Address),
 		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
 	}
 	for _, a := range gatewayAddresses {
-		a := a // pin
-
 		id := watcher.ServiceID{
 			Name: strings.Join([]string{
 				a.IP,
@@ -1259,7 +1100,7 @@ func mkAddressSetForPods(t *testing.T, podAddresses ...watcher.Address) watcher.
 	t.Helper()
 
 	set := watcher.AddressSet{
-		Addresses: make(map[watcher.PodID]watcher.Address),
+		Addresses: make(map[watcher.ID]watcher.Address),
 		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
 	}
 	for _, p := range podAddresses {
@@ -1286,7 +1127,7 @@ func mkAddressSetForExternalWorkloads(ewAddresses ...watcher.Address) watcher.AddressSet {
 	set := watcher.AddressSet{
-		Addresses: make(map[watcher.PodID]watcher.Address),
+		Addresses: make(map[watcher.ID]watcher.Address),
 		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
 	}
 	for _, ew := range ewAddresses {
diff --git a/controller/api/destination/federated_service_watcher.go b/controller/api/destination/federated_service_watcher.go
index 96bbd22654e27..93c6253ec6897 100644
--- a/controller/api/destination/federated_service_watcher.go
+++ b/controller/api/destination/federated_service_watcher.go
@@ -266,17 +276,27 @@ func (fs *federatedService) delete() {
 	defer fs.Unlock()
 
 	for _, subscriber := range fs.subscribers {
+		remoteFilterKey := watcher.FilterKey{
+			Hostname:                subscriber.instanceID,
+			NodeName:                subscriber.nodeName,
+			EnableEndpointFiltering: false,
+		}
 		for id, translator := range subscriber.remoteTranslators {
 			remoteWatcher, _, found := fs.clusterStore.Get(id.cluster)
 			if !found {
 				fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
 				continue
 			}
-			remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator)
+			remoteWatcher.Unsubscribe(id.service, subscriber.port, remoteFilterKey, translator, false)
 			translator.Stop()
 		}
+		localFilterKey := watcher.FilterKey{
+			Hostname:                subscriber.instanceID,
+			NodeName:                subscriber.nodeName,
+			EnableEndpointFiltering: true, // Endpoint filtering is enabled for local discovery.
+		}
 		for localDiscovery, translator := range subscriber.localTranslators {
-			fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
+			fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, localFilterKey, translator, false)
 			translator.Stop()
 		}
 		close(subscriber.endStream)
@@ -353,8 +363,6 @@ func (fs *federatedService) remoteDiscoverySubscribe(
 		remoteConfig.TrustDomain,
 		fs.config.ForceOpaqueTransport,
 		fs.config.EnableH2Upgrade,
-		false, // Disable endpoint filtering for remote discovery.
-		fs.config.EnableIPv6,
 		fs.config.ExtEndpointZoneWeights,
 		fs.config.MeshedHttp2ClientParams,
 		fmt.Sprintf("%s.%s.svc.%s:%d", id.service, fs.namespace, remoteConfig.ClusterDomain, subscriber.port),
@@ -375,7 +383,12 @@ func (fs *federatedService) remoteDiscoverySubscribe(
 	subscriber.remoteTranslators[id] = translator
 
 	fs.log.Debugf("Subscribing to remote discovery service %s in cluster %s", id.service, id.cluster)
-	err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
+	filterKey := watcher.FilterKey{
+		Hostname:                subscriber.instanceID,
+		NodeName:                subscriber.nodeName,
+		EnableEndpointFiltering: false, // Endpoint filtering is disabled for remote discovery.
+	}
+	err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, filterKey, translator)
 	if err != nil {
 		fs.log.Errorf("Failed to subscribe to remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
 	}
@@ -393,8 +406,12 @@ func (fs *federatedService) remoteDiscoveryUnsubscribe(
 		translator := subscriber.remoteTranslators[id]
 		fs.log.Debugf("Unsubscribing from remote discovery service %s in cluster %s", id.service, id.cluster)
-		remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator)
-		translator.NoEndpoints(true)
+		filterKey := watcher.FilterKey{
+			Hostname:                subscriber.instanceID,
+			NodeName:                subscriber.nodeName,
+			EnableEndpointFiltering: false, // Endpoint filtering is disabled for remote discovery.
+		}
+		remoteWatcher.Unsubscribe(id.service, subscriber.port, filterKey, translator, true)
 		translator.DrainAndStop()
 		delete(subscriber.remoteTranslators, id)
 	}
@@ -408,8 +425,6 @@ func (fs *federatedService) localDiscoverySubscribe(
 		fs.config.IdentityTrustDomain,
 		fs.config.ForceOpaqueTransport,
 		fs.config.EnableH2Upgrade,
-		true,
-		fs.config.EnableIPv6,
 		fs.config.ExtEndpointZoneWeights,
 		fs.config.MeshedHttp2ClientParams,
 		localDiscovery,
@@ -429,7 +444,12 @@ func (fs *federatedService) localDiscoverySubscribe(
 	subscriber.localTranslators[localDiscovery] = translator
 
 	fs.log.Debugf("Subscribing to local discovery service %s", localDiscovery)
-	err = fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
+	filterKey := watcher.FilterKey{
+		Hostname:                subscriber.instanceID,
+		NodeName:                subscriber.nodeName,
+		EnableEndpointFiltering: true, // Endpoint filtering is enabled for local discovery.
+	}
+	err = fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, filterKey, translator)
 	if err != nil {
 		fs.log.Errorf("Failed to subscribe to %s: %s", localDiscovery, err)
 	}
@@ -442,8 +462,12 @@ func (fs *federatedService) localDiscoveryUnsubscribe(
 	translator, found := subscriber.localTranslators[localDiscovery]
 	if found {
 		fs.log.Debugf("Unsubscribing to local discovery service %s", localDiscovery)
-		fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
-		translator.NoEndpoints(true)
+		filterKey := watcher.FilterKey{
+			Hostname:                subscriber.instanceID,
+			NodeName:                subscriber.nodeName,
+			EnableEndpointFiltering: true, // Endpoint filtering is enabled for local discovery.
+		}
+		fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, filterKey, translator, true)
 		translator.DrainAndStop()
 		delete(subscriber.localTranslators, localDiscovery)
 	}
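Editor's note: the recurring pattern in the hunks above is that the hostname, node, and filtering-mode values that used to travel as loose arguments are now bundled into a single `watcher.FilterKey`, with `EnableEndpointFiltering` flipped depending on whether discovery is local or remote. A hypothetical helper illustrating both variants (the helper itself is not part of this PR):

```go
// buildFilterKey shows how the two discovery modes differ only in the
// EnableEndpointFiltering flag.
func buildFilterKey(instanceID, nodeName string, local bool) watcher.FilterKey {
	return watcher.FilterKey{
		Hostname:                instanceID, // e.g. "web-0" for a StatefulSet instance
		NodeName:                nodeName,   // the subscribing proxy's node
		EnableEndpointFiltering: local,      // topology filtering applies only to local discovery
	}
}
```

Grouping these fields lets the watcher, rather than each endpoint translator, decide which addresses a given listener should see.
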
diff --git a/controller/api/destination/federated_service_watcher_test.go b/controller/api/destination/federated_service_watcher_test.go
index ef1a0d6bbd13a..d5b6f59786ac8 100644
--- a/controller/api/destination/federated_service_watcher_test.go
+++ b/controller/api/destination/federated_service_watcher_test.go
@@ -135,13 +135,13 @@ func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error)
 	if err != nil {
 		return nil, fmt.Errorf("NewFakeMetadataAPI returned an error: %w", err)
 	}
-	localEndpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
+	localEndpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, false, "local")
 	if err != nil {
 		return nil, fmt.Errorf("NewEndpointsWatcher returned an error: %w", err)
 	}
 
 	prom := prometheus.NewRegistry()
-	clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false,
+	clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false, false,
 		watcher.CreateMulticlusterDecoder(map[string][]string{
 			"east":  eastConfigs,
 			"north": northConfigs,
diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go
index 62131b729552d..7c0b1833fd643 100644
--- a/controller/api/destination/server.go
+++ b/controller/api/destination/server.go
@@ -101,7 +101,7 @@ func NewServer(
 	if err != nil {
 		return nil, err
 	}
-	endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, "local")
+	endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, config.EnableIPv6, "local")
 	if err != nil {
 		return nil, err
 	}
@@ -214,8 +214,6 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 			remoteConfig.TrustDomain,
 			s.config.ForceOpaqueTransport,
 			s.config.EnableH2Upgrade,
-			false, // Disable endpoint filtering for remote discovery.
-			s.config.EnableIPv6,
 			s.config.ExtEndpointZoneWeights,
 			s.config.MeshedHttp2ClientParams,
 			fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, remoteConfig.ClusterDomain, port),
@@ -233,7 +231,13 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 		translator.Start()
 		defer translator.Stop()
 
-		err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
+		filterKey := watcher.FilterKey{
+			Hostname:                instanceID,
+			NodeName:                token.NodeName,
+			EnableEndpointFiltering: false, // Disable endpoint filtering for remote discovery.
+		}
+
+		err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, filterKey, translator)
 		if err != nil {
 			var ise watcher.InvalidService
 			if errors.As(err, &ise) {
@@ -243,7 +247,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 			log.Errorf("Failed to subscribe to remote discovery service %q in cluster %s: %s", dest.GetPath(), cluster, err)
 			return err
 		}
-		defer remoteWatcher.Unsubscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
+		defer remoteWatcher.Unsubscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, filterKey, translator, false)
 
 	} else {
 		log.Debug("Local discovery service detected")
@@ -253,8 +257,6 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 			s.config.IdentityTrustDomain,
 			s.config.ForceOpaqueTransport,
 			s.config.EnableH2Upgrade,
-			true,
-			s.config.EnableIPv6,
 			s.config.ExtEndpointZoneWeights,
 			s.config.MeshedHttp2ClientParams,
 			dest.GetPath(),
@@ -272,7 +274,13 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 		translator.Start()
 		defer translator.Stop()
 
-		err = s.endpoints.Subscribe(service, port, instanceID, translator)
+		filterKey := watcher.FilterKey{
+			Hostname:                instanceID,
+			NodeName:                token.NodeName,
+			EnableEndpointFiltering: true, // Enable endpoint filtering for local discovery.
+		}
+
+		err = s.endpoints.Subscribe(service, port, filterKey, translator)
 		if err != nil {
 			var ise watcher.InvalidService
 			if errors.As(err, &ise) {
@@ -282,7 +290,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
 			log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
 			return err
 		}
-		defer s.endpoints.Unsubscribe(service, port, instanceID, translator)
+		defer s.endpoints.Unsubscribe(service, port, filterKey, translator, false)
 	}
 
 	select {
diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go
index 6ba8b4161cbad..8bd2a8930d68d 100644
--- a/controller/api/destination/test_util.go
+++ b/controller/api/destination/test_util.go
@@ -1043,7 +1043,7 @@ spec:
 	if err != nil {
 		t.Fatalf("can't create Workloads watcher: %s", err)
 	}
-	endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, true, "local")
+	endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, true, true, "local")
 	if err != nil {
 		t.Fatalf("can't create Endpoints watcher: %s", err)
 	}
@@ -1057,7 +1057,7 @@
 	}
 
 	prom := prometheus.NewRegistry()
-	clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", true, watcher.CreateMockDecoder(exportedServiceResources...), prom)
+	clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", true, true, watcher.CreateMockDecoder(exportedServiceResources...), prom)
 	if err != nil {
 		t.Fatalf("can't create cluster store: %s", err)
 	}
@@ -1181,8 +1181,6 @@ metadata:
 		"trust.domain",
 		forceOpaqueTransport,
 		true,  // enableH2Upgrade
-		true,  // enableEndpointFiltering
-		true,  // enableIPv6
 		false, // extEndpointZoneWeights
 		nil,   // meshedHttp2ClientParams
 		"service-name.service-ns",
diff --git a/controller/api/destination/watcher/address.go b/controller/api/destination/watcher/address.go
new file mode 100644
index 0000000000000..bf05a6b143eba
--- /dev/null
+++ b/controller/api/destination/watcher/address.go
@@ -0,0 +1,59 @@
+package watcher
+
+import (
+	"maps"
+
+	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
+	corev1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+)
+
+type (
+	// Address represents an individual port on a specific endpoint.
+	// This endpoint might be the result of the existence of a pod
+	// that is targeted by this service; alternatively it can be the
+	// case that this endpoint is not associated with a pod and maps
+	// to some other IP (i.e. a remote service gateway)
+	Address struct {
+		IP                string
+		Port              Port
+		Pod               *corev1.Pod
+		ExternalWorkload  *ewv1beta1.ExternalWorkload
+		OwnerName         string
+		OwnerKind         string
+		Identity          string
+		AuthorityOverride string
+		Zone              *string
+		ForZones          []discovery.ForZone
+		OpaqueProtocol    bool
+		Hostname          *string
+	}
+
+	// AddressSet is a set of Address, indexed by ID.
+	// The ID can be either:
+	// 1) A reference to service: id.Name contains both the service name and
+	// the target IP and port (see newServiceRefAddress)
+	// 2) A reference to a pod: id.Name refers to the pod's name, and
+	// id.IPFamily refers to the ES AddressType (see newPodRefAddress).
+	// 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.
+	AddressSet struct {
+		Addresses map[ID]Address
+		Labels    map[string]string
+	}
+)
+
+// shallowCopy returns a shallow copy of addr, in the sense that the Pod and
+// ExternalWorkload fields of the Addresses map values still point to the
+// locations of the original variable
+func (addr AddressSet) shallowCopy() AddressSet {
+	addresses := make(map[ID]Address)
+	maps.Copy(addresses, addr.Addresses)
+
+	labels := make(map[string]string)
+	maps.Copy(labels, addr.Labels)
+
+	return AddressSet{
+		Addresses: addresses,
+		Labels:    labels,
+	}
+}
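Editor's note: the shallow-copy semantics are worth spelling out: the maps are duplicated, but the Pod and ExternalWorkload pointers still alias the originals. A small package-internal illustration (the helper, IDs, and values are hypothetical):

```go
func demoShallowCopy(id1 ID, pod *corev1.Pod) {
	orig := AddressSet{
		Addresses: map[ID]Address{id1: {IP: "10.0.0.1", Pod: pod}},
		Labels:    map[string]string{"service": "web"},
	}
	dup := orig.shallowCopy()

	delete(dup.Addresses, id1)            // orig.Addresses still contains id1
	dup.Labels["namespace"] = "emojivoto" // orig.Labels is unchanged

	// The maps are independent, but the Pod pointer is shared: both sets
	// observe any mutation made through orig.Addresses[id1].Pod.
}
```
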
diff --git a/controller/api/destination/watcher/cluster_store.go b/controller/api/destination/watcher/cluster_store.go
index c8161e217420e..789d43415b370 100644
--- a/controller/api/destination/watcher/cluster_store.go
+++ b/controller/api/destination/watcher/cluster_store.go
@@ -29,6 +29,7 @@ type (
 		api                  *k8s.API
 		store                map[string]remoteCluster
 		enableEndpointSlices bool
+		enableIPv6           bool
 		log                  *logging.Entry
 
 		// Function used to parse a kubeconfig from a byte buffer. Based on the
@@ -69,8 +70,8 @@ const (
 // When created, a pair of event handlers are registered for the local cluster's
 // Secret informer. The event handlers are responsible for driving the discovery
 // of remote clusters and their configuration
-func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error) {
-	return NewClusterStoreWithDecoder(client, namespace, enableEndpointSlices, decodeK8sConfigFromSecret, prometheus.DefaultRegisterer)
+func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool, enableIPv6 bool) (*ClusterStore, error) {
+	return NewClusterStoreWithDecoder(client, namespace, enableEndpointSlices, enableIPv6, decodeK8sConfigFromSecret, prometheus.DefaultRegisterer)
 }
 
 func (cs *ClusterStore) Sync(stopCh <-chan struct{}) {
@@ -81,7 +82,7 @@
 // store with an arbitrary `configDecoder` function.
 func NewClusterStoreWithDecoder(
 	client kubernetes.Interface,
-	namespace string, enableEndpointSlices bool,
+	namespace string, enableEndpointSlices bool, enableIPv6 bool,
 	decodeFn configDecoder,
 	prom prometheus.Registerer,
 ) (*ClusterStore, error) {
@@ -93,6 +94,7 @@ func NewClusterStoreWithDecoder(
 			"component": "cluster-store",
 		}),
 		enableEndpointSlices: enableEndpointSlices,
+		enableIPv6:           enableIPv6,
 		api:                  api,
 		decodeFn:             decodeFn,
 	}
@@ -228,6 +230,7 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error
 			"remote-cluster": clusterName,
 		}),
 		cs.enableEndpointSlices,
+		cs.enableIPv6,
 		clusterName,
 	)
 	if err != nil {
diff --git a/controller/api/destination/watcher/cluster_store_test.go b/controller/api/destination/watcher/cluster_store_test.go
index e94b2490d8a00..f4e3acd35e327 100644
--- a/controller/api/destination/watcher/cluster_store_test.go
+++ b/controller/api/destination/watcher/cluster_store_test.go
@@ -84,7 +84,7 @@ func TestClusterStoreHandlers(t *testing.T) {
 			}
 
 			prom := prometheus.NewRegistry()
-			cs, err := NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", tt.enableEndpointSlices, CreateMockDecoder(), prom)
+			cs, err := NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", tt.enableEndpointSlices, false, CreateMockDecoder(), prom)
 			if err != nil {
 				t.Fatalf("Unexpected error when starting watcher cache: %s", err)
 			}
diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go
index 394970cc76ec9..0622d93ff6052 100644
--- a/controller/api/destination/watcher/endpoints_watcher.go
+++ b/controller/api/destination/watcher/endpoints_watcher.go
@@ -1,24 +1,16 @@
 package watcher
 
 import (
-	"context"
 	"fmt"
-	"net"
 	"sort"
-	"strconv"
-	"strings"
 	"sync"
 	"time"
 
-	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
 	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta3"
 	"github.com/linkerd/linkerd2/controller/k8s"
-	consts "github.com/linkerd/linkerd2/pkg/k8s"
-	"github.com/prometheus/client_golang/prometheus"
 	logging "github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/intstr"
@@ -40,42 +32,6 @@ const endpointTargetRefPod = "Pod"
 const endpointTargetRefExternalWorkload = "ExternalWorkload"
 
 type (
-	// Address represents an individual port on a specific endpoint.
-	// This endpoint might be the result of a the existence of a pod
-	// that is targeted by this service; alternatively it can be the
-	// case that this endpoint is not associated with a pod and maps
-	// to some other IP (i.e. a remote service gateway)
-	Address struct {
-		IP                string
-		Port              Port
-		Pod               *corev1.Pod
-		ExternalWorkload  *ewv1beta1.ExternalWorkload
-		OwnerName         string
-		OwnerKind         string
-		Identity          string
-		AuthorityOverride string
-		Zone              *string
-		ForZones          []discovery.ForZone
-		OpaqueProtocol    bool
-	}
-
-	// AddressSet is a set of Address, indexed by ID.
-	// The ID can be either:
-	// 1) A reference to service: id.Name contains both the service name and
-	// the target IP and port (see newServiceRefAddress)
-	// 2) A reference to a pod: id.Name refers to the pod's name, and
-	// id.IPFamily refers to the ES AddressType (see newPodRefAddress).
-	// 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.
-	AddressSet struct {
-		Addresses          map[ID]Address
-		Labels             map[string]string
-		LocalTrafficPolicy bool
-	}
-
-	portAndHostname struct {
-		port     Port
-		hostname string
-	}
 
 	// EndpointsWatcher watches all endpoints and services in the Kubernetes
 	// cluster. Listeners can subscribe to a particular service and port and
@@ -89,6 +45,7 @@ type (
 		cluster              string
 		log                  *logging.Entry
 		enableEndpointSlices bool
+		enableIPv6           bool
 
 		sync.RWMutex // This mutex protects modification of the map itself.
 		informerHandlers
@@ -104,55 +61,15 @@ type (
 		srvHandle cache.ResourceEventHandlerRegistration
 	}
 
-	// servicePublisher represents a service. It keeps a map of portPublishers
-	// keyed by port and hostname. This is because each watch on a service
-	// will have a port and optionally may specify a hostname. The port
-	// and hostname will influence the endpoint set which is why a separate
-	// portPublisher is required for each port and hostname combination. The
-	// service's port mapping will be applied to the requested port and the
-	// mapped port will be used in the addresses set. If a hostname is
-	// requested, the address set will be filtered to only include addresses
-	// with the requested hostname.
-	servicePublisher struct {
-		id                   ServiceID
-		log                  *logging.Entry
-		k8sAPI               *k8s.API
-		metadataAPI          *k8s.MetadataAPI
-		enableEndpointSlices bool
-		localTrafficPolicy   bool
-		cluster              string
-		ports                map[portAndHostname]*portPublisher
-		// All access to the servicePublisher and its portPublishers is explicitly synchronized by
-		// this mutex.
-		sync.Mutex
-	}
-
-	// portPublisher represents a service along with a port and optionally a
-	// hostname. Multiple listeners may be subscribed to a portPublisher.
-	// portPublisher maintains the current state of the address set and
-	// publishes diffs to all listeners when updates come from either the
-	// endpoints API or the service API.
-	portPublisher struct {
-		id                   ServiceID
-		targetPort           namedPort
-		srcPort              Port
-		hostname             string
-		log                  *logging.Entry
-		k8sAPI               *k8s.API
-		metadataAPI          *k8s.MetadataAPI
-		enableEndpointSlices bool
-		exists               bool
-		addresses            AddressSet
-		listeners            []EndpointUpdateListener
-		metrics              endpointsMetrics
-		localTrafficPolicy   bool
-	}
-
-	// EndpointUpdateListener is the interface that subscribers must implement.
 	EndpointUpdateListener interface {
 		Add(set AddressSet)
 		Remove(set AddressSet)
-		NoEndpoints(exists bool)
+	}
+
+	FilterKey struct {
+		EnableEndpointFiltering bool
+		NodeName                string
+		Hostname                string
 	}
 )
 
@@ -160,36 +77,16 @@
 var endpointsVecs = newEndpointsMetricsVecs()
 
 var undefinedEndpointPort = Port(0)
 
-// shallowCopy returns a shallow copy of addr, in the sense that the Pod and
-// ExternalWorkload fields of the Addresses map values still point to the
-// locations of the original variable
-func (addr AddressSet) shallowCopy() AddressSet {
-	addresses := make(map[ID]Address)
-	for k, v := range addr.Addresses {
-		addresses[k] = v
-	}
-
-	labels := make(map[string]string)
-	for k, v := range addr.Labels {
-		labels[k] = v
-	}
-
-	return AddressSet{
-		Addresses:          addresses,
-		Labels:             labels,
-		LocalTrafficPolicy: addr.LocalTrafficPolicy,
-	}
-}
-
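Editor's note: with NoEndpoints dropped from the interface above, a listener only has to implement Add and Remove; judging by the call sites in this diff (where `translator.NoEndpoints(true)` is replaced by `Unsubscribe(..., withRemove=true)`), the "service went away" signal now appears to arrive as a final Remove. A minimal conforming listener, purely for illustration:

```go
// countingListener satisfies the slimmed-down EndpointUpdateListener.
type countingListener struct {
	available int
}

func (l *countingListener) Add(set AddressSet)    { l.available += len(set.Addresses) }
func (l *countingListener) Remove(set AddressSet) { l.available -= len(set.Addresses) }
```
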
 // NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
 // k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
 // watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
-func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error) {
+func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, enableIPv6 bool, cluster string) (*EndpointsWatcher, error) {
 	ew := &EndpointsWatcher{
 		publishers:           make(map[ServiceID]*servicePublisher),
 		k8sAPI:               k8sAPI,
 		metadataAPI:          metadataAPI,
 		enableEndpointSlices: enableEndpointSlices,
+		enableIPv6:           enableIPv6,
 		cluster:              cluster,
 		log: log.WithFields(logging.Fields{
 			"component": "endpoints-watcher",
@@ -247,29 +144,29 @@
 // Subscribe to an authority.
 // The provided listener will be updated each time the address set for the
 // given authority is changed.
-func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error {
+func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, filterKey FilterKey, listener EndpointUpdateListener) error {
 	svc, _ := ew.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
 	if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
 		return invalidService(id.String())
 	}
 
-	if hostname == "" {
+	if filterKey.Hostname == "" {
 		ew.log.Debugf("Establishing watch on endpoint [%s:%d]", id, port)
 	} else {
-		ew.log.Debugf("Establishing watch on endpoint [%s.%s:%d]", hostname, id, port)
+		ew.log.Debugf("Establishing watch on endpoint [%s.%s:%d]", filterKey.Hostname, id, port)
 	}
 	sp := ew.getOrNewServicePublisher(id)
 
-	return sp.subscribe(port, hostname, listener)
+	return sp.subscribe(port, listener, filterKey)
 }
 
 // Unsubscribe removes a listener from the subscribers list for this authority.
-func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) {
-	if hostname == "" {
+func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, filterKey FilterKey, listener EndpointUpdateListener, withRemove bool) {
+	if filterKey.Hostname == "" {
 		ew.log.Debugf("Stopping watch on endpoint [%s:%d]", id, port)
 	} else {
-		ew.log.Debugf("Stopping watch on endpoint [%s.%s:%d]", hostname, id, port)
+		ew.log.Debugf("Stopping watch on endpoint [%s.%s:%d]", filterKey.Hostname, id, port)
 	}
 
 	sp, ok := ew.getServicePublisher(id)
@@ -277,7 +174,7 @@
 		ew.log.Errorf("Cannot unsubscribe from unknown service [%s:%d]", id, port)
 		return
 	}
-	sp.unsubscribe(port, hostname, listener)
+	sp.unsubscribe(port, listener, filterKey, withRemove)
 }
 
 // removeHandlers will de-register any event handlers used by the
@@ -517,8 +414,9 @@ func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePubli
 			k8sAPI:               ew.k8sAPI,
 			metadataAPI:          ew.metadataAPI,
 			cluster:              ew.cluster,
-			ports:                make(map[portAndHostname]*portPublisher),
+			ports:                make(map[Port]*portPublisher),
 			enableEndpointSlices: ew.enableEndpointSlices,
+			enableIPv6:           ew.enableIPv6,
 		}
 		ew.publishers[id] = sp
 	}
@@ -581,824 +479,6 @@ func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
 	}
 }
 
-////////////////////////
-/// servicePublisher ///
-////////////////////////
-
-func (sp *servicePublisher) updateEndpoints(newEndpoints *corev1.Endpoints) {
-	sp.Lock()
-	defer sp.Unlock()
-	sp.log.Debugf("Updating endpoints for %s", sp.id)
-	for _, port := range sp.ports {
-		port.updateEndpoints(newEndpoints)
-	}
-}
-
-func (sp *servicePublisher) deleteEndpoints() {
-	sp.Lock()
-	defer sp.Unlock()
-	sp.log.Debugf("Deleting endpoints for %s", sp.id)
-	for _, port := range sp.ports {
-		port.noEndpoints(false)
-	}
-}
-
-func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
-	sp.Lock()
-	defer sp.Unlock()
-
-	sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
-	for _, port := range sp.ports {
-		port.addEndpointSlice(newSlice)
-	}
-}
-
-func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
-	sp.Lock()
-	defer sp.Unlock()
-
-	sp.log.Debugf("Updating ES %s/%s", oldSlice.Namespace, oldSlice.Name)
-	for _, port := range sp.ports {
-		port.updateEndpointSlice(oldSlice, newSlice)
-	}
-}
-
-func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
-	sp.Lock()
-	defer sp.Unlock()
-
-	sp.log.Debugf("Deleting ES %s/%s", es.Namespace, es.Name)
-	for _, port := range sp.ports {
-		port.deleteEndpointSlice(es)
-	}
-}
-
-func (sp *servicePublisher) updateService(newService *corev1.Service) {
-	sp.Lock()
-	defer sp.Unlock()
-	sp.log.Debugf("Updating service for %s", sp.id)
-
-	// set localTrafficPolicy to true if InternalTrafficPolicy is set to local
-	if newService.Spec.InternalTrafficPolicy != nil {
-		sp.localTrafficPolicy = *newService.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal
-	} else {
-		sp.localTrafficPolicy = false
-	}
-
-	for key, port := range sp.ports {
-		newTargetPort := getTargetPort(newService, key.port)
-		if newTargetPort != port.targetPort {
-			port.updatePort(newTargetPort)
-		}
-		// update service endpoints with new localTrafficPolicy
-		if port.localTrafficPolicy != sp.localTrafficPolicy {
-			port.updateLocalTrafficPolicy(sp.localTrafficPolicy)
-		}
-	}
-
-}
-
-func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) error {
-	sp.Lock()
-	defer sp.Unlock()
-
-	key := portAndHostname{
-		port:     srcPort,
-		hostname: hostname,
-	}
-	port, ok := sp.ports[key]
-	if !ok {
-		var err error
-		port, err = sp.newPortPublisher(srcPort, hostname)
-		if err != nil {
-			return err
-		}
-		sp.ports[key] = port
-	}
-	port.subscribe(listener)
-	return nil
-}
-
-func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
-	sp.Lock()
-	defer sp.Unlock()
-
-	key := portAndHostname{
-		port:     srcPort,
-		hostname: hostname,
-	}
-	port, ok := sp.ports[key]
-	if ok {
-		port.unsubscribe(listener)
-		if len(port.listeners) == 0 {
-			endpointsVecs.unregister(sp.metricsLabels(srcPort, hostname))
-			delete(sp.ports, key)
-		}
-	}
-}
-
-func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) (*portPublisher, error) {
-	targetPort := intstr.FromInt(int(srcPort))
-	svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
-	if err != nil && !apierrors.IsNotFound(err) {
-		sp.log.Errorf("error getting service: %s", err)
-	}
-	exists := false
-	if err == nil {
-		targetPort = getTargetPort(svc, srcPort)
-		exists = true
-	}
-
-	log := sp.log.WithField("port", srcPort)
-
-	metrics, err := endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname))
-	if err != nil {
-		return nil, err
-	}
-	port := &portPublisher{
-		listeners:            []EndpointUpdateListener{},
-		targetPort:           targetPort,
-		srcPort:              srcPort,
-		hostname:             hostname,
-		exists:               exists,
-		k8sAPI:               sp.k8sAPI,
-		metadataAPI:          sp.metadataAPI,
-		log:                  log,
-		metrics:              metrics,
-		enableEndpointSlices: sp.enableEndpointSlices,
-		localTrafficPolicy:   sp.localTrafficPolicy,
-	}
-
-	if port.enableEndpointSlices {
-		matchLabels := map[string]string{discovery.LabelServiceName: sp.id.Name}
-		selector := labels.Set(matchLabels).AsSelector()
-
-		sliceList, err := sp.k8sAPI.ES().Lister().EndpointSlices(sp.id.Namespace).List(selector)
-		if err != nil && !apierrors.IsNotFound(err) {
-			sp.log.Errorf("error getting endpointSlice list: %s", err)
-		}
-		if err == nil {
-			for _, slice := range sliceList {
-				port.addEndpointSlice(slice)
-			}
-		}
-	} else {
-		endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name)
-		if err != nil && !apierrors.IsNotFound(err) {
-			sp.log.Errorf("error getting endpoints: %s", err)
-		}
-		if err == nil {
-			port.updateEndpoints(endpoints)
-		}
-	}
-
-	return port, nil
-}
-
-func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
-	return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
-}
-
-func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta3.Server) {
-	sp.Lock()
-	defer sp.Unlock()
-
-	for _, pp := range sp.ports {
-		pp.updateServer(oldServer, newServer)
-	}
-}
-
-/////////////////////
-/// portPublisher ///
-/////////////////////
-
-// Note that portPublishers methods are generally NOT thread-safe. You should
-// hold the parent servicePublisher's mutex before calling methods on a
-// portPublisher.
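Editor's note: this removed comment encodes the file's locking convention, which survives the refactor: portPublisher has no mutex of its own, so every call into it must come from a servicePublisher method that already holds the parent's lock. A sketch of that convention (the helper is hypothetical):

```go
// The parent's mutex serializes all access to its portPublishers.
func (sp *servicePublisher) forEachPort(f func(*portPublisher)) {
	sp.Lock()
	defer sp.Unlock()
	for _, pp := range sp.ports {
		f(pp) // safe: sp's lock is held for the duration of the call
	}
}
```
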
-
-func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
-	newAddressSet := pp.endpointsToAddresses(endpoints)
-	if len(newAddressSet.Addresses) == 0 {
-		for _, listener := range pp.listeners {
-			listener.NoEndpoints(true)
-		}
-	} else {
-		add, remove := diffAddresses(pp.addresses, newAddressSet)
-		for _, listener := range pp.listeners {
-			if len(remove.Addresses) > 0 {
-				listener.Remove(remove)
-			}
-			if len(add.Addresses) > 0 {
-				listener.Add(add)
-			}
-		}
-	}
-	pp.addresses = newAddressSet
-	pp.exists = true
-	pp.metrics.incUpdates()
-	pp.metrics.setPods(len(pp.addresses.Addresses))
-	pp.metrics.setExists(true)
-}
-
-func (pp *portPublisher) addEndpointSlice(slice *discovery.EndpointSlice) {
-	newAddressSet := pp.endpointSliceToAddresses(slice)
-	for id, addr := range pp.addresses.Addresses {
-		if _, ok := newAddressSet.Addresses[id]; !ok {
-			newAddressSet.Addresses[id] = addr
-		}
-	}
-
-	add, _ := diffAddresses(pp.addresses, newAddressSet)
-	if len(add.Addresses) > 0 {
-		for _, listener := range pp.listeners {
-			listener.Add(add)
-		}
-	}
-
-	// even if the ES doesn't have addresses yet we need to create a new
-	// pp.addresses entry with the appropriate Labels and LocalTrafficPolicy,
-	// which isn't going to be captured during the ES update event when
-	// addresses get added
-
-	pp.addresses = newAddressSet
-	pp.exists = true
-	pp.metrics.incUpdates()
-	pp.metrics.setPods(len(pp.addresses.Addresses))
-	pp.metrics.setExists(true)
-}
-
-func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
-	updatedAddressSet := AddressSet{
-		Addresses:          make(map[ID]Address),
-		Labels:             pp.addresses.Labels,
-		LocalTrafficPolicy: pp.localTrafficPolicy,
-	}
-
-	for id, address := range pp.addresses.Addresses {
-		updatedAddressSet.Addresses[id] = address
-	}
-
-	for _, id := range pp.endpointSliceToIDs(oldSlice) {
-		delete(updatedAddressSet.Addresses, id)
-	}
-
-	newAddressSet := pp.endpointSliceToAddresses(newSlice)
-	for id, address := range newAddressSet.Addresses {
-		updatedAddressSet.Addresses[id] = address
-	}
-
-	add, remove := diffAddresses(pp.addresses, updatedAddressSet)
-	for _, listener := range pp.listeners {
-		if len(remove.Addresses) > 0 {
-			listener.Remove(remove)
-		}
-		if len(add.Addresses) > 0 {
-			listener.Add(add)
-		}
-	}
-
-	pp.addresses = updatedAddressSet
-	pp.exists = true
-	pp.metrics.incUpdates()
-	pp.metrics.setPods(len(pp.addresses.Addresses))
-	pp.metrics.setExists(true)
-}
-
-func metricLabels(resource interface{}) map[string]string {
-	var serviceName, ns string
-	var resLabels, resAnnotations map[string]string
-	switch res := resource.(type) {
-	case *corev1.Endpoints:
-		{
-			serviceName, ns = res.Name, res.Namespace
-			resLabels, resAnnotations = res.Labels, res.Annotations
-		}
-	case *discovery.EndpointSlice:
-		{
-			serviceName, ns = res.Labels[discovery.LabelServiceName], res.Namespace
-			resLabels, resAnnotations = res.Labels, res.Annotations
-		}
-	}
-
-	labels := map[string]string{service: serviceName, namespace: ns}
-
-	remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
-	serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]
-
-	if hasRemoteClusterName {
-		// this means we are looking at Endpoints created for the purpose of mirroring
-		// an out of cluster service.
-		labels[targetCluster] = remoteClusterName
-		if hasServiceFqn {
-			fqParts := strings.Split(serviceFqn, ".")
-			if len(fqParts) >= 2 {
-				labels[targetService] = fqParts[0]
-				labels[targetServiceNamespace] = fqParts[1]
-			}
-		}
-	}
-	return labels
-}
-
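Editor's note: concretely, for a mirrored service the RemoteServiceFqName annotation is split on dots to recover the target service and namespace. Using a hypothetical FQN value:

```go
// "web.emojivoto.svc.cluster.local" -> targetService="web",
// targetServiceNamespace="emojivoto"
fqParts := strings.Split("web.emojivoto.svc.cluster.local", ".")
if len(fqParts) >= 2 {
	labels[targetService] = fqParts[0]          // "web"
	labels[targetServiceNamespace] = fqParts[1] // "emojivoto"
}
```
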
-func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) AddressSet {
-	resolvedPort := pp.resolveESTargetPort(es.Ports)
-	if resolvedPort == undefinedEndpointPort {
-		return AddressSet{
-			Labels:             metricLabels(es),
-			Addresses:          make(map[ID]Address),
-			LocalTrafficPolicy: pp.localTrafficPolicy,
-		}
-	}
-
-	serviceID, err := getEndpointSliceServiceID(es)
-	if err != nil {
-		pp.log.Errorf("Could not fetch resource service name:%v", err)
-	}
-
-	addresses := make(map[ID]Address)
-	for _, endpoint := range es.Endpoints {
-		if endpoint.Hostname != nil {
-			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
-				continue
-			}
-		}
-		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
-			continue
-		}
-
-		if endpoint.TargetRef == nil {
-			for _, IPAddr := range endpoint.Addresses {
-				var authorityOverride string
-				if fqName, ok := es.Annotations[consts.RemoteServiceFqName]; ok {
-					authorityOverride = net.JoinHostPort(fqName, fmt.Sprintf("%d", pp.srcPort))
-				}
-
-				identity := es.Annotations[consts.RemoteGatewayIdentity]
-				address, id := pp.newServiceRefAddress(resolvedPort, IPAddr, serviceID.Name, es.Namespace)
-				address.Identity, address.AuthorityOverride = identity, authorityOverride
-
-				if endpoint.Hints != nil {
-					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
-					copy(zones, endpoint.Hints.ForZones)
-					address.ForZones = zones
-				}
-				addresses[id] = address
-			}
-			continue
-		}
-
-		if endpoint.TargetRef.Kind == endpointTargetRefPod {
-			for _, IPAddr := range endpoint.Addresses {
-				address, id, err := pp.newPodRefAddress(
-					resolvedPort,
-					es.AddressType,
-					IPAddr,
-					endpoint.TargetRef.Name,
-					endpoint.TargetRef.Namespace,
-				)
-				if err != nil {
-					pp.log.Errorf("Unable to create new address:%v", err)
-					continue
-				}
-				err = SetToServerProtocol(pp.k8sAPI, &address, pp.log)
-				if err != nil {
-					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
-				}
-
-				address.Zone = endpoint.Zone
-				if endpoint.Hints != nil {
-					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
-					copy(zones, endpoint.Hints.ForZones)
-					address.ForZones = zones
-				}
-				addresses[id] = address
-			}
-		}
-
-		if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
-			for _, IPAddr := range endpoint.Addresses {
-				address, id, err := pp.newExtRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, es.Namespace)
-				if err != nil {
-					pp.log.Errorf("Unable to create new address: %v", err)
-					continue
-				}
-
-				err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address)
-				if err != nil {
-					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
-					continue
-				}
-
-				address.Zone = endpoint.Zone
-				if endpoint.Hints != nil {
-					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
-					copy(zones, endpoint.Hints.ForZones)
-					address.ForZones = zones
-				}
-
-				addresses[id] = address
-			}
-
-		}
-
-	}
-	return AddressSet{
-		Addresses:          addresses,
-		Labels:             metricLabels(es),
-		LocalTrafficPolicy: pp.localTrafficPolicy,
-	}
-}
-
-// endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns
-// only the IDs of the endpoints rather than the addresses themselves.
-func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
-	resolvedPort := pp.resolveESTargetPort(es.Ports)
-	if resolvedPort == undefinedEndpointPort {
-		return []ID{}
-	}
-
-	serviceID, err := getEndpointSliceServiceID(es)
-	if err != nil {
-		pp.log.Errorf("Could not fetch resource service name:%v", err)
-	}
-
-	ids := []ID{}
-	for _, endpoint := range es.Endpoints {
-		if endpoint.Hostname != nil {
-			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
-				continue
-			}
-		}
-		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
-			continue
-		}
-
-		if endpoint.TargetRef == nil {
-			for _, IPAddr := range endpoint.Addresses {
-				ids = append(ids, ServiceID{
-					Name: strings.Join([]string{
-						serviceID.Name,
-						IPAddr,
-						fmt.Sprint(resolvedPort),
-					}, "-"),
-					Namespace: es.Namespace,
-				})
-			}
-			continue
-		}
-
-		if endpoint.TargetRef.Kind == endpointTargetRefPod {
-			ids = append(ids, PodID{
-				Name:      endpoint.TargetRef.Name,
-				Namespace: endpoint.TargetRef.Namespace,
-				IPFamily:  corev1.IPFamily(es.AddressType),
-			})
-		} else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
-			ids = append(ids, ExternalWorkloadID{
-				Name:      endpoint.TargetRef.Name,
-				Namespace: endpoint.TargetRef.Namespace,
-			})
-		}
-
-	}
-	return ids
-}
-
-func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet {
-	addresses := make(map[ID]Address)
-	for _, subset := range endpoints.Subsets {
-		resolvedPort := pp.resolveTargetPort(subset)
-		if resolvedPort == undefinedEndpointPort {
-			continue
-		}
-		for _, endpoint := range subset.Addresses {
-			if pp.hostname != "" && pp.hostname != endpoint.Hostname {
-				continue
-			}
-
-			if endpoint.TargetRef == nil {
-				var authorityOverride string
-				if fqName, ok := endpoints.Annotations[consts.RemoteServiceFqName]; ok {
-					authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort)
-				}
-
-				identity := endpoints.Annotations[consts.RemoteGatewayIdentity]
-				address, id := pp.newServiceRefAddress(resolvedPort, endpoint.IP, endpoints.Name, endpoints.Namespace)
-				address.Identity, address.AuthorityOverride = identity, authorityOverride
-
-				addresses[id] = address
-				continue
-			}
-
-			if endpoint.TargetRef.Kind == endpointTargetRefPod {
-				address, id, err := pp.newPodRefAddress(
-					resolvedPort,
-					"",
-					endpoint.IP,
-					endpoint.TargetRef.Name,
-					endpoint.TargetRef.Namespace,
-				)
-				if err != nil {
-					pp.log.Errorf("Unable to create new address:%v", err)
-					continue
-				}
-				err = SetToServerProtocol(pp.k8sAPI, &address, pp.log)
-				if err != nil {
-					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
-				}
-				addresses[id] = address
-			}
-		}
-	}
-	return AddressSet{
-		Addresses: addresses,
-		Labels:    metricLabels(endpoints),
-	}
-}
-
-func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP, serviceName, serviceNamespace string) (Address, ServiceID) {
-	id := ServiceID{
-		Name: strings.Join([]string{
-			serviceName,
-			endpointIP,
-			fmt.Sprint(endpointPort),
-		}, "-"),
-		Namespace: serviceNamespace,
-	}
-
-	return Address{IP: endpointIP, Port: endpointPort}, id
-}
-
-func (pp *portPublisher) newPodRefAddress(
-	endpointPort Port,
-	ipFamily discovery.AddressType,
-	endpointIP,
-	podName,
-	podNamespace string,
-) (Address, PodID, error) {
-	id := PodID{
-		Name:      podName,
-		Namespace: podNamespace,
-		IPFamily:  corev1.IPFamily(ipFamily),
-	}
-	pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name)
-	if err != nil {
-		return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v: %w", id, err)
-	}
-	ownerKind, ownerName, err := pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
-	if err != nil {
-		return Address{}, PodID{}, err
-	}
-	addr := Address{
-		IP:        endpointIP,
-		Port:      endpointPort,
-		Pod:       pod,
-		OwnerName: ownerName,
-		OwnerKind: ownerKind,
-	}
-
-	return addr, id, nil
-}
-
-func (pp *portPublisher) newExtRefAddress(endpointPort Port, endpointIP, externalWorkloadName, externalWorkloadNamespace string) (Address, ExternalWorkloadID, error) {
-	id := ExternalWorkloadID{
-		Name:      externalWorkloadName,
-		Namespace: externalWorkloadNamespace,
-	}
-
-	ew, err := pp.k8sAPI.ExtWorkload().Lister().ExternalWorkloads(id.Namespace).Get(id.Name)
-	if err != nil {
-		return Address{}, ExternalWorkloadID{}, fmt.Errorf("unable to fetch ExternalWorkload %v: %w", id, err)
-	}
-
-	addr := Address{
-		IP:               endpointIP,
-		Port:             endpointPort,
-		ExternalWorkload: ew,
-	}
-
-	ownerRefs := ew.GetOwnerReferences()
-	if len(ownerRefs) == 1 {
-		parent := ownerRefs[0]
-		addr.OwnerName = parent.Name
-		addr.OwnerName = strings.ToLower(parent.Kind)
-	}
-
-	return addr, id, nil
-}
-
-func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port {
-	if slicePorts == nil {
-		return undefinedEndpointPort
-	}
-
-	switch pp.targetPort.Type {
-	case intstr.Int:
-		return Port(pp.targetPort.IntVal)
-	case intstr.String:
-		for _, p := range slicePorts {
-			name := ""
-			if p.Name != nil {
-				name = *p.Name
-			}
-			if name == pp.targetPort.StrVal {
-				return Port(*p.Port)
-			}
-		}
-	}
-	return undefinedEndpointPort
-}
-
-func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port {
-	switch pp.targetPort.Type {
-	case intstr.Int:
-		return Port(pp.targetPort.IntVal)
-	case intstr.String:
-		for _, p := range subset.Ports {
-			if p.Name == pp.targetPort.StrVal {
-				return Port(p.Port)
-			}
-		}
-	}
-	return undefinedEndpointPort
-}
-
-func (pp *portPublisher) updateLocalTrafficPolicy(localTrafficPolicy bool) {
-	pp.localTrafficPolicy = localTrafficPolicy
-	pp.addresses.LocalTrafficPolicy = localTrafficPolicy
-	for _, listener := range pp.listeners {
-		listener.Add(pp.addresses.shallowCopy())
-	}
-}
-
-func (pp *portPublisher) updatePort(targetPort namedPort) {
-	pp.targetPort = targetPort
-
-	if pp.enableEndpointSlices {
-		matchLabels := map[string]string{discovery.LabelServiceName: pp.id.Name}
-		selector := labels.Set(matchLabels).AsSelector()
-
-		endpointSlices, err := pp.k8sAPI.ES().Lister().EndpointSlices(pp.id.Namespace).List(selector)
-		if err == nil {
-			pp.addresses = AddressSet{}
-			for _, slice := range endpointSlices {
-				pp.addEndpointSlice(slice)
-			}
-		} else {
-			pp.log.Errorf("Unable to get EndpointSlices during port update: %s", err)
-		}
-	} else {
-		endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name)
-		if err == nil {
-			pp.updateEndpoints(endpoints)
-		} else {
-			pp.log.Errorf("Unable to get endpoints during port update: %s", err)
-		}
-	}
-}
-
-func (pp *portPublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
-	addrSet := pp.endpointSliceToAddresses(es)
-	for id := range addrSet.Addresses {
-		delete(pp.addresses.Addresses, id)
-	}
-
-	for _, listener := range pp.listeners {
-		listener.Remove(addrSet)
-	}
-
-	if len(pp.addresses.Addresses) == 0 {
-		pp.noEndpoints(false)
-	} else {
-		pp.exists = true
-		pp.metrics.incUpdates()
-		pp.metrics.setPods(len(pp.addresses.Addresses))
-		pp.metrics.setExists(true)
-	}
-}
-
-func (pp *portPublisher) noEndpoints(exists bool) {
-	pp.exists = exists
-	pp.addresses = AddressSet{}
for _, listener := range pp.listeners { - listener.NoEndpoints(exists) - } - - pp.metrics.incUpdates() - pp.metrics.setExists(exists) - pp.metrics.setPods(0) -} - -func (pp *portPublisher) subscribe(listener EndpointUpdateListener) { - if pp.exists { - if len(pp.addresses.Addresses) > 0 { - listener.Add(pp.addresses.shallowCopy()) - } else { - listener.NoEndpoints(true) - } - } else { - listener.NoEndpoints(false) - } - pp.listeners = append(pp.listeners, listener) - - pp.metrics.setSubscribers(len(pp.listeners)) -} - -func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) { - for i, e := range pp.listeners { - if e == listener { - n := len(pp.listeners) - pp.listeners[i] = pp.listeners[n-1] - pp.listeners[n-1] = nil - pp.listeners = pp.listeners[:n-1] - break - } - } - - pp.metrics.setSubscribers(len(pp.listeners)) -} -func (pp *portPublisher) updateServer(oldServer, newServer *v1beta3.Server) { - updated := false - for id, address := range pp.addresses.Addresses { - - if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) { - if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol { - address.OpaqueProtocol = true - } else { - address.OpaqueProtocol = false - } - if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol { - pp.addresses.Addresses[id] = address - updated = true - } - } - } - if updated { - for _, listener := range pp.listeners { - listener.Add(pp.addresses.shallowCopy()) - } - pp.metrics.incUpdates() - } -} - -func (pp *portPublisher) isAddressSelected(address Address, server *v1beta3.Server) bool { - if server == nil { - return false - } - - if address.Pod != nil { - selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector) - if err != nil { - pp.log.Errorf("failed to create Selector: %s", err) - return false - } - - if !selector.Matches(labels.Set(address.Pod.Labels)) { - return false - } - - switch server.Spec.Port.Type { - case intstr.Int: - if server.Spec.Port.IntVal == int32(address.Port) { - return true - } - case intstr.String: - for _, c := range append(address.Pod.Spec.InitContainers, address.Pod.Spec.Containers...) 
{ - for _, p := range c.Ports { - if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal { - return true - } - } - } - } - - } else if address.ExternalWorkload != nil { - selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector) - if err != nil { - pp.log.Errorf("failed to create Selector: %s", err) - return false - } - - if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) { - return false - } - - switch server.Spec.Port.Type { - case intstr.Int: - if server.Spec.Port.IntVal == int32(address.Port) { - return true - } - case intstr.String: - for _, p := range address.ExternalWorkload.Spec.Ports { - if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal { - return true - } - } - } - } - return false -} - //////////// /// util /// //////////// @@ -1458,6 +538,10 @@ func addressChanged(oldAddress Address, newAddress Address) bool { } } + if oldAddress.OpaqueProtocol != newAddress.OpaqueProtocol { + return true + } + if oldAddress.Pod != nil && newAddress.Pod != nil { // if these addresses are owned by pods we can check the resource versions return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion @@ -1487,9 +571,8 @@ func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSe } } add = AddressSet{ - Addresses: addAddresses, - Labels: newAddresses.Labels, - LocalTrafficPolicy: newAddresses.LocalTrafficPolicy, + Addresses: addAddresses, + Labels: newAddresses.Labels, } remove = AddressSet{ Addresses: removeAddresses, diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 3c1c20ae1349d..a45eb9f98b633 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -20,11 +20,8 @@ import ( ) type bufferingEndpointListener struct { - added []string - removed []string - localTrafficPolicy bool - noEndpointsCalled bool - noEndpointsExist bool + added []string + removed []string sync.Mutex } @@ -63,25 +60,16 @@ func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testin testCompare(t, expected, bel.removed) } -func (bel *bufferingEndpointListener) endpointsAreNotCalled() bool { - bel.Lock() - defer bel.Unlock() - return bel.noEndpointsCalled -} - -func (bel *bufferingEndpointListener) endpointsDoNotExist() bool { - bel.Lock() - defer bel.Unlock() - return bel.noEndpointsExist -} - func (bel *bufferingEndpointListener) Add(set AddressSet) { bel.Lock() defer bel.Unlock() for _, address := range set.Addresses { bel.added = append(bel.added, addressString(address)) } - bel.localTrafficPolicy = set.LocalTrafficPolicy +} + +func (bel *bufferingEndpointListener) AddFiltered(set AddressSet) { + bel.Add(set) } func (bel *bufferingEndpointListener) Remove(set AddressSet) { @@ -90,14 +78,26 @@ func (bel *bufferingEndpointListener) Remove(set AddressSet) { for _, address := range set.Addresses { bel.removed = append(bel.removed, addressString(address)) } - bel.localTrafficPolicy = set.LocalTrafficPolicy } -func (bel *bufferingEndpointListener) NoEndpoints(exists bool) { - bel.Lock() - defer bel.Unlock() - bel.noEndpointsCalled = true - bel.noEndpointsExist = exists +func (bel *bufferingEndpointListener) RemoveFiltered(set AddressSet) { + bel.Remove(set) +} + +func (bel *bufferingEndpointListener) NodeName() string { + return "" +} + +func (bel *bufferingEndpointListener) NodeTopologyZone() string { + return "" +} 
+ +func (bel *bufferingEndpointListener) EnableEndpointFiltering() bool { + return false +} + +func (bel *bufferingEndpointListener) EnableIPv6() bool { + return false } type bufferingEndpointListenerWithResVersion struct { @@ -140,6 +140,10 @@ func (bel *bufferingEndpointListenerWithResVersion) Add(set AddressSet) { } } +func (bel *bufferingEndpointListenerWithResVersion) AddFiltered(set AddressSet) { + bel.Add(set) +} + func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) { bel.Lock() defer bel.Unlock() @@ -148,19 +152,41 @@ func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) { } } +func (bel *bufferingEndpointListenerWithResVersion) RemoveFiltered(set AddressSet) { + bel.Remove(set) +} + func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {} +func (bel *bufferingEndpointListenerWithResVersion) NodeName() string { + return "" +} + +func (bel *bufferingEndpointListenerWithResVersion) NodeTopologyZone() string { + return "" +} + +func (bel *bufferingEndpointListenerWithResVersion) EnableEndpointFiltering() bool { + return false +} + +func (bel *bufferingEndpointListenerWithResVersion) EnableIPv6() bool { + return false +} + +func testFilterKey(hostname string) FilterKey { + return FilterKey{Hostname: hostname} +} + func TestEndpointsWatcher(t *testing.T) { for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - expectedAddresses []string - expectedNoEndpoints bool - expectedNoEndpointsServiceExists bool - expectedError bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + expectedAddresses []string + expectedError bool }{ { serviceType: "local services", @@ -245,9 +271,7 @@ status: "172.17.0.20:8989", "172.17.0.21:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + expectedError: false, }, { // Test for the issue described in linkerd/linkerd2#1405. @@ -315,9 +339,7 @@ status: "10.233.66.239:8990", "10.233.88.244:8990", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + expectedError: false, }, { // Test for the issue described in linkerd/linkerd2#1853. 
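The test-table hunks above and below are mechanical: Subscribe now takes a FilterKey rather than a bare hostname string, and NewEndpointsWatcher gains an extra boolean which, judging from the call sites in this diff, is an enableIPv6 flag. A minimal sketch of the new calling convention (not part of the patch; it assumes k8sAPI, metadataAPI, log, and listener are already in scope, and uses only FilterKey fields that appear elsewhere in this diff):

	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, log,
		true,  // enableEndpointSlices
		false, // enableIPv6 (assumed meaning of the new flag)
		"local")
	if err != nil {
		log.Fatalf("can't create Endpoints watcher: %s", err)
	}

	// Each subscriber declares its filtering preferences up front.
	key := FilterKey{
		Hostname:                "name1-3", // optional StatefulSet-style hostname filter
		NodeName:                "node-1",  // consulted when internalTrafficPolicy is Local
		EnableEndpointFiltering: true,      // opt in to topology-aware filtering
	}
	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, key, listener)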
@@ -369,9 +391,7 @@ status: expectedAddresses: []string{ "10.1.30.135:7779", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + expectedError: false, }, { serviceType: "local services with missing addresses", @@ -428,9 +448,7 @@ status: expectedAddresses: []string{ "172.17.0.25:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + expectedError: false, }, { serviceType: "local services with no endpoints", @@ -445,12 +463,10 @@ spec: ports: - port: 7979`, }, - id: ServiceID{Name: "name2", Namespace: "ns"}, - port: 7979, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name2", Namespace: "ns"}, + port: 7979, + expectedAddresses: []string{}, + expectedError: false, }, { serviceType: "external name services", @@ -464,22 +480,18 @@ spec: type: ExternalName externalName: foo`, }, - id: ServiceID{Name: "name3", Namespace: "ns"}, - port: 6969, - expectedAddresses: []string{}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: true, + id: ServiceID{Name: "name3", Namespace: "ns"}, + port: 6969, + expectedAddresses: []string{}, + expectedError: true, }, { - serviceType: "services that do not yet exist", - k8sConfigs: []string{}, - id: ServiceID{Name: "name4", Namespace: "ns"}, - port: 5959, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: false, - expectedError: false, + serviceType: "services that do not yet exist", + k8sConfigs: []string{}, + id: ServiceID{Name: "name4", Namespace: "ns"}, + port: 5959, + expectedAddresses: []string{}, + expectedError: false, }, { serviceType: "stateful sets", @@ -558,12 +570,10 @@ status: phase: Running podIP: 172.17.0.20`, }, - id: ServiceID{Name: "name1", Namespace: "ns"}, - hostname: "name1-3", - port: 5959, - expectedAddresses: []string{"172.17.0.20:5959"}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, + id: ServiceID{Name: "name1", Namespace: "ns"}, + hostname: "name1-3", + port: 5959, + expectedAddresses: []string{"172.17.0.20:5959"}, }, { serviceType: "local service with new named port mid rollout and two subsets but only first subset is relevant", @@ -658,9 +668,7 @@ status: "172.17.0.1:8989", "172.17.0.2:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + expectedError: false, }, } { tt := tt // pin @@ -675,7 +683,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -685,7 +693,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if tt.expectedError && err == nil { t.Fatal("Expected error but was ok") } @@ -694,32 +702,20 @@ status: } listener.ExpectAdded(tt.expectedAddresses, t) - - if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.expectedNoEndpoints, listener.endpointsAreNotCalled()) - } - - if listener.endpointsDoNotExist() != 
tt.expectedNoEndpointsServiceExists { - t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]", - tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist()) - } }) } } func TestEndpointsWatcherWithEndpointSlices(t *testing.T) { for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - expectedAddresses []string - expectedNoEndpoints bool - expectedNoEndpointsServiceExists bool - expectedError bool - expectedLocalTrafficPolicy bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + expectedAddresses []string + expectedError bool + expectedLocalTrafficPolicy bool }{ { serviceType: "local services with EndpointSlice", @@ -846,10 +842,8 @@ status: "172.17.0.20:8989", "172.17.0.21:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, - expectedLocalTrafficPolicy: true, + expectedError: false, + expectedLocalTrafficPolicy: true, }, { serviceType: "local services with missing addresses and EndpointSlice", @@ -935,12 +929,10 @@ status: podIP: 172.17.0.25 phase: Running`, }, - id: ServiceID{Name: "name-1", Namespace: "ns"}, - port: 8989, - expectedAddresses: []string{"172.17.0.25:8989"}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + port: 8989, + expectedAddresses: []string{"172.17.0.25:8989"}, + expectedError: false, }, { serviceType: "local services with no EndpointSlices", @@ -973,12 +965,10 @@ spec: ports: - port: 7979`, }, - id: ServiceID{Name: "name-2", Namespace: "ns"}, - port: 7979, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name-2", Namespace: "ns"}, + port: 7979, + expectedAddresses: []string{}, + expectedError: false, }, { serviceType: "external name services with EndpointSlices", @@ -1010,22 +1000,18 @@ spec: type: ExternalName externalName: foo`, }, - id: ServiceID{Name: "name-3-external-svc", Namespace: "ns"}, - port: 7777, - expectedAddresses: []string{}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: true, + id: ServiceID{Name: "name-3-external-svc", Namespace: "ns"}, + port: 7777, + expectedAddresses: []string{}, + expectedError: true, }, { - serviceType: "services that do not exist", - k8sConfigs: []string{}, - id: ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"}, - port: 5555, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: false, - expectedError: false, + serviceType: "services that do not exist", + k8sConfigs: []string{}, + id: ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"}, + port: 5555, + expectedAddresses: []string{}, + expectedError: false, }, { serviceType: "stateful sets with EndpointSlices", @@ -1138,13 +1124,11 @@ status: phase: Running podIP: 172.17.0.20`, }, - id: ServiceID{Name: "name-1", Namespace: "ns"}, - hostname: "name-1-3", - port: 6000, - expectedAddresses: []string{"172.17.0.20:6000"}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + hostname: "name-1-3", + port: 6000, + expectedAddresses: []string{"172.17.0.20:6000"}, + expectedError: false, }, { serviceType: "service with EndpointSlice without labels", @@ -1210,12 +1194,10 @@ status: phase: Running podIP: 172.17.0.12`, }, - id: 
ServiceID{Name: "name-5", Namespace: "ns"}, - port: 8989, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 8989, + expectedAddresses: []string{}, + expectedError: false, }, { serviceType: "service with IPv6 address type EndpointSlice", @@ -1284,12 +1266,10 @@ status: phase: Running podIP: 0:0:0:0:0:0:0:1`, }, - id: ServiceID{Name: "name-5", Namespace: "ns"}, - port: 9000, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 9000, + expectedAddresses: []string{}, + expectedError: false, }} { tt := tt // pin t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) { @@ -1303,7 +1283,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1313,7 +1293,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if tt.expectedError && err == nil { t.Fatal("Expected error but was ok") } @@ -1321,37 +1301,21 @@ status: t.Fatalf("Expected no error, got [%s]", err) } - if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy { - t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy) - } - listener.ExpectAdded(tt.expectedAddresses, t) - - if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.expectedNoEndpoints, listener.endpointsAreNotCalled()) - } - - if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists { - t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]", - tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist()) - } }) } } func TestEndpointsWatcherWithEndpointSlicesExternalWorkload(t *testing.T) { for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - expectedAddresses []string - expectedNoEndpoints bool - expectedNoEndpointsServiceExists bool - expectedError bool - expectedLocalTrafficPolicy bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + expectedAddresses []string + expectedError bool + expectedLocalTrafficPolicy bool }{ { serviceType: "local services with EndpointSlice", @@ -1469,10 +1433,8 @@ status: "172.17.0.20:8989", "172.17.0.21:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, - expectedLocalTrafficPolicy: true, + expectedError: false, + expectedLocalTrafficPolicy: true, }, { serviceType: "local services with missing addresses and EndpointSlice", @@ -1555,12 +1517,10 @@ status: conditions: ready: true`, }, - id: ServiceID{Name: "name-1", Namespace: "ns"}, - port: 8989, - expectedAddresses: []string{"172.17.0.25:8989"}, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - expectedError: false, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + port: 8989, + expectedAddresses: 
[]string{"172.17.0.25:8989"}, + expectedError: false, }, { serviceType: "service with EndpointSlice without labels", @@ -1623,12 +1583,10 @@ status: conditions: ready: true`, }, - id: ServiceID{Name: "name-5", Namespace: "ns"}, - port: 8989, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 8989, + expectedAddresses: []string{}, + expectedError: false, }, { @@ -1695,12 +1653,10 @@ status: conditions: ready: true`, }, - id: ServiceID{Name: "name-5", Namespace: "ns"}, - port: 9000, - expectedAddresses: []string{}, - expectedNoEndpoints: true, - expectedNoEndpointsServiceExists: true, - expectedError: false, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 9000, + expectedAddresses: []string{}, + expectedError: false, }, } { tt := tt // pin @@ -1715,7 +1671,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1725,7 +1681,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if tt.expectedError && err == nil { t.Fatal("Expected error but was ok") } @@ -1733,21 +1689,7 @@ status: t.Fatalf("Expected no error, got [%s]", err) } - if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy { - t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy) - } - listener.ExpectAdded(tt.expectedAddresses, t) - - if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.expectedNoEndpoints, listener.endpointsAreNotCalled()) - } - - if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists { - t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]", - tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist()) - } }) } } @@ -1792,7 +1734,6 @@ status: serviceType string k8sConfigs []string id ServiceID - hostname string port Port objectToDelete interface{} deletingServices bool @@ -1802,7 +1743,6 @@ status: k8sConfigs: k8sConfigs, id: ServiceID{Name: "name1", Namespace: "ns"}, port: 8989, - hostname: "name1-1", objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}}, }, { @@ -1810,7 +1750,6 @@ status: k8sConfigs: k8sConfigs, id: ServiceID{Name: "name1", Namespace: "ns"}, port: 8989, - hostname: "name1-1", objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}}, }, { @@ -1818,7 +1757,6 @@ status: k8sConfigs: k8sConfigs, id: ServiceID{Name: "name1", Namespace: "ns"}, port: 8989, - hostname: "name1-1", objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}}, deletingServices: true, }, @@ -1827,7 +1765,6 @@ status: k8sConfigs: k8sConfigs, id: ServiceID{Name: "name1", Namespace: "ns"}, port: 8989, - hostname: "name1-1", objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}}, deletingServices: true, }, @@ -1845,7 +1782,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - 
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1855,7 +1792,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, FilterKey{}, listener) if err != nil { t.Fatal(err) } @@ -1866,9 +1803,7 @@ status: watcher.deleteEndpoints(tt.objectToDelete) } - if !listener.endpointsAreNotCalled() { - t.Fatal("Expected NoEndpoints to be Called") - } + listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t) }) } @@ -1966,45 +1901,41 @@ status: podIP: 172.17.0.13`}...) for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - objectToDelete interface{} - deletingServices bool - hasSliceAccess bool - noEndpointsCalled bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + objectToDelete interface{} + deletingServices bool + hasSliceAccess bool }{ { - serviceType: "can delete an EndpointSlice", - k8sConfigs: k8sConfigsWithES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.PodKind), - hasSliceAccess: true, - noEndpointsCalled: true, + serviceType: "can delete an EndpointSlice", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.PodKind), + hasSliceAccess: true, }, { - serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown", - k8sConfigs: k8sConfigsWithES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.PodKind), - hasSliceAccess: true, - noEndpointsCalled: true, + serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.PodKind), + hasSliceAccess: true, }, { - serviceType: "can delete an EndpointSlice when there are multiple ones", - k8sConfigs: k8sConfigWithMultipleES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.PodKind), - hasSliceAccess: true, - noEndpointsCalled: false, + serviceType: "can delete an EndpointSlice when there are multiple ones", + k8sConfigs: k8sConfigWithMultipleES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.PodKind), + hasSliceAccess: true, }, } { tt := tt // pin @@ -2019,7 +1950,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2029,17 +1960,14 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, 
testFilterKey(tt.hostname), listener) if err != nil { t.Fatal(err) } watcher.deleteEndpointSlice(tt.objectToDelete) - if listener.endpointsAreNotCalled() != tt.noEndpointsCalled { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.noEndpointsCalled, listener.endpointsAreNotCalled()) - } + listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t) }) } } @@ -2136,45 +2064,41 @@ status: ready: true`}...) for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - objectToDelete interface{} - deletingServices bool - hasSliceAccess bool - noEndpointsCalled bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + objectToDelete interface{} + deletingServices bool + hasSliceAccess bool }{ { - serviceType: "can delete an EndpointSlice", - k8sConfigs: k8sConfigsWithES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), - hasSliceAccess: true, - noEndpointsCalled: true, + serviceType: "can delete an EndpointSlice", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), + hasSliceAccess: true, }, { - serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown", - k8sConfigs: k8sConfigsWithES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), - hasSliceAccess: true, - noEndpointsCalled: true, + serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), + hasSliceAccess: true, }, { - serviceType: "can delete an EndpointSlice when there are multiple ones", - k8sConfigs: k8sConfigWithMultipleES, - id: ServiceID{Name: "name1", Namespace: "ns"}, - port: 8989, - hostname: "name1-1", - objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), - hasSliceAccess: true, - noEndpointsCalled: false, + serviceType: "can delete an EndpointSlice when there are multiple ones", + k8sConfigs: k8sConfigWithMultipleES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind), + hasSliceAccess: true, }, } { tt := tt // pin @@ -2189,7 +2113,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2199,32 +2123,27 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if err != nil { t.Fatal(err) } watcher.deleteEndpointSlice(tt.objectToDelete) - if listener.endpointsAreNotCalled() != tt.noEndpointsCalled { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.noEndpointsCalled, listener.endpointsAreNotCalled()) - } + listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t) }) } } func 
TestEndpointsWatcherServiceMirrors(t *testing.T) { for _, tt := range []struct { - serviceType string - k8sConfigs []string - id ServiceID - hostname string - port Port - expectedAddresses []string - expectedNoEndpoints bool - expectedNoEndpointsServiceExists bool - enableEndpointSlices bool + serviceType string + k8sConfigs []string + id ServiceID + hostname string + port Port + expectedAddresses []string + enableEndpointSlices bool }{ { k8sConfigs: []string{` @@ -2260,8 +2179,6 @@ subsets: expectedAddresses: []string{ "172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, }, { k8sConfigs: []string{` @@ -2298,9 +2215,7 @@ ports: expectedAddresses: []string{ "172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, - enableEndpointSlices: true, + enableEndpointSlices: true, }, { k8sConfigs: []string{` @@ -2335,8 +2250,6 @@ subsets: expectedAddresses: []string{ "172.17.0.12:8989/name1-remote-fq:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, }, { @@ -2373,8 +2286,6 @@ subsets: expectedAddresses: []string{ "172.17.0.12:9999/gateway-identity-1/name1-remote-fq:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, }, { k8sConfigs: []string{` @@ -2410,8 +2321,6 @@ subsets: expectedAddresses: []string{ "172.17.0.12:9999/name1-remote-fq:8989", }, - expectedNoEndpoints: false, - expectedNoEndpointsServiceExists: false, }, } { tt := tt // pin @@ -2426,7 +2335,7 @@ subsets: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2436,23 +2345,13 @@ subsets: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } listener.ExpectAdded(tt.expectedAddresses, t) - - if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints { - t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]", - tt.expectedNoEndpoints, listener.endpointsAreNotCalled()) - } - - if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists { - t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]", - tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist()) - } }) } } @@ -2595,7 +2494,7 @@ subsets: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2605,7 +2504,7 @@ subsets: listener := newBufferingEndpointListener() - err = watcher.Subscribe(tt.id, tt.port, "", listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -2727,7 +2626,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, 
logging.WithField("test", t.Name()), false, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2737,7 +2636,7 @@ status: listener := newBufferingEndpointListenerWithResVersion() - err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener) + err = watcher.Subscribe(tt.id, tt.port, testFilterKey(tt.hostname), listener) if err != nil { t.Fatal(err) } @@ -2829,7 +2728,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2839,7 +2738,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -2992,7 +2891,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -3002,7 +2901,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -3179,7 +3078,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -3189,7 +3088,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -3299,7 +3198,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -3309,7 +3208,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -3419,7 +3318,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, 
false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -3429,7 +3328,7 @@ status: listener := newBufferingEndpointListener() - err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener) + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, testFilterKey(""), listener) if err != nil { t.Fatal(err) } @@ -3462,3 +3361,412 @@ status: listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t) } + +func TestEndpointSliceSelectsAddressFamilyAfterZoneFiltering(t *testing.T) { + nodeConfig := ` +apiVersion: v1 +kind: Node +metadata: + name: node-1 + labels: + topology.kubernetes.io/zone: west-1a` + + for _, tc := range []struct { + name string + ipv4Zone string + ipv6Zone string + expectedAddresses []string + }{ + { + name: "Sends IPv6 only when pod has both IPv4 and IPv6", + ipv4Zone: "west-1a", + ipv6Zone: "west-1a", + expectedAddresses: []string{"2001:db8:85a3::8a2e:370:7333:1"}, + }, + { + name: "Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone", + ipv4Zone: "west-1a", + ipv6Zone: "west-1b", + expectedAddresses: []string{"1.1.1.1:1"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 1`, fmt.Sprintf(` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 1.1.1.1 + conditions: + ready: true + hints: + forZones: + - name: %s + targetRef: + kind: Pod + name: name1-1 + namespace: ns +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-ipv4 + namespace: ns +ports: +- name: "" + port: 1`, tc.ipv4Zone), fmt.Sprintf(` +addressType: IPv6 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 2001:db8:85a3::8a2e:370:7333 + conditions: + ready: true + hints: + forZones: + - name: %s + targetRef: + kind: Pod + name: name1-1 + namespace: ns +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-ipv6 + namespace: ns +ports: +- name: "" + port: 1`, tc.ipv6Zone), ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-1 + namespace: ns +status: + phase: Running + podIP: 1.1.1.1`} + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) 
+ if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI([]string{nodeConfig}) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, true, "local") + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 1, FilterKey{ + EnableEndpointFiltering: true, + NodeName: "node-1", + }, listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener.ExpectAdded(tc.expectedAddresses, t) + listener.ExpectRemoved([]string{}, t) + }) + } +} + +func TestEndpointSliceTopologyAwareFilter(t *testing.T) { + nodeConfig := ` +apiVersion: v1 +kind: Node +metadata: + name: node-1 + labels: + topology.kubernetes.io/zone: west-1a` + + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 1`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 1.1.1.1 + conditions: + ready: true + hints: + forZones: + - name: west-1a +- addresses: + - 1.1.1.2 + conditions: + ready: true + hints: + forZones: + - name: west-1b +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 1 +`} + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) 
+ if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI([]string{nodeConfig}) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 1, FilterKey{ + EnableEndpointFiltering: true, + NodeName: "node-1", + }, listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener.ExpectAdded([]string{"1.1.1.1:1"}, t) + listener.ExpectRemoved([]string{}, t) + + es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + es.Endpoints = es.Endpoints[:1] + + _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + time.Sleep(50 * time.Millisecond) + + listener.ExpectAdded([]string{"1.1.1.1:1"}, t) + listener.ExpectRemoved([]string{}, t) +} + +func TestEndpointSliceLocalTrafficPolicyIgnoresRemovalOfUnfilteredEndpoint(t *testing.T) { + nodeConfig := ` +apiVersion: v1 +kind: Node +metadata: + name: node-1` + + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1 +resources: +- name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + internalTrafficPolicy: Local + ports: + - port: 1`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1 +endpoints: +- addresses: + - 1.1.1.1 + conditions: + ready: true + targetRef: + kind: Pod + name: name1-1 + namespace: ns +- addresses: + - 1.1.1.2 + conditions: + ready: true + targetRef: + kind: Pod + name: name1-2 + namespace: ns +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 1`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-1 + namespace: ns +spec: + nodeName: node-1 +status: + phase: Running + podIP: 1.1.1.1`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name1-2 + namespace: ns +spec: + nodeName: node-2 +status: + phase: Running + podIP: 1.1.1.2`} + + k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...) 
+ if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI([]string{nodeConfig}) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, false, "local") + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener := newBufferingEndpointListener() + + err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 1, FilterKey{ + EnableEndpointFiltering: true, + NodeName: "node-1", + }, listener) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + listener.ExpectAdded([]string{"1.1.1.1:1"}, t) + listener.ExpectRemoved([]string{}, t) + + es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + es.Endpoints = es.Endpoints[:1] + + _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + time.Sleep(50 * time.Millisecond) + + listener.ExpectAdded([]string{"1.1.1.1:1"}, t) + listener.ExpectRemoved([]string{}, t) +} diff --git a/controller/api/destination/watcher/filtered_listener_group.go b/controller/api/destination/watcher/filtered_listener_group.go new file mode 100644 index 0000000000000..92057487ac773 --- /dev/null +++ b/controller/api/destination/watcher/filtered_listener_group.go @@ -0,0 +1,168 @@ +package watcher + +import ( + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/discovery/v1" +) + +type ( + filteredListenerGroup struct { + key FilterKey + nodeTopologyZone string + enableEndpointFiltering bool + enableIPv6 bool + localTrafficPolicy bool + snapshot AddressSet + listeners []EndpointUpdateListener + metrics endpointsMetrics + } +) + +func newFilteredListenerGroup(key FilterKey, nodeTopologyZone string, enableIPv6 bool, localTrafficPolicy bool, metrics endpointsMetrics) *filteredListenerGroup { + return &filteredListenerGroup{ + key: key, + nodeTopologyZone: nodeTopologyZone, + enableEndpointFiltering: key.EnableEndpointFiltering, + enableIPv6: enableIPv6, + localTrafficPolicy: localTrafficPolicy, + metrics: metrics, + snapshot: AddressSet{Addresses: make(map[ID]Address)}, + } +} + +func (group *filteredListenerGroup) publishDiff(addresses AddressSet) { + filtered := group.filterAddresses(addresses) + add, remove := diffAddresses(group.snapshot, filtered) + group.snapshot = filtered + + for _, listener := range group.listeners { + if len(add.Addresses) > 0 { + listener.Add(add) + } + if len(remove.Addresses) > 0 { + listener.Remove(remove) + } + } + + group.metrics.incUpdates() + group.metrics.setPods(len(group.snapshot.Addresses)) + group.metrics.setExists(true) +} + +func (group *filteredListenerGroup) publishNoEndpoints(exists bool) { + remove := group.snapshot + group.snapshot = AddressSet{Addresses: make(map[ID]Address)} + + for _, listener := range group.listeners { + if len(remove.Addresses) > 0 { + listener.Remove(remove) + } + } + + group.metrics.incUpdates() + group.metrics.setPods(0) + group.metrics.setExists(exists) +} + +func (group *filteredListenerGroup) updateLocalTrafficPolicy(localTrafficPolicy bool) { + group.localTrafficPolicy = localTrafficPolicy + group.publishDiff(group.snapshot) +} + +func (group *filteredListenerGroup) 
filterAddresses(addresses AddressSet) AddressSet {
+	candidates := make(map[ID]Address)
+
+	// If hostname filtering is specified, only include addresses that match the hostname.
+	// This filtering should be applied even if endpoint filtering is disabled.
+	for id, address := range addresses.Addresses {
+		if address.Hostname != nil {
+			if group.key.Hostname != "" && group.key.Hostname != *address.Hostname {
+				continue
+			}
+		}
+		candidates[id] = address
+	}
+
+	// If endpoint filtering is disabled, return all hostname-matching addresses.
+	if !group.enableEndpointFiltering {
+		return selectAddressFamily(AddressSet{
+			Addresses: candidates,
+			Labels:    addresses.Labels,
+		}, group.enableIPv6)
+	}
+
+	// If internalTrafficPolicy=Local, only keep pod endpoints on the same node.
+	if group.localTrafficPolicy {
+		filtered := make(map[ID]Address)
+		for id, address := range candidates {
+			if address.Pod != nil && address.Pod.Spec.NodeName == group.key.NodeName {
+				filtered[id] = address
+			}
+		}
+		return selectAddressFamily(AddressSet{
+			Addresses: filtered,
+			Labels:    addresses.Labels,
+		}, group.enableIPv6)
+	}
+
+	// If any address lacks ForZone hints, disable zone filtering and return all candidates.
+	for _, address := range candidates {
+		if len(address.ForZones) == 0 {
+			return selectAddressFamily(AddressSet{
+				Addresses: candidates,
+				Labels:    addresses.Labels,
+			}, group.enableIPv6)
+		}
+	}
+
+	// Otherwise, perform zone filtering: keep only endpoints whose hints include this node's zone.
+	filtered := make(map[ID]Address)
+	for id, address := range candidates {
+		if containsZone(address.ForZones, group.nodeTopologyZone) {
+			filtered[id] = address
+		}
+	}
+
+	// If zone filtering produced nothing, fall back to all candidates.
+	if len(filtered) == 0 {
+		filtered = candidates
+	}
+
+	return selectAddressFamily(AddressSet{
+		Addresses: filtered,
+		Labels:    addresses.Labels,
+	}, group.enableIPv6)
+}
+
+func containsZone(zones []v1.ForZone, zone string) bool {
+	for _, z := range zones {
+		if z.Name == zone {
+			return true
+		}
+	}
+	return false
+}
+
+func selectAddressFamily(addresses AddressSet, enableIPv6 bool) AddressSet {
+	filtered := make(map[ID]Address)
+	for id, addr := range addresses.Addresses {
+		if id.IPFamily == corev1.IPv6Protocol && !enableIPv6 {
+			continue
+		}
+
+		if id.IPFamily == corev1.IPv4Protocol && enableIPv6 {
+			// Skip IPv4 addresses that also have an IPv6 alternative; the
+			// IPv6 entry wins when IPv6 is enabled.
+			altID := id
+			altID.IPFamily = corev1.IPv6Protocol
+			if _, ok := addresses.Addresses[altID]; ok {
+				continue
+			}
+		}
+
+		filtered[id] = addr
+	}
+
+	return AddressSet{
+		Addresses: filtered,
+		Labels:    addresses.Labels,
+	}
+}
diff --git a/controller/api/destination/watcher/port_publisher.go b/controller/api/destination/watcher/port_publisher.go
new file mode 100644
index 0000000000000..47ae5179e20d8
--- /dev/null
+++ b/controller/api/destination/watcher/port_publisher.go
@@ -0,0 +1,679 @@
+package watcher
+
+import (
+	"context"
+	"fmt"
+	"maps"
+	"net"
+	"strings"
+
+	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta3"
+	"github.com/linkerd/linkerd2/controller/k8s"
+	consts "github.com/linkerd/linkerd2/pkg/k8s"
+	logging "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/intstr"
+)
+
+type (
+	// portPublisher represents a service along with a port and optionally a
+	// hostname. Multiple listeners may be subscribed to a portPublisher.
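+	// Listeners that share a FilterKey are grouped into a single
+	// filteredListenerGroup (see filtered_listener_group.go), so hostname,
+	// zone, and address-family filtering is computed once per distinct key
+	// rather than once per listener.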
+	// portPublisher maintains the current state of the address set and
+	// publishes diffs to all listeners when updates come from either the
+	// endpoints API or the service API.
+	portPublisher struct {
+		id ServiceID
+
+		targetPort           namedPort
+		srcPort              Port
+		log                  *logging.Entry
+		k8sAPI               *k8s.API
+		metadataAPI          *k8s.MetadataAPI
+		enableEndpointSlices bool
+		enableIPv6           bool
+		exists               bool
+		addresses            AddressSet
+		filteredListeners    map[FilterKey]*filteredListenerGroup
+		cluster              string
+		localTrafficPolicy   bool
+	}
+)
+
+// Note that portPublisher's methods are generally NOT thread-safe. You should
+// hold the parent servicePublisher's mutex before calling methods on a
+// portPublisher.
+
+func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
+	newAddressSet := pp.endpointsToAddresses(endpoints)
+	if len(newAddressSet.Addresses) == 0 {
+		pp.publishNoEndpoints(true)
+	} else {
+		pp.publishAddressChange(newAddressSet)
+	}
+	pp.addresses = newAddressSet
+}
+
+func (pp *portPublisher) addEndpointSlice(slice *discovery.EndpointSlice) {
+	newAddressSet := pp.endpointSliceToAddresses(slice)
+	for id, addr := range pp.addresses.Addresses {
+		if _, ok := newAddressSet.Addresses[id]; !ok {
+			newAddressSet.Addresses[id] = addr
+		}
+	}
+
+	pp.publishAddressChange(newAddressSet)
+
+	// Even if the EndpointSlice doesn't have addresses yet, we need to create
+	// a new pp.addresses entry with the appropriate Labels and
+	// LocalTrafficPolicy; those aren't captured by the later EndpointSlice
+	// update event in which addresses get added.
+
+	pp.addresses = newAddressSet
+	pp.exists = true
+}
+
+func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
+	updatedAddressSet := AddressSet{
+		Addresses: make(map[ID]Address),
+		Labels:    pp.addresses.Labels,
+	}
+	maps.Copy(updatedAddressSet.Addresses, pp.addresses.Addresses)
+
+	for _, id := range pp.endpointSliceToIDs(oldSlice) {
+		delete(updatedAddressSet.Addresses, id)
+	}
+
+	newAddressSet := pp.endpointSliceToAddresses(newSlice)
+	maps.Copy(updatedAddressSet.Addresses, newAddressSet.Addresses)
+	pp.publishAddressChange(updatedAddressSet)
+
+	pp.addresses = updatedAddressSet
+	pp.exists = true
+}
+
+func metricLabels(resource interface{}) map[string]string {
+	var serviceName, ns string
+	var resLabels, resAnnotations map[string]string
+	switch res := resource.(type) {
+	case *corev1.Endpoints:
+		{
+			serviceName, ns = res.Name, res.Namespace
+			resLabels, resAnnotations = res.Labels, res.Annotations
+		}
+	case *discovery.EndpointSlice:
+		{
+			serviceName, ns = res.Labels[discovery.LabelServiceName], res.Namespace
+			resLabels, resAnnotations = res.Labels, res.Annotations
+		}
+	}
+
+	labels := map[string]string{service: serviceName, namespace: ns}
+
+	remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
+	serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]
+
+	if hasRemoteClusterName {
+		// This means we are looking at Endpoints created for the purpose of
+		// mirroring an out-of-cluster service.
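+		// Besides the service and namespace labels, record the target
+		// cluster and, when the RemoteServiceFqName annotation is well
+		// formed, the target service and its namespace.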
+ labels[targetCluster] = remoteClusterName + if hasServiceFqn { + fqParts := strings.Split(serviceFqn, ".") + if len(fqParts) >= 2 { + labels[targetService] = fqParts[0] + labels[targetServiceNamespace] = fqParts[1] + } + } + } + return labels +} + +func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) AddressSet { + resolvedPort := pp.resolveESTargetPort(es.Ports) + if resolvedPort == undefinedEndpointPort { + return AddressSet{ + Labels: metricLabels(es), + Addresses: make(map[ID]Address), + } + } + + serviceID, err := getEndpointSliceServiceID(es) + if err != nil { + pp.log.Errorf("Could not fetch resource service name:%v", err) + } + + addresses := make(map[ID]Address) + for _, endpoint := range es.Endpoints { + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + if endpoint.TargetRef == nil { + for _, IPAddr := range endpoint.Addresses { + var authorityOverride string + if fqName, ok := es.Annotations[consts.RemoteServiceFqName]; ok { + authorityOverride = net.JoinHostPort(fqName, fmt.Sprintf("%d", pp.srcPort)) + } + + identity := es.Annotations[consts.RemoteGatewayIdentity] + address, id := pp.newServiceRefAddress(resolvedPort, IPAddr, endpoint.Hostname, serviceID.Name, es.Namespace) + address.Identity, address.AuthorityOverride = identity, authorityOverride + + if endpoint.Hints != nil { + zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones)) + copy(zones, endpoint.Hints.ForZones) + address.ForZones = zones + } + addresses[id] = address + } + continue + } + + if endpoint.TargetRef.Kind == endpointTargetRefPod { + for _, IPAddr := range endpoint.Addresses { + address, id, err := pp.newPodRefAddress( + resolvedPort, + es.AddressType, + IPAddr, + endpoint.Hostname, + endpoint.TargetRef.Name, + endpoint.TargetRef.Namespace, + ) + if err != nil { + pp.log.Errorf("Unable to create new address:%v", err) + continue + } + err = SetToServerProtocol(pp.k8sAPI, &address, pp.log) + if err != nil { + pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) + } + + address.Zone = endpoint.Zone + if endpoint.Hints != nil { + zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones)) + copy(zones, endpoint.Hints.ForZones) + address.ForZones = zones + } + addresses[id] = address + } + } + + if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload { + for _, IPAddr := range endpoint.Addresses { + address, id, err := pp.newExtRefAddress(resolvedPort, IPAddr, endpoint.Hostname, endpoint.TargetRef.Name, es.Namespace) + if err != nil { + pp.log.Errorf("Unable to create new address: %v", err) + continue + } + + err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address) + if err != nil { + pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) + continue + } + + address.Zone = endpoint.Zone + if endpoint.Hints != nil { + zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones)) + copy(zones, endpoint.Hints.ForZones) + address.ForZones = zones + } + + addresses[id] = address + } + + } + + } + return AddressSet{ + Addresses: addresses, + Labels: metricLabels(es), + } +} + +// endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns +// only the IDs of the endpoints rather than the addresses themselves. 
+func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID { + resolvedPort := pp.resolveESTargetPort(es.Ports) + if resolvedPort == undefinedEndpointPort { + return []ID{} + } + + serviceID, err := getEndpointSliceServiceID(es) + if err != nil { + pp.log.Errorf("Could not fetch resource service name:%v", err) + } + + ids := []ID{} + for _, endpoint := range es.Endpoints { + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + if endpoint.TargetRef == nil { + for _, IPAddr := range endpoint.Addresses { + ids = append(ids, ServiceID{ + Name: strings.Join([]string{ + serviceID.Name, + IPAddr, + fmt.Sprint(resolvedPort), + }, "-"), + Namespace: es.Namespace, + }) + } + continue + } + + if endpoint.TargetRef.Kind == endpointTargetRefPod { + ids = append(ids, PodID{ + Name: endpoint.TargetRef.Name, + Namespace: endpoint.TargetRef.Namespace, + IPFamily: corev1.IPFamily(es.AddressType), + }) + } else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload { + ids = append(ids, ExternalWorkloadID{ + Name: endpoint.TargetRef.Name, + Namespace: endpoint.TargetRef.Namespace, + }) + } + + } + return ids +} + +func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet { + addresses := make(map[ID]Address) + for _, subset := range endpoints.Subsets { + resolvedPort := pp.resolveTargetPort(subset) + if resolvedPort == undefinedEndpointPort { + continue + } + for _, endpoint := range subset.Addresses { + hostname := endpoint.Hostname + if endpoint.TargetRef == nil { + var authorityOverride string + if fqName, ok := endpoints.Annotations[consts.RemoteServiceFqName]; ok { + authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort) + } + + identity := endpoints.Annotations[consts.RemoteGatewayIdentity] + address, id := pp.newServiceRefAddress(resolvedPort, endpoint.IP, &hostname, endpoints.Name, endpoints.Namespace) + address.Identity, address.AuthorityOverride = identity, authorityOverride + + addresses[id] = address + continue + } + + if endpoint.TargetRef.Kind == endpointTargetRefPod { + address, id, err := pp.newPodRefAddress( + resolvedPort, + "", + endpoint.IP, + &hostname, + endpoint.TargetRef.Name, + endpoint.TargetRef.Namespace, + ) + if err != nil { + pp.log.Errorf("Unable to create new address:%v", err) + continue + } + err = SetToServerProtocol(pp.k8sAPI, &address, pp.log) + if err != nil { + pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) + } + addresses[id] = address + } + } + } + return AddressSet{ + Addresses: addresses, + Labels: metricLabels(endpoints), + } +} + +func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP string, hostname *string, serviceName, serviceNamespace string) (Address, ServiceID) { + id := ServiceID{ + Name: strings.Join([]string{ + serviceName, + endpointIP, + fmt.Sprint(endpointPort), + }, "-"), + Namespace: serviceNamespace, + } + + return Address{IP: endpointIP, Port: endpointPort, Hostname: hostname}, id +} + +func (pp *portPublisher) newPodRefAddress( + endpointPort Port, + ipFamily discovery.AddressType, + endpointIP string, + hostname *string, + podName, + podNamespace string, +) (Address, PodID, error) { + id := PodID{ + Name: podName, + Namespace: podNamespace, + IPFamily: corev1.IPFamily(ipFamily), + } + pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name) + if err != nil { + return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v: %w", id, err) + } + ownerKind, ownerName, err := 
pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
+	if err != nil {
+		return Address{}, PodID{}, err
+	}
+	addr := Address{
+		IP:        endpointIP,
+		Port:      endpointPort,
+		Pod:       pod,
+		OwnerName: ownerName,
+		OwnerKind: ownerKind,
+		Hostname:  hostname,
+	}
+
+	return addr, id, nil
+}
+
+func (pp *portPublisher) newExtRefAddress(
+	endpointPort Port,
+	endpointIP string,
+	hostname *string,
+	externalWorkloadName,
+	externalWorkloadNamespace string,
+) (Address, ExternalWorkloadID, error) {
+	id := ExternalWorkloadID{
+		Name:      externalWorkloadName,
+		Namespace: externalWorkloadNamespace,
+	}
+
+	ew, err := pp.k8sAPI.ExtWorkload().Lister().ExternalWorkloads(id.Namespace).Get(id.Name)
+	if err != nil {
+		return Address{}, ExternalWorkloadID{}, fmt.Errorf("unable to fetch ExternalWorkload %v: %w", id, err)
+	}
+
+	addr := Address{
+		IP:               endpointIP,
+		Port:             endpointPort,
+		ExternalWorkload: ew,
+		Hostname:         hostname,
+	}
+
+	ownerRefs := ew.GetOwnerReferences()
+	if len(ownerRefs) == 1 {
+		parent := ownerRefs[0]
+		addr.OwnerName = parent.Name
+		addr.OwnerKind = strings.ToLower(parent.Kind)
+	}
+
+	return addr, id, nil
+}
+
+func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port {
+	if slicePorts == nil {
+		return undefinedEndpointPort
+	}
+
+	switch pp.targetPort.Type {
+	case intstr.Int:
+		return Port(pp.targetPort.IntVal)
+	case intstr.String:
+		for _, p := range slicePorts {
+			name := ""
+			if p.Name != nil {
+				name = *p.Name
+			}
+			if name == pp.targetPort.StrVal {
+				return Port(*p.Port)
+			}
+		}
+	}
+	return undefinedEndpointPort
+}
+
+func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port {
+	switch pp.targetPort.Type {
+	case intstr.Int:
+		return Port(pp.targetPort.IntVal)
+	case intstr.String:
+		for _, p := range subset.Ports {
+			if p.Name == pp.targetPort.StrVal {
+				return Port(p.Port)
+			}
+		}
+	}
+	return undefinedEndpointPort
+}
+
+func (pp *portPublisher) updateLocalTrafficPolicy(localTrafficPolicy bool) {
+	pp.localTrafficPolicy = localTrafficPolicy
+	for _, group := range pp.filteredListeners {
+		group.updateLocalTrafficPolicy(localTrafficPolicy)
+	}
+}
+
+func (pp *portPublisher) updatePort(targetPort namedPort) {
+	pp.targetPort = targetPort
+
+	if pp.enableEndpointSlices {
+		matchLabels := map[string]string{discovery.LabelServiceName: pp.id.Name}
+		selector := labels.Set(matchLabels).AsSelector()
+
+		endpointSlices, err := pp.k8sAPI.ES().Lister().EndpointSlices(pp.id.Namespace).List(selector)
+		if err == nil {
+			pp.addresses = AddressSet{}
+			for _, slice := range endpointSlices {
+				pp.addEndpointSlice(slice)
+			}
+		} else {
+			pp.log.Errorf("Unable to get EndpointSlices during port update: %s", err)
+		}
+	} else {
+		endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name)
+		if err == nil {
+			pp.updateEndpoints(endpoints)
+		} else {
+			pp.log.Errorf("Unable to get endpoints during port update: %s", err)
+		}
+	}
+}
+
+func (pp *portPublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
+	updatedAddressSet := AddressSet{
+		Addresses: make(map[ID]Address),
+		Labels:    pp.addresses.Labels,
+	}
+	for id, address := range pp.addresses.Addresses {
+		updatedAddressSet.Addresses[id] = address
+	}
+
+	addrSet := pp.endpointSliceToAddresses(es)
+	for id := range addrSet.Addresses {
+		delete(updatedAddressSet.Addresses, id)
+	}
+
+	pp.publishAddressChange(updatedAddressSet)
+	pp.addresses = updatedAddressSet
+
+	if len(pp.addresses.Addresses) == 0 {
+		pp.noEndpoints(false)
+	} else {
+		pp.exists = true
+	}
+}
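As an illustrative aside (not part of this diff), the named-port resolution in resolveESTargetPort is worth seeing in isolation: a numeric targetPort is used as-is, while a named targetPort only resolves when some port in the slice carries that name. This standalone sketch simplifies the return to a bool, where the real method returns the watcher's Port type with an undefinedEndpointPort sentinel:

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolvePort resolves a Service targetPort against an EndpointSlice's
// ports: numeric targets pass through, named targets are looked up.
func resolvePort(target intstr.IntOrString, slicePorts []discovery.EndpointPort) (int32, bool) {
	switch target.Type {
	case intstr.Int:
		return target.IntVal, true
	case intstr.String:
		for _, p := range slicePorts {
			if p.Name != nil && *p.Name == target.StrVal && p.Port != nil {
				return *p.Port, true
			}
		}
	}
	return 0, false
}

func main() {
	name, port := "admin-http", int32(9990)
	ports := []discovery.EndpointPort{{Name: &name, Port: &port}}

	fmt.Println(resolvePort(intstr.FromString("admin-http"), ports)) // 9990 true
	fmt.Println(resolvePort(intstr.FromString("grpc"), ports))       // 0 false
	fmt.Println(resolvePort(intstr.FromInt(8080), ports))            // 8080 true
}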
+
+func (pp *portPublisher) noEndpoints(exists bool) {
+	pp.exists = exists
+	pp.addresses = AddressSet{}
+	pp.publishNoEndpoints(exists)
+}
+
+func (pp *portPublisher) subscribe(listener EndpointUpdateListener, filterKey FilterKey) error {
+	group, err := pp.filteredListenerGroup(filterKey)
+	if err != nil {
+		return err
+	}
+	if pp.exists {
+		if len(pp.addresses.Addresses) > 0 {
+			filteredSet := group.filterAddresses(pp.addresses)
+			group.snapshot = filteredSet
+			if len(filteredSet.Addresses) > 0 {
+				listener.Add(filteredSet.shallowCopy())
+			}
+		}
+	}
+	group.listeners = append(group.listeners, listener)
+	group.metrics.setSubscribers(len(group.listeners))
+
+	return nil
+}
+
+func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener, filterKey FilterKey, withRemove bool) {
+	group, ok := pp.filteredListeners[filterKey]
+	if !ok {
+		return
+	}
+
+	if withRemove {
+		listener.Remove(group.snapshot)
+	}
+
+	for i, existing := range group.listeners {
+		if existing == listener {
+			n := len(group.listeners)
+			group.listeners[i] = group.listeners[n-1]
+			group.listeners[n-1] = nil
+			group.listeners = group.listeners[:n-1]
+			break
+		}
+	}
+	if len(group.listeners) == 0 {
+		endpointsVecs.unregister(endpointsLabels(
+			pp.cluster, pp.id.Namespace, pp.id.Name, fmt.Sprintf("%d", pp.srcPort), filterKey.Hostname, filterKey.NodeName,
+		))
+		delete(pp.filteredListeners, filterKey)
+		return
+	}
+	group.metrics.setSubscribers(len(group.listeners))
+}
+
+func (pp *portPublisher) updateServer(oldServer, newServer *v1beta3.Server) {
+	updated := false
+	for id, address := range pp.addresses.Addresses {
+		if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
+			oldOpaque := address.OpaqueProtocol
+			if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
+				address.OpaqueProtocol = true
+			} else {
+				address.OpaqueProtocol = false
+			}
+			if oldOpaque != address.OpaqueProtocol {
+				pp.addresses.Addresses[id] = address
+				updated = true
+			}
+		}
+	}
+	if updated {
+		pp.publishFilteredSnapshots()
+	}
+}
+
+func (pp *portPublisher) filteredListenerGroup(filterKey FilterKey) (*filteredListenerGroup, error) {
+	group, ok := pp.filteredListeners[filterKey]
+	if !ok {
+		nodeTopologyZone := ""
+		if filterKey.EnableEndpointFiltering && filterKey.NodeName != "" {
+			node, err := pp.metadataAPI.Get(k8s.Node, filterKey.NodeName)
+			if err != nil {
+				pp.log.Errorf("Unable to get node %s: %s", filterKey.NodeName, err)
+			} else {
+				nodeTopologyZone = node.Labels[corev1.LabelTopologyZone]
+			}
+		}
+
+		metrics, err := endpointsVecs.newEndpointsMetrics(endpointsLabels(pp.cluster, pp.id.Namespace, pp.id.Name, fmt.Sprintf("%d", pp.srcPort), filterKey.Hostname, filterKey.NodeName))
+		if err != nil {
+			return nil, err
+		}
+		group = newFilteredListenerGroup(filterKey, nodeTopologyZone, pp.enableIPv6, pp.localTrafficPolicy, metrics)
+		pp.filteredListeners[filterKey] = group
+	}
+	return group, nil
+}
+
+func (pp *portPublisher) publishAddressChange(newAddressSet AddressSet) {
+	for _, group := range pp.filteredListeners {
+		group.publishDiff(newAddressSet)
+	}
+}
+
+func (pp *portPublisher) publishFilteredSnapshots() {
+	for _, group := range pp.filteredListeners {
+		group.publishDiff(pp.addresses)
+	}
+}
+
+func (pp *portPublisher) publishNoEndpoints(exists bool) {
+	for _, group := range pp.filteredListeners {
+		group.publishNoEndpoints(exists)
+	}
+}
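unsubscribe drops a listener with the swap-remove idiom: the removed slot is overwritten by the last element, the vacated tail slot is nilled so the backing array doesn't pin the listener for the garbage collector, and the slice is shrunk by one. A standalone illustration of the pattern (not part of this diff):

package main

import "fmt"

type listener interface{ Name() string }

type stub string

func (s stub) Name() string { return string(s) }

// removeListener removes target in O(1) without preserving order: swap in
// the last element, nil the tail slot so it can be collected, then truncate.
func removeListener(ls []listener, target listener) []listener {
	for i, existing := range ls {
		if existing == target {
			n := len(ls)
			ls[i] = ls[n-1]
			ls[n-1] = nil
			return ls[:n-1]
		}
	}
	return ls
}

func main() {
	ls := []listener{stub("a"), stub("b"), stub("c")}
	ls = removeListener(ls, stub("b"))
	fmt.Println(len(ls), ls[0].Name(), ls[1].Name()) // 2 a c
}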
+
+func (pp *portPublisher) isAddressSelected(address Address, server *v1beta3.Server) bool {
+	if server == nil {
+		return false
+	}
+
+	if address.Pod != nil {
+		selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
+		if err != nil {
+			pp.log.Errorf("failed to create Selector: %s", err)
+			return false
+		}
+
+		if !selector.Matches(labels.Set(address.Pod.Labels)) {
+			return false
+		}
+
+		switch server.Spec.Port.Type {
+		case intstr.Int:
+			if server.Spec.Port.IntVal == int32(address.Port) {
+				return true
+			}
+		case intstr.String:
+			for _, c := range append(address.Pod.Spec.InitContainers, address.Pod.Spec.Containers...) {
+				for _, p := range c.Ports {
+					if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
+						return true
+					}
+				}
+			}
+		}
+	} else if address.ExternalWorkload != nil {
+		selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
+		if err != nil {
+			pp.log.Errorf("failed to create Selector: %s", err)
+			return false
+		}
+
+		if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
+			return false
+		}
+
+		switch server.Spec.Port.Type {
+		case intstr.Int:
+			if server.Spec.Port.IntVal == int32(address.Port) {
+				return true
+			}
+		case intstr.String:
+			for _, p := range address.ExternalWorkload.Spec.Ports {
+				if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
diff --git a/controller/api/destination/watcher/prometheus.go b/controller/api/destination/watcher/prometheus.go
index 16c789c4b276a..b3e23856c2a6e 100644
--- a/controller/api/destination/watcher/prometheus.go
+++ b/controller/api/destination/watcher/prometheus.go
@@ -128,13 +128,14 @@ func newMetricsVecs(name string, labels []string) metricsVecs {
 	}
 }
 
-func endpointsLabels(cluster, namespace, service, port string, hostname string) prometheus.Labels {
+func endpointsLabels(cluster, namespace, service, port, hostname, nodename string) prometheus.Labels {
 	return prometheus.Labels{
 		"cluster":   cluster,
 		"namespace": namespace,
 		"service":   service,
 		"port":      port,
 		"hostname":  hostname,
+		"nodename":  nodename,
 	}
 }
 
@@ -147,7 +148,7 @@ func labelNames(labels prometheus.Labels) []string {
 }
 
 func newEndpointsMetricsVecs() endpointsMetricsVecs {
-	labels := labelNames(endpointsLabels("", "", "", "", ""))
+	labels := labelNames(endpointsLabels("", "", "", "", "", ""))
 
 	vecs := newMetricsVecs("endpoints", labels)
 
 	pods := promauto.NewGaugeVec(
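The selector compilation at the heart of isAddressSelected is a standard apimachinery step: a Server's LabelSelector is compiled once into a labels.Selector and then matched against the workload's labels. As a standalone aside (not part of this diff; the surrounding Server object is elided):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// A selector like a Server resource's podSelector.
	podSelector := &metav1.LabelSelector{
		MatchLabels: map[string]string{"app": "web"},
	}

	// Compile the LabelSelector into a matchable labels.Selector.
	selector, err := metav1.LabelSelectorAsSelector(podSelector)
	if err != nil {
		panic(err)
	}

	podLabels := labels.Set{"app": "web", "pod-template-hash": "abc123"}
	fmt.Println(selector.Matches(podLabels)) // true: this pod is selected
}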
diff --git a/controller/api/destination/watcher/service_publisher.go b/controller/api/destination/watcher/service_publisher.go
new file mode 100644
index 0000000000000..f80bc5229c9b5
--- /dev/null
+++ b/controller/api/destination/watcher/service_publisher.go
@@ -0,0 +1,201 @@
+package watcher
+
+import (
+	"sync"
+
+	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta3"
+	"github.com/linkerd/linkerd2/controller/k8s"
+	logging "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/intstr"
+)
+
+type (
+	// servicePublisher represents a service. It keeps a map of portPublishers
+	// keyed by port: each watch on a service specifies a port and may
+	// optionally specify a hostname and source node. The port influences the
+	// endpoint set, which is why a separate portPublisher is required per
+	// port; hostname and node filtering is handled by each portPublisher's
+	// filtered listener groups. The service's port mapping is applied to the
+	// requested port and the mapped port is used in the address set. If a
+	// hostname is requested, the address set is filtered to only include
+	// addresses with the requested hostname.
+	servicePublisher struct {
+		id                   ServiceID
+		log                  *logging.Entry
+		k8sAPI               *k8s.API
+		metadataAPI          *k8s.MetadataAPI
+		enableEndpointSlices bool
+		enableIPv6           bool
+		localTrafficPolicy   bool
+		cluster              string
+		ports                map[Port]*portPublisher
+		// All access to the servicePublisher and its portPublishers is
+		// explicitly synchronized by this mutex.
+		sync.Mutex
+	}
+)
+
+func (sp *servicePublisher) updateEndpoints(newEndpoints *corev1.Endpoints) {
+	sp.Lock()
+	defer sp.Unlock()
+	sp.log.Debugf("Updating endpoints for %s", sp.id)
+	for _, port := range sp.ports {
+		port.updateEndpoints(newEndpoints)
+	}
+}
+
+func (sp *servicePublisher) deleteEndpoints() {
+	sp.Lock()
+	defer sp.Unlock()
+	sp.log.Debugf("Deleting endpoints for %s", sp.id)
+	for _, port := range sp.ports {
+		port.noEndpoints(false)
+	}
+}
+
+func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
+	sp.Lock()
+	defer sp.Unlock()
+
+	sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
+	for _, port := range sp.ports {
+		port.addEndpointSlice(newSlice)
+	}
+}
+
+func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
+	sp.Lock()
+	defer sp.Unlock()
+
+	sp.log.Debugf("Updating ES %s/%s", oldSlice.Namespace, oldSlice.Name)
+	for _, port := range sp.ports {
+		port.updateEndpointSlice(oldSlice, newSlice)
+	}
+}
+
+func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
+	sp.Lock()
+	defer sp.Unlock()
+
+	sp.log.Debugf("Deleting ES %s/%s", es.Namespace, es.Name)
+	for _, port := range sp.ports {
+		port.deleteEndpointSlice(es)
+	}
+}
+
+func (sp *servicePublisher) updateService(newService *corev1.Service) {
+	sp.Lock()
+	defer sp.Unlock()
+	sp.log.Debugf("Updating service for %s", sp.id)
+
+	// Set localTrafficPolicy to true if internalTrafficPolicy is set to Local.
+	if newService.Spec.InternalTrafficPolicy != nil {
+		sp.localTrafficPolicy = *newService.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal
+	} else {
+		sp.localTrafficPolicy = false
+	}
+
+	for port, publisher := range sp.ports {
+		newTargetPort := getTargetPort(newService, port)
+		if newTargetPort != publisher.targetPort {
+			publisher.updatePort(newTargetPort)
+		}
+		// Update service endpoints with the new localTrafficPolicy.
+		if publisher.localTrafficPolicy != sp.localTrafficPolicy {
+			publisher.updateLocalTrafficPolicy(sp.localTrafficPolicy)
+		}
+	}
+}
+
+func (sp *servicePublisher) subscribe(srcPort Port, listener EndpointUpdateListener, filterKey FilterKey) error {
+	sp.Lock()
+	defer sp.Unlock()
+
+	publisher, ok := sp.ports[srcPort]
+	if !ok {
+		publisher = sp.newPortPublisher(srcPort)
+		sp.ports[srcPort] = publisher
+	}
+	return publisher.subscribe(listener, filterKey)
+}
+
+func (sp *servicePublisher) unsubscribe(srcPort Port, listener EndpointUpdateListener, filterKey FilterKey, withRemove bool) {
+	sp.Lock()
+	defer sp.Unlock()
+
+	publisher, ok := sp.ports[srcPort]
+	if ok {
+		publisher.unsubscribe(listener, filterKey, withRemove)
+	}
+}
+
+func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
+	targetPort := intstr.FromInt(int(srcPort))
+	svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
+	if err != nil &&
!apierrors.IsNotFound(err) { + sp.log.Errorf("error getting service: %s", err) + } + exists := false + if err == nil { + targetPort = getTargetPort(svc, srcPort) + exists = true + } + + log := sp.log.WithField("port", srcPort) + + port := &portPublisher{ + filteredListeners: map[FilterKey]*filteredListenerGroup{}, + targetPort: targetPort, + srcPort: srcPort, + exists: exists, + k8sAPI: sp.k8sAPI, + metadataAPI: sp.metadataAPI, + log: log, + cluster: sp.cluster, + + enableEndpointSlices: sp.enableEndpointSlices, + enableIPv6: sp.enableIPv6, + localTrafficPolicy: sp.localTrafficPolicy, + } + + if port.enableEndpointSlices { + matchLabels := map[string]string{discovery.LabelServiceName: sp.id.Name} + selector := labels.Set(matchLabels).AsSelector() + + sliceList, err := sp.k8sAPI.ES().Lister().EndpointSlices(sp.id.Namespace).List(selector) + if err != nil && !apierrors.IsNotFound(err) { + sp.log.Errorf("error getting endpointSlice list: %s", err) + } + if err == nil { + for _, slice := range sliceList { + port.addEndpointSlice(slice) + } + } + } else { + endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name) + if err != nil && !apierrors.IsNotFound(err) { + sp.log.Errorf("error getting endpoints: %s", err) + } + if err == nil { + port.updateEndpoints(endpoints) + } + } + + return port +} + +func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta3.Server) { + sp.Lock() + defer sp.Unlock() + + for _, pp := range sp.ports { + pp.updateServer(oldServer, newServer) + } +} diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 5e67a22fa9e80..62508f48180c6 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -168,7 +168,7 @@ func Main(args []string) { log.Fatalf("Failed to initialize Kubernetes metadata API: %s", err) } - clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices) + clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices, *enableIPv6) if err != nil { log.Fatalf("Failed to initialize Cluster Store: %s", err) }
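With enableIPv6 now threaded through to the cluster store as well, every address set, local or mirrored, passes through the same family-selection rule before reaching listeners. As a closing illustrative sketch (simplified types, not the PR's API): IPv6 entries are dropped entirely when IPv6 is disabled, and an IPv4 entry is dropped when the same endpoint also carries an IPv6 entry:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// id is a stand-in for the watcher's address ID, which carries the IP family.
type id struct {
	Name     string
	IPFamily corev1.IPFamily
}

// selectFamily mirrors selectAddressFamily's rule on a simplified map.
func selectFamily(addrs map[id]string, enableIPv6 bool) map[id]string {
	out := make(map[id]string)
	for k, ip := range addrs {
		if k.IPFamily == corev1.IPv6Protocol && !enableIPv6 {
			continue
		}
		if k.IPFamily == corev1.IPv4Protocol && enableIPv6 {
			alt := k
			alt.IPFamily = corev1.IPv6Protocol
			if _, ok := addrs[alt]; ok {
				continue // an IPv6 alternative exists; prefer it
			}
		}
		out[k] = ip
	}
	return out
}

func main() {
	addrs := map[id]string{
		{Name: "pod-1", IPFamily: corev1.IPv4Protocol}: "10.0.0.1",
		{Name: "pod-1", IPFamily: corev1.IPv6Protocol}: "fd00::1",
		{Name: "pod-2", IPFamily: corev1.IPv4Protocol}: "10.0.0.2",
	}
	fmt.Println(len(selectFamily(addrs, true)))  // 2: pod-1 via IPv6, pod-2 via IPv4
	fmt.Println(len(selectFamily(addrs, false))) // 2: both IPv4 entries only
}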