Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest;
Expand Down Expand Up @@ -93,6 +94,7 @@
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.HealthCheckResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest;
Expand Down Expand Up @@ -241,6 +243,7 @@ private Receive getInitializedBehavior() {
.match(EnableJobClusterRequest.class, this::onJobClusterEnable)
.match(DisableJobClusterRequest.class, this::onJobClusterDisable)
.match(GetJobClusterRequest.class, this::onJobClusterGet)
.match(HealthCheckRequest.class, this::onHealthCheck)
.match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
.match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
.match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
Expand Down Expand Up @@ -561,6 +564,7 @@ public void onJobClusterGet(GetJobClusterRequest r) {
sender.tell(new GetJobClusterResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getJobClusterName(), empty()), getSelf());
}
}

@Override
public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) {
Optional<JobClusterInfo> jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName());
Expand Down Expand Up @@ -665,6 +669,17 @@ public void onGetJobScalerRuleStream(final JobClusterScalerRuleProto.GetJobScale
}
}

public void onHealthCheck(HealthCheckRequest request) {
Optional<JobClusterInfo> jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(request.getClusterName());

if (jobClusterInfo.isPresent()) {
jobClusterInfo.get().jobClusterActor.forward(request, getContext());
} else {
ActorRef sender = getSender();
sender.tell(new HealthCheckResponse(request.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + request.getClusterName(), false, null), getSelf());
}
}

private void onTerminated(final Terminated terminated) {
logger.warn("onTerminated {}", terminated.actor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.master.LeaderRedirectionFilter;
import io.mantisrx.server.master.persistence.IMantisPersistenceProvider;
import io.mantisrx.master.jobcluster.HealthCheckExtension;
import io.mantisrx.server.master.resourcecluster.ResourceClusters;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -94,6 +97,7 @@ public class MasterApiAkkaService extends BaseService {
private final ExecutorService executorService;
private final CountDownLatch serviceLatch = new CountDownLatch(1);
private final HttpsConnectionContext httpsConnectionContext;
private final List<HealthCheckExtension> healthCheckExtensions;

public MasterApiAkkaService(final MasterMonitor masterMonitor,
final MasterDescription masterDescription,
Expand All @@ -117,7 +121,8 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
mantisStorageProvider,
lifecycleEventPublisher,
leadershipManager,
null
null,
List.of()
);
}
public MasterApiAkkaService(final MasterMonitor masterMonitor,
Expand All @@ -131,6 +136,33 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
final LifecycleEventPublisher lifecycleEventPublisher,
final ILeadershipManager leadershipManager,
final HttpsConnectionContext httpsConnectionContext) {
this(
masterMonitor,
masterDescription,
jobClustersManagerActor,
statusEventBrokerActor,
resourceClusters,
resourceClustersHostManagerActor,
serverPort,
mantisStorageProvider,
lifecycleEventPublisher,
leadershipManager,
httpsConnectionContext,
List.of()
);
}
public MasterApiAkkaService(final MasterMonitor masterMonitor,
final MasterDescription masterDescription,
final ActorRef jobClustersManagerActor,
final ActorRef statusEventBrokerActor,
final ResourceClusters resourceClusters,
final ActorRef resourceClustersHostManagerActor,
final int serverPort,
final IMantisPersistenceProvider mantisStorageProvider,
final LifecycleEventPublisher lifecycleEventPublisher,
final ILeadershipManager leadershipManager,
final HttpsConnectionContext httpsConnectionContext,
final List<HealthCheckExtension> healthCheckExtensions) {
super(true);
Preconditions.checkNotNull(masterMonitor, "MasterMonitor");
Preconditions.checkNotNull(masterDescription, "masterDescription");
Expand All @@ -149,6 +181,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
this.storageProvider = mantisStorageProvider;
this.lifecycleEventPublisher = lifecycleEventPublisher;
this.leadershipManager = leadershipManager;
this.healthCheckExtensions = Objects.requireNonNullElseGet(healthCheckExtensions, List::of);
this.system = ActorSystem.create("MasterApiActorSystem");
this.materializer = Materializer.createMaterializer(system);
this.mantisMasterRoute = configureApiRoutes(this.system);
Expand All @@ -170,7 +203,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,

private MantisMasterRoute configureApiRoutes(final ActorSystem actorSystem) {
// Setup API routes
final JobClusterRouteHandler jobClusterRouteHandler = new JobClusterRouteHandlerAkkaImpl(jobClustersManagerActor);
final JobClusterRouteHandler jobClusterRouteHandler = new JobClusterRouteHandlerAkkaImpl(jobClustersManagerActor, this.healthCheckExtensions);
final JobRouteHandler jobRouteHandler = new JobRouteHandlerAkkaImpl(jobClustersManagerActor);

final MasterDescriptionRoute masterDescriptionRoute = new MasterDescriptionRoute(masterDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package io.mantisrx.master.api.akka.route.handlers;

import io.mantisrx.master.jobcluster.proto.HealthCheckResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

public interface JobClusterRouteHandler {
Expand Down Expand Up @@ -59,4 +61,6 @@ CompletionStage<UpdateSchedulingInfoResponse> updateSchedulingInfo(
CompletionStage<JobClusterScalerRuleProto.GetScalerRulesResponse> getScalerRules(JobClusterScalerRuleProto.GetScalerRulesRequest request);

CompletionStage<JobClusterScalerRuleProto.DeleteScalerRuleResponse> deleteScalerRule(JobClusterScalerRuleProto.DeleteScalerRuleRequest request);

CompletionStage<HealthCheckResponse> healthCheck(String clusterName, List<String> jobIds, Map<String, Object> context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.master.JobClustersManagerActor.UpdateSchedulingInfo;
import io.mantisrx.master.jobcluster.HealthCheckExtension;
import io.mantisrx.master.jobcluster.proto.HealthCheckResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.server.master.config.ConfigurationProvider;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
Expand All @@ -40,9 +45,15 @@ public class JobClusterRouteHandlerAkkaImpl implements JobClusterRouteHandler {
private final ActorRef jobClustersManagerActor;
private final Counter allJobClustersGET;
private final Duration timeout;
private final List<HealthCheckExtension> healthCheckExtensions;

public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor) {
this(jobClusterManagerActor, List.of());
}

public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor, List<HealthCheckExtension> healthCheckExtensions) {
this.jobClustersManagerActor = jobClusterManagerActor;
this.healthCheckExtensions = healthCheckExtensions;
long timeoutMs = Optional.ofNullable(ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs()).orElse(1000L);
this.timeout = Duration.ofMillis(timeoutMs);
Metrics m = new Metrics.Builder()
Expand Down Expand Up @@ -177,4 +188,45 @@ public CompletionStage<JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse>
.thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast);
return response;
}

@Override
public CompletionStage<HealthCheckResponse> healthCheck(
String clusterName, List<String> jobIds, Map<String, Object> context) {
JobClusterManagerProto.HealthCheckRequest request =
new JobClusterManagerProto.HealthCheckRequest(clusterName, jobIds);

return ask(jobClustersManagerActor, request, timeout)
.thenApply(HealthCheckResponse.class::cast)
.thenApply(actorResponse -> {
if (!actorResponse.isHealthy() || healthCheckExtensions.isEmpty()) {
return actorResponse;
}

for (HealthCheckExtension healthCheckExtension : healthCheckExtensions) {
Map<String, Object> scopedContext = extractContext(context, healthCheckExtension.contextId());
HealthCheckResponse result = healthCheckExtension.checkHealth(clusterName, jobIds, scopedContext);
if (!result.isHealthy()) {
return result;
}
}

return actorResponse;
});
}

private static Map<String, Object> extractContext(Map<String, Object> context, String contextId) {
if (contextId == null || contextId.isEmpty()) {
return context;
}

String prefix = contextId + ".";
Map<String, Object> scoped = new HashMap<>();
for (Map.Entry<String, Object> entry : context.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
scoped.put(entry.getKey().substring(prefix.length()), entry.getValue());
}
}

return scoped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static class Endpoints {

public static final String RESOURCE_CLUSTERS = "api.v1.resourceClusters";
public static final String JOB_CLUSTER_SCALER_RULES = "api.v1.jobClusters.scalerRules";
public static final String JOB_CLUSTER_INSTANCE_HEALTHCHECK = "api.v1.jobClusters.instance.healthcheck";

private static String[] endpoints = new String[]{
JOB_ARTIFACTS,
Expand Down Expand Up @@ -99,7 +100,8 @@ public static class Endpoints {
JOB_DISCOVERY_STREAM,
LAST_SUBMITTED_JOB_ID_STREAM,
RESOURCE_CLUSTERS,
JOB_CLUSTER_SCALER_RULES
JOB_CLUSTER_SCALER_RULES,
JOB_CLUSTER_INSTANCE_HEALTHCHECK
};

private static Set<String> endpointSet = Sets.newHashSet(endpoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@
import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.HealthCheckResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.runtime.NamedJobDefinition;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.shaded.com.google.common.base.Strings;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -211,6 +216,14 @@ public Route constructRoutes() {
))
),

// api/v1/jobClusters/{}/healthcheck
path(
PathMatchers.segment().slash("healthcheck"),
(clusterName) -> pathEndOrSingleSlash(() ->
get(() -> healthCheckRoute(clusterName))
)
),

// api/v1/jobClusters/{}/scalerRules
path(
PathMatchers.segment().slash("scalerRules"),
Expand Down Expand Up @@ -785,4 +798,25 @@ private Route deleteScalerRuleRoute(String clusterName, String ruleId) {
HttpRequestMetrics.HttpVerb.DELETE
);
}

private Route healthCheckRoute(String clusterName) {
logger.trace("GET /api/v1/jobClusters/{}/healthcheck called", clusterName);

return parameterMap(params -> {
String jobIdsParam = params.get("job-ids");
List<String> jobIds = (jobIdsParam != null && !jobIdsParam.isBlank())
? Arrays.asList(jobIdsParam.split(","))
: null;

Map<String, Object> context = new HashMap<>(params);
CompletionStage<HealthCheckResponse> response = jobClusterRouteHandler.healthCheck(clusterName, jobIds, context);

return completeAsync(
response,
resp -> complete(StatusCodes.OK, resp, Jackson.marshaller()),
HttpRequestMetrics.Endpoints.JOB_CLUSTER_INSTANCE_HEALTHCHECK,
HttpRequestMetrics.HttpVerb.GET
);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2026 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.master.jobcluster;

import io.mantisrx.master.jobcluster.proto.HealthCheckResponse;
import java.util.List;
import java.util.Map;

/**
* Extension point for job cluster health checks. Implementations are composed into an ordered
* list and executed sequentially — the chain stops at the first failure.
*
* <p>Each implementation declares a {@link #contextId()} that acts as a namespace prefix.
* The handler extracts query params prefixed with this ID (e.g., {@code alertSystem.names})
* and passes them to the check with the prefix stripped.
*/
public interface HealthCheckExtension {

/**
* Namespace prefix for context parameters belonging to this health check.
* For example, a check with contextId "alertSystem" receives query params like
* {@code alertSystem.names} as {@code names} in its context map.
*
* @return the context namespace.
*/
String contextId();

/**
* Run a health check against the given job cluster.
*
* @param clusterName the job cluster name.
* @param jobIds specific job IDs to check. Used to overwrite the default behavior of checking all active jobs.
* @param context parameters namespaced to this check (prefix already stripped).
*
* @return a healthy response, or an unhealthy response with the appropriate {@link
* HealthCheckResponse.FailureReason}
*/
HealthCheckResponse checkHealth(String clusterName, List<String> jobIds, Map<String, Object> context);
}
Loading
Loading