Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions gravity/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2594,6 +2594,8 @@ func (g *GravityClient) processSessionMessage(streamIndex int, msg *pb.SessionMe
case *pb.SessionMessage_MonitorReport:
// Server should not send monitor reports to client — ignore
g.logger.Debug("received unexpected monitor report from server, ignoring")
case *pb.SessionMessage_ResourceSyncResponse:
g.handleResourceSyncResponse(msg.Id, m.ResourceSyncResponse)
default:
g.logger.Debug("unhandled session message type: %T", m)
}
Expand Down Expand Up @@ -2892,6 +2894,11 @@ func (g *GravityClient) handleRouteSandboxResponse(msgID string, response *pb.Ro
}
}

func (g *GravityClient) handleResourceSyncResponse(msgID string, response *pb.ResourceSyncResponse) {
g.logger.Debug("handleResourceSyncResponse: Received resource sync response for msgID=%s, added=%d, removed=%d, success=%v",
msgID, response.Added, response.Removed, response.Success)
}

func (g *GravityClient) handleCheckpointURLResponse(msgID string, response *pb.CheckpointURLResponse) {
g.logger.Debug("handleCheckpointURLResponse: Received checkpoint URL response for msgID=%s, sandbox=%s, success=%v",
msgID, response.SandboxId, response.Success)
Expand Down Expand Up @@ -5008,6 +5015,68 @@ func (g *GravityClient) Unprovision(deploymentID string, ownerID string) error {
return nil
}

// SendResourceSync sends the full set of active resources to gravity.
// Gravity compares this against its cached state for the machine and
// unprovisions any stale entries that hadron no longer has. This replaces
// per-resource RouteDeploymentRequest calls during reconnection, providing
// atomic full-state sync that prevents stale VIP entries in the gossip mesh.
func (g *GravityClient) SendResourceSync(deployments []ResourceSyncItem, sandboxes []ResourceSyncItem, timeout time.Duration) (*pb.ResourceSyncResponse, error) {
g.mu.RLock()
ready := g.sessionReady
g.mu.RUnlock()

select {
case <-ready:
case <-time.After(timeout):
return nil, fmt.Errorf("timeout waiting for session ready before resource sync")
case <-g.ctx.Done():
return nil, fmt.Errorf("context cancelled while waiting for session ready")
}

pbDeployments := make([]*pb.ResourceSyncEntry, len(deployments))
for i, d := range deployments {
pbDeployments[i] = &pb.ResourceSyncEntry{
Id: d.ID,
VirtualIp: d.VirtualIP,
OwnerId: d.OwnerID,
}
}
pbSandboxes := make([]*pb.ResourceSyncEntry, len(sandboxes))
for i, s := range sandboxes {
pbSandboxes[i] = &pb.ResourceSyncEntry{
Id: s.ID,
VirtualIp: s.VirtualIP,
OwnerId: s.OwnerID,
}
}

msgID := generateMessageID()
msg := &pb.SessionMessage{
Id: msgID,
MessageType: &pb.SessionMessage_ResourceSync{
ResourceSync: &pb.ResourceSyncRequest{
Deployments: pbDeployments,
Sandboxes: pbSandboxes,
},
},
}

// Broadcast to ALL connected ions.
if err := g.broadcastSessionMessage(msg); err != nil {
return nil, fmt.Errorf("failed to send resource sync: %w", err)
}

g.logger.Info("SendResourceSync: broadcast %d deployments + %d sandboxes", len(deployments), len(sandboxes))
return &pb.ResourceSyncResponse{Success: true, Added: int32(len(deployments) + len(sandboxes))}, nil
}

// ResourceSyncItem represents a single resource in a sync request.
type ResourceSyncItem struct {
ID string
VirtualIP string
OwnerID string
}

// SendEvacuateRequest sends a request to evacuate sandboxes on this machine.
func (g *GravityClient) SendEvacuateRequest(machineID, reason string, sandboxes []*pb.SandboxEvacInfo) error {
msg := &pb.SessionMessage{
Expand Down
Loading
Loading