Skip to content
32 changes: 30 additions & 2 deletions comp/remote-config/rcprotocoltest/impl/ping_pong.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"path"
"strconv"
"strings"
"sync"

"github.com/DataDog/datadog-agent/pkg/config/remote/api"
Expand Down Expand Up @@ -40,8 +41,9 @@ func RunTransportTests(ctx context.Context, httpClient *api.HTTPClient, runCount
}

var wg sync.WaitGroup
wg.Add(3)
wg.Add(4)
go func() { defer wg.Done(); runWebSocketTest(ctx, httpClient, runCount) }()
go func() { defer wg.Done(); runWebSocketTestWithALPN(ctx, httpClient, runCount) }()
go func() { defer wg.Done(); runGrpcTest(ctx, httpClient, runCount) }()
go func() { defer wg.Done(); runTCPTest(ctx, httpClient) }()
wg.Wait()
Expand Down Expand Up @@ -93,14 +95,40 @@ func runWebSocketTest(ctx context.Context, httpClient *api.HTTPClient, runCount
}
}()

n, err := runEchoLoop(ctx, httpClient, runCount)
n, err := runEchoLoop(ctx, httpClient, runCount, ALPNDefault)
if err != nil {
log.Debugf("websocket echo test failed: %s (%d data frames exchanged)", err, n)
return
}
log.Debugf("websocket echo test complete (%d data frames exchanged)", n)
}

func runWebSocketTestWithALPN(ctx context.Context, httpClient *api.HTTPClient, runCount uint64) {
defer func() {
if err := recover(); err != nil {
log.Warnf("unexpected websocket echo with ALPN connectivity test failure: %s", err)
}
}()

// ALPN requires TLS, check if TLS is enabled before running test.
baseURL, err := httpClient.BaseURL()
if err != nil {
log.Debugf("websocket echo test with ALPN failed to get base URL: %s", err)
return
}
if strings.ToLower(baseURL.Scheme) == "http" {
log.Debug("websocket echo test with ALPN skipped: TLS is disabled")
return
}

n, err := runEchoLoop(ctx, httpClient, runCount, ALPNDDRC)
if err != nil {
log.Debugf("websocket echo test with ALPN failed: %s (%d data frames exchanged)", err, n)
return
}
log.Debugf("websocket echo test with ALPN complete (%d data frames exchanged)", n)
}

func runGrpcTest(ctx context.Context, httpClient *api.HTTPClient, runCount uint64) {
defer func() {
if err := recover(); err != nil {
Expand Down
69 changes: 58 additions & 11 deletions comp/remote-config/rcprotocoltest/impl/websocket_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package rcprotocoltestimpl
import (
"bytes"
"context"
"errors"
"fmt"
"path"
"strconv"
Expand All @@ -25,8 +26,28 @@ import (
// messageTimeout interval or the test times out.
const messageTimeout = 5 * time.Minute

func runEchoLoop(ctx context.Context, client *api.HTTPClient, runCount uint64) (uint, error) {
conn, err := newWebSocketClient(ctx, "/api/v0.2/echo-test", client, runCount)
// ALPNMode specifies the ALPN protocol mode for WebSocket connections.
type ALPNMode int

const (
// ALPNDefault uses no ALPN protocol negotiation.
ALPNDefault ALPNMode = 0
// ALPNDDRC uses the dd-rc-v1 ALPN protocol.
ALPNDDRC ALPNMode = 1
)

// alpnProtocolDDRC is the ALPN protocol identifier for Datadog Remote Config
// WebSocket connections. This protocol enables optimized routing and handling
// of remote config traffic at the load balancer and backend level.
const alpnProtocolDDRC = "dd-rc-v1"

func runEchoLoop(ctx context.Context, client *api.HTTPClient, runCount uint64, alpnMode ALPNMode) (uint, error) {
endpointPath := "/api/v0.2/echo-test"
if alpnMode == ALPNDDRC {
endpointPath = "/api/v0.2/echo-test-alpn"
}

conn, err := newWebSocketClient(ctx, endpointPath, client, runCount, alpnMode)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -115,16 +136,39 @@ func gracefulAbort(conn *websocket.Conn) {
//
// The "endpointPath" specifies the resource path to connect to, which is
// appended to the client baseURL.
func newWebSocketClient(ctx context.Context, endpointPath string, httpClient *api.HTTPClient, runCount uint64) (*websocket.Conn, error) {
//
// The "alpnMode" specifies the ALPN protocol mode. Use ALPNDefault for no ALPN
// or ALPNDDRC for dd-rc-v1 ALPN protocol.
func newWebSocketClient(ctx context.Context, endpointPath string, httpClient *api.HTTPClient, runCount uint64, alpnMode ALPNMode) (*websocket.Conn, error) {
// Extract the TLS & Proxy configuration from the HTTP client.
transport, err := httpClient.Transport()
if err != nil {
return nil, err
}

tlsConfig := transport.TLSClientConfig

// Parse the "base URL" the client uses to connect to RC.
url, err := httpClient.BaseURL()
if err != nil {
return nil, err
}

// Handle ALPN if requested.
if alpnMode == ALPNDDRC {
// ALPN requires TLS, so this test cannot run with plain HTTP.
if strings.ToLower(url.Scheme) == "http" {
return nil, errors.New("ALPN websocket test requires TLS (remote_configuration.no_tls must be false)")
}

// Clone and configure TLS for ALPN.
tlsConfig = tlsConfig.Clone()
tlsConfig.NextProtos = []string{alpnProtocolDDRC}
}

dialer := &websocket.Dialer{
HandshakeTimeout: 30 * time.Second,
TLSClientConfig: transport.TLSClientConfig,
TLSClientConfig: tlsConfig,
Proxy: transport.Proxy,
}

Expand All @@ -135,11 +179,6 @@ func newWebSocketClient(ctx context.Context, endpointPath string, httpClient *ap
headers.Set("X-Echo-Run-Count", strconv.FormatUint(runCount, 10))
headers.Set("X-Agent-UUID", uuid.GetUUID())

// Parse the "base URL" the client uses to connect to RC.
url, err := httpClient.BaseURL()
if err != nil {
return nil, err
}
// Append the specific path to the WebSocket resource.
url.Path = path.Join(url.Path, endpointPath)
// Change the protocol to use websockets.
Expand All @@ -150,7 +189,11 @@ func newWebSocketClient(ctx context.Context, endpointPath string, httpClient *ap
url.Scheme = "wss"
}

log.Debugf("connecting to websocket endpoint %s", url.String())
logMsg := "connecting to websocket endpoint " + url.String()
if alpnMode == ALPNDDRC {
logMsg += " with ALPN " + alpnProtocolDDRC
}
log.Debug(logMsg)

// Send the HTTP request, wait for the upgrade response and then perform the
// WebSocket handshake.
Expand All @@ -160,7 +203,11 @@ func newWebSocketClient(ctx context.Context, endpointPath string, httpClient *ap
}
_ = resp.Body.Close()

log.Debug("websocket connected")
logMsg = "websocket connected"
if alpnMode == ALPNDDRC {
logMsg += " with ALPN " + alpnProtocolDDRC
}
log.Debug(logMsg)

return conn, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestWebSocketTest(t *testing.T) {

// Drive the test and ensure the expected number of frames were
// exchanged.
n, err := runEchoLoop(ctx, client, 1)
n, err := runEchoLoop(ctx, client, 1, ALPNDefault)
assert.NoError(err)
assert.Equal(uint(len(tt.frames)), n)
})
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestNewWebSocket(t *testing.T) {
client, err := api.NewHTTPClient(auth, agentConfig, url)
assert.NoError(err)

conn, err := newWebSocketClient(ctx, tt.path, client, 0)
conn, err := newWebSocketClient(ctx, tt.path, client, 0, ALPNDefault)
assert.NoError(err)
defer conn.Close()

Expand Down Expand Up @@ -376,10 +376,10 @@ func TestWebSocketTest_PING_PONG(t *testing.T) {
client, err := api.NewHTTPClient(api.Auth{}, agentConfig, url)
assert.NoError(err)

conn, err := newWebSocketClient(ctx, "/bananas", client, 1)
conn, err := newWebSocketClient(ctx, "/bananas", client, 1, ALPNDefault)
assert.NoError(err)
defer conn.Close()

_, err = runEchoLoop(ctx, client, 1)
_, err = runEchoLoop(ctx, client, 1, ALPNDefault)
assert.NoError(err)
}
Loading