diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index 00b2ebbd6..9abe41266 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -32,6 +32,7 @@ import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterRequest; +import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; @@ -93,6 +94,7 @@ import io.mantisrx.master.jobcluster.job.JobHelper; import io.mantisrx.master.jobcluster.job.JobState; import io.mantisrx.master.jobcluster.proto.BaseResponse; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest; @@ -241,6 +243,7 @@ private Receive getInitializedBehavior() { .match(EnableJobClusterRequest.class, this::onJobClusterEnable) .match(DisableJobClusterRequest.class, this::onJobClusterDisable) .match(GetJobClusterRequest.class, this::onJobClusterGet) + .match(HealthCheckRequest.class, this::onHealthCheck) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) @@ -561,6 +564,7 @@ public void onJobClusterGet(GetJobClusterRequest r) { sender.tell(new GetJobClusterResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getJobClusterName(), empty()), getSelf()); } } + @Override public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) { Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName()); @@ -665,6 +669,17 @@ public void onGetJobScalerRuleStream(final JobClusterScalerRuleProto.GetJobScale } } + public void onHealthCheck(HealthCheckRequest request) { + Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(request.getClusterName()); + + if (jobClusterInfo.isPresent()) { + jobClusterInfo.get().jobClusterActor.forward(request, getContext()); + } else { + ActorRef sender = getSender(); + sender.tell(new HealthCheckResponse(request.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + request.getClusterName(), false, null), getSelf()); + } + } + private void onTerminated(final Terminated terminated) { logger.warn("onTerminated {}", terminated.actor()); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java index a42136f0d..b87199539 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java @@ -64,7 +64,10 @@ import io.mantisrx.server.core.master.MasterMonitor; import io.mantisrx.server.master.LeaderRedirectionFilter; import io.mantisrx.server.master.persistence.IMantisPersistenceProvider; +import io.mantisrx.master.jobcluster.HealthCheckExtension; import io.mantisrx.server.master.resourcecluster.ResourceClusters; +import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -94,6 +97,7 @@ public class MasterApiAkkaService extends BaseService { private final ExecutorService executorService; private final CountDownLatch serviceLatch = new CountDownLatch(1); private final HttpsConnectionContext httpsConnectionContext; + private final List healthCheckExtensions; public MasterApiAkkaService(final MasterMonitor masterMonitor, final MasterDescription masterDescription, @@ -117,7 +121,8 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, mantisStorageProvider, lifecycleEventPublisher, leadershipManager, - null + null, + List.of() ); } public MasterApiAkkaService(final MasterMonitor masterMonitor, @@ -131,6 +136,33 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, final LifecycleEventPublisher lifecycleEventPublisher, final ILeadershipManager leadershipManager, final HttpsConnectionContext httpsConnectionContext) { + this( + masterMonitor, + masterDescription, + jobClustersManagerActor, + statusEventBrokerActor, + resourceClusters, + resourceClustersHostManagerActor, + serverPort, + mantisStorageProvider, + lifecycleEventPublisher, + leadershipManager, + httpsConnectionContext, + List.of() + ); + } + public MasterApiAkkaService(final MasterMonitor masterMonitor, + final MasterDescription masterDescription, + final ActorRef jobClustersManagerActor, + final ActorRef statusEventBrokerActor, + final ResourceClusters resourceClusters, + final ActorRef resourceClustersHostManagerActor, + final int serverPort, + final IMantisPersistenceProvider mantisStorageProvider, + final LifecycleEventPublisher lifecycleEventPublisher, + final ILeadershipManager leadershipManager, + final HttpsConnectionContext httpsConnectionContext, + final List healthCheckExtensions) { super(true); Preconditions.checkNotNull(masterMonitor, "MasterMonitor"); Preconditions.checkNotNull(masterDescription, "masterDescription"); @@ -149,6 +181,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, this.storageProvider = mantisStorageProvider; this.lifecycleEventPublisher = lifecycleEventPublisher; this.leadershipManager = leadershipManager; + this.healthCheckExtensions = Objects.requireNonNullElseGet(healthCheckExtensions, List::of); this.system = ActorSystem.create("MasterApiActorSystem"); this.materializer = Materializer.createMaterializer(system); this.mantisMasterRoute = configureApiRoutes(this.system); @@ -170,7 +203,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, private MantisMasterRoute configureApiRoutes(final ActorSystem actorSystem) { // Setup API routes - final JobClusterRouteHandler jobClusterRouteHandler = new JobClusterRouteHandlerAkkaImpl(jobClustersManagerActor); + final JobClusterRouteHandler jobClusterRouteHandler = new JobClusterRouteHandlerAkkaImpl(jobClustersManagerActor, this.healthCheckExtensions); final JobRouteHandler jobRouteHandler = new JobRouteHandlerAkkaImpl(jobClustersManagerActor); final MasterDescriptionRoute masterDescriptionRoute = new MasterDescriptionRoute(masterDescription); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java index 6369478a9..c6285e45e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java @@ -16,11 +16,13 @@ package io.mantisrx.master.api.akka.route.handlers; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; - +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletionStage; public interface JobClusterRouteHandler { @@ -59,4 +61,6 @@ CompletionStage updateSchedulingInfo( CompletionStage getScalerRules(JobClusterScalerRuleProto.GetScalerRulesRequest request); CompletionStage deleteScalerRule(JobClusterScalerRuleProto.DeleteScalerRuleRequest request); + + CompletionStage healthCheck(String clusterName, List jobIds, Map context); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java index 0f2b311c7..081bd0768 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java @@ -23,12 +23,17 @@ import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.master.JobClustersManagerActor.UpdateSchedulingInfo; +import io.mantisrx.master.jobcluster.HealthCheckExtension; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; import io.mantisrx.server.master.config.ConfigurationProvider; import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionStage; import org.slf4j.Logger; @@ -40,9 +45,15 @@ public class JobClusterRouteHandlerAkkaImpl implements JobClusterRouteHandler { private final ActorRef jobClustersManagerActor; private final Counter allJobClustersGET; private final Duration timeout; + private final List healthCheckExtensions; public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor) { + this(jobClusterManagerActor, List.of()); + } + + public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor, List healthCheckExtensions) { this.jobClustersManagerActor = jobClusterManagerActor; + this.healthCheckExtensions = healthCheckExtensions; long timeoutMs = Optional.ofNullable(ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs()).orElse(1000L); this.timeout = Duration.ofMillis(timeoutMs); Metrics m = new Metrics.Builder() @@ -177,4 +188,45 @@ public CompletionStage .thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast); return response; } + + @Override + public CompletionStage healthCheck( + String clusterName, List jobIds, Map context) { + JobClusterManagerProto.HealthCheckRequest request = + new JobClusterManagerProto.HealthCheckRequest(clusterName, jobIds); + + return ask(jobClustersManagerActor, request, timeout) + .thenApply(HealthCheckResponse.class::cast) + .thenApply(actorResponse -> { + if (!actorResponse.isHealthy() || healthCheckExtensions.isEmpty()) { + return actorResponse; + } + + for (HealthCheckExtension healthCheckExtension : healthCheckExtensions) { + Map scopedContext = extractContext(context, healthCheckExtension.contextId()); + HealthCheckResponse result = healthCheckExtension.checkHealth(clusterName, jobIds, scopedContext); + if (!result.isHealthy()) { + return result; + } + } + + return actorResponse; + }); + } + + private static Map extractContext(Map context, String contextId) { + if (contextId == null || contextId.isEmpty()) { + return context; + } + + String prefix = contextId + "."; + Map scoped = new HashMap<>(); + for (Map.Entry entry : context.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + scoped.put(entry.getKey().substring(prefix.length()), entry.getValue()); + } + } + + return scoped; + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java index 772f56610..c43dfb0d8 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java @@ -67,6 +67,7 @@ public static class Endpoints { public static final String RESOURCE_CLUSTERS = "api.v1.resourceClusters"; public static final String JOB_CLUSTER_SCALER_RULES = "api.v1.jobClusters.scalerRules"; + public static final String JOB_CLUSTER_INSTANCE_HEALTHCHECK = "api.v1.jobClusters.instance.healthcheck"; private static String[] endpoints = new String[]{ JOB_ARTIFACTS, @@ -99,7 +100,8 @@ public static class Endpoints { JOB_DISCOVERY_STREAM, LAST_SUBMITTED_JOB_ID_STREAM, RESOURCE_CLUSTERS, - JOB_CLUSTER_SCALER_RULES + JOB_CLUSTER_SCALER_RULES, + JOB_CLUSTER_INSTANCE_HEALTHCHECK }; private static Set endpointSet = Sets.newHashSet(endpoints); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java index 9c33738b6..178df9863 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java @@ -56,12 +56,17 @@ import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler; import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter; import io.mantisrx.master.jobcluster.proto.BaseResponse; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; import io.mantisrx.runtime.NamedJobDefinition; import io.mantisrx.server.master.config.ConfigurationProvider; import io.mantisrx.server.master.config.MasterConfiguration; import io.mantisrx.shaded.com.google.common.base.Strings; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -211,6 +216,14 @@ public Route constructRoutes() { )) ), + // api/v1/jobClusters/{}/healthcheck + path( + PathMatchers.segment().slash("healthcheck"), + (clusterName) -> pathEndOrSingleSlash(() -> + get(() -> healthCheckRoute(clusterName)) + ) + ), + // api/v1/jobClusters/{}/scalerRules path( PathMatchers.segment().slash("scalerRules"), @@ -785,4 +798,25 @@ private Route deleteScalerRuleRoute(String clusterName, String ruleId) { HttpRequestMetrics.HttpVerb.DELETE ); } + + private Route healthCheckRoute(String clusterName) { + logger.trace("GET /api/v1/jobClusters/{}/healthcheck called", clusterName); + + return parameterMap(params -> { + String jobIdsParam = params.get("job-ids"); + List jobIds = (jobIdsParam != null && !jobIdsParam.isBlank()) + ? Arrays.asList(jobIdsParam.split(",")) + : null; + + Map context = new HashMap<>(params); + CompletionStage response = jobClusterRouteHandler.healthCheck(clusterName, jobIds, context); + + return completeAsync( + response, + resp -> complete(StatusCodes.OK, resp, Jackson.marshaller()), + HttpRequestMetrics.Endpoints.JOB_CLUSTER_INSTANCE_HEALTHCHECK, + HttpRequestMetrics.HttpVerb.GET + ); + }); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/HealthCheckExtension.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/HealthCheckExtension.java new file mode 100644 index 000000000..e6cdb2d7b --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/HealthCheckExtension.java @@ -0,0 +1,53 @@ +/* + * Copyright 2026 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.master.jobcluster; + +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; +import java.util.List; +import java.util.Map; + +/** + * Extension point for job cluster health checks. Implementations are composed into an ordered + * list and executed sequentially — the chain stops at the first failure. + * + *

Each implementation declares a {@link #contextId()} that acts as a namespace prefix. + * The handler extracts query params prefixed with this ID (e.g., {@code alertSystem.names}) + * and passes them to the check with the prefix stripped. + */ +public interface HealthCheckExtension { + + /** + * Namespace prefix for context parameters belonging to this health check. + * For example, a check with contextId "alertSystem" receives query params like + * {@code alertSystem.names} as {@code names} in its context map. + * + * @return the context namespace. + */ + String contextId(); + + /** + * Run a health check against the given job cluster. + * + * @param clusterName the job cluster name. + * @param jobIds specific job IDs to check. Used to overwrite the default behavior of checking all active jobs. + * @param context parameters namespaced to this check (prefix already stripped). + * + * @return a healthy response, or an unhealthy response with the appropriate {@link + * HealthCheckResponse.FailureReason} + */ + HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context); +} diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java index d2c238e8f..86209ab0d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java @@ -54,6 +54,7 @@ import io.mantisrx.master.events.LifecycleEventPublisher; import io.mantisrx.master.events.LifecycleEventsProto; import io.mantisrx.master.jobcluster.job.CostsCalculator; +import io.mantisrx.master.jobcluster.job.FilterableMantisWorkerMetadataWritable; import io.mantisrx.master.jobcluster.job.IMantisJobMetadata; import io.mantisrx.master.jobcluster.job.JobActor; import io.mantisrx.master.jobcluster.job.JobHelper; @@ -61,8 +62,11 @@ import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl; import io.mantisrx.master.jobcluster.job.MantisJobMetadataView; import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse.FailedWorker; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest; @@ -118,6 +122,7 @@ import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataFactory; import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritable; import io.mantisrx.runtime.JobSla; +import io.mantisrx.runtime.MantisJobState; import io.mantisrx.runtime.command.InvalidJobException; import io.mantisrx.server.core.JobCompletedReason; import io.mantisrx.server.master.InvalidJobRequestException; @@ -617,6 +622,7 @@ private Receive buildInitializedBehavior() { new EnableJobClusterResponse(x.requestId, SUCCESS, genUnexpectedMsg(x.toString(), this.name, state)), getSelf())) .match(GetJobClusterRequest.class, this::onJobClusterGet) + .match(HealthCheckRequest.class, this::onHealthCheck) .match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) @@ -2633,6 +2639,49 @@ public void onJobScalerRuleStream(JobClusterScalerRuleProto.GetJobScalerRuleStre } } + public void onHealthCheck(final HealthCheckRequest request) { + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + Set prefilteredJobIds = new HashSet<>(); + if (request.getJobIds() != null && !request.getJobIds().isEmpty()) { + request.getJobIds().stream() + .map(JobId::fromId) + .filter(Optional::isPresent) + .map(Optional::get) + .forEach(prefilteredJobIds::add); + } + + ListJobCriteria criteria = new ListJobCriteria(); + getFilteredNonTerminalJobList(criteria, prefilteredJobIds) + .collect(Lists::newArrayList, (failedWorkers, view) -> { + if (view.getWorkerMetadataList() != null) { + for (FilterableMantisWorkerMetadataWritable worker : view.getWorkerMetadataList()) { + if (worker.getState() != MantisJobState.Started) { + failedWorkers.add(new FailedWorker( + worker.getWorkerIndex(), + worker.getWorkerNumber(), + worker.getState().name())); + } + } + } + }) + .subscribe( + failedWorkers -> { + if (failedWorkers.isEmpty()) { + sender.tell(HealthCheckResponse.healthy(request.requestId), self); + } else { + sender.tell(HealthCheckResponse.unhealthyWorkers(request.requestId, failedWorkers), self); + } + }, + error -> { + logger.error("Health check failed for cluster {}", name, error); + sender.tell(new HealthCheckResponse(request.requestId, SERVER_ERROR, + "Health check failed: " + error.getMessage(), false, null), self); + } + ); + } + static final class JobInfo { final long submittedAt; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/HealthCheckResponse.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/HealthCheckResponse.java new file mode 100644 index 000000000..a0de73ddd --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/HealthCheckResponse.java @@ -0,0 +1,79 @@ +/* + * Copyright 2026 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.master.jobcluster.proto; + +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonInclude; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonSubTypes; +import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@Getter +@EqualsAndHashCode(callSuper = true) +@ToString +@JsonInclude(JsonInclude.Include.ALWAYS) +public class HealthCheckResponse extends BaseResponse { + + private final boolean isHealthy; + private final FailureReason failureReason; + + @JsonCreator + public HealthCheckResponse( + @JsonProperty("requestId") long requestId, + @JsonProperty("responseCode") ResponseCode responseCode, + @JsonProperty("message") String message, + @JsonProperty("isHealthy") boolean isHealthy, + @JsonProperty("failureReason") FailureReason failureReason) { + super(requestId, responseCode, message); + this.isHealthy = isHealthy; + this.failureReason = failureReason; + } + + public static HealthCheckResponse healthy(long requestId) { + return new HealthCheckResponse( + requestId, ResponseCode.SUCCESS, "OK", true, null); + } + + public static HealthCheckResponse unhealthyWorkers(long requestId, List failedWorkers) { + return new HealthCheckResponse( + requestId, ResponseCode.SERVER_ERROR, "unhealthy workers", false, + new WorkerFailure(failedWorkers)); + } + + public static HealthCheckResponse unhealthyAlerts(long requestId, List alerts) { + return new HealthCheckResponse( + requestId, ResponseCode.SERVER_ERROR, "alerts firing", false, + new AlertFailure(alerts)); + } + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes({ + @JsonSubTypes.Type(value = WorkerFailure.class, name = "workerStatus"), + @JsonSubTypes.Type(value = AlertFailure.class, name = "alertsFiring") + }) + public sealed interface FailureReason permits WorkerFailure, AlertFailure {} + + public record WorkerFailure(List failedWorkers) implements FailureReason {} + + public record AlertFailure(List alerts) implements FailureReason {} + + public record FailedWorker(int workerIndex, int workerNumber, String state) {} +} diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java index 6612b9c79..ff238ebdd 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java @@ -2253,5 +2253,34 @@ public String toString() { } } + public static final class HealthCheckRequest extends BaseRequest { + private final String clusterName; + private final List jobIds; + + public HealthCheckRequest(final String clusterName, final List jobIds) { + super(); + Preconditions.checkArg( + clusterName != null && !clusterName.isEmpty(), + "Cluster name cannot be null or empty"); + this.clusterName = clusterName; + this.jobIds = jobIds; + } + + public String getClusterName() { + return clusterName; + } + public List getJobIds() { + return jobIds; + } + + @Override + public String toString() { + return "HealthCheckRequest{" + + "clusterName='" + clusterName + '\'' + + ", jobIds=" + jobIds + + ", requestId=" + requestId + + '}'; + } + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java index 273172052..0c04d80cc 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java @@ -36,6 +36,8 @@ import io.mantisrx.master.api.akka.MasterApiAkkaService; import io.mantisrx.master.events.AuditEventBrokerActor; import io.mantisrx.master.events.AuditEventSubscriber; +import io.mantisrx.master.jobcluster.HealthCheckExtension; +import java.util.List; import io.mantisrx.master.events.AuditEventSubscriberAkkaImpl; import io.mantisrx.master.events.AuditEventSubscriberLoggingImpl; import io.mantisrx.master.events.LifecycleEventPublisher; @@ -103,11 +105,21 @@ public class MasterMain implements Service { private MasterConfiguration config; private ILeadershipManager leadershipManager; private MasterMonitor monitor; + private final List healthCheckExtensions; public MasterMain( ConfigurationFactory configFactory, MantisPropertiesLoader dynamicPropertiesLoader, AuditEventSubscriber auditEventSubscriber) { + this(configFactory, dynamicPropertiesLoader, auditEventSubscriber, List.of()); + } + + public MasterMain( + ConfigurationFactory configFactory, + MantisPropertiesLoader dynamicPropertiesLoader, + AuditEventSubscriber auditEventSubscriber, + List healthCheckExtensions) { + this.healthCheckExtensions = healthCheckExtensions == null ? List.of() : healthCheckExtensions; String test = "{\"jobId\":\"sine-function-1\",\"status\":{\"jobId\":\"sine-function-1\",\"stageNum\":1,\"workerIndex\":0,\"workerNumber\":2,\"type\":\"HEARTBEAT\",\"message\":\"heartbeat\",\"state\":\"Noop\",\"hostname\":null,\"timestamp\":1525813363585,\"reason\":\"Normal\",\"payloads\":[{\"type\":\"SubscriptionState\",\"data\":\"false\"},{\"type\":\"IncomingDataDrop\",\"data\":\"{\\\"onNextCount\\\":0,\\\"droppedCount\\\":0}\"}]}}"; Metrics metrics = new Metrics.Builder() @@ -217,7 +229,7 @@ public MasterMain( monitor.start(); mantisServices.addService(leaderFactory.createLeaderElector(config, leadershipManager)); mantisServices.addService(new MasterApiAkkaService(monitor, leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor, - resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager)); + resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager, null, healthCheckExtensions)); if (leaderFactory instanceof LocalLeaderFactory && !config.isLocalMode()) { logger.error("local mode is [ {} ] and leader factory is {} this configuration is unsafe", config.isLocalMode(), leaderFactory.getClass().getSimpleName()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImplTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImplTest.java new file mode 100644 index 000000000..e867560c3 --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImplTest.java @@ -0,0 +1,264 @@ +/* + * Copyright 2026 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.master.api.akka.route.handlers; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.javadsl.TestKit; +import com.netflix.mantis.master.scheduler.TestHelpers; +import io.mantisrx.master.JobClustersManagerActor; +import io.mantisrx.master.events.AuditEventSubscriberLoggingImpl; +import io.mantisrx.master.events.LifecycleEventPublisher; +import io.mantisrx.master.events.LifecycleEventPublisherImpl; +import io.mantisrx.master.events.StatusEventSubscriberLoggingImpl; +import io.mantisrx.master.events.WorkerEventSubscriberLoggingImpl; +import io.mantisrx.master.jobcluster.HealthCheckExtension; +import io.mantisrx.master.jobcluster.job.CostsCalculator; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse.FailedWorker; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; +import io.mantisrx.master.scheduler.FakeMantisScheduler; +import io.mantisrx.server.master.persistence.FileBasedPersistenceProvider; +import io.mantisrx.server.master.persistence.MantisJobStore; +import io.mantisrx.server.master.scheduler.MantisScheduler; +import io.mantisrx.server.master.scheduler.MantisSchedulerFactory; +import java.io.File; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JobClusterRouteHandlerAkkaImplTest { + + private static ActorSystem system; + private static ActorRef jobClustersManagerActor; + private static File stateDirectory; + + @BeforeClass + public static void setup() throws Exception { + TestHelpers.setupMasterConfig(); + system = ActorSystem.create("JobClusterRouteHandlerTest"); + stateDirectory = Files.createTempDirectory("test-handler").toFile(); + + LifecycleEventPublisher lifecycleEventPublisher = new LifecycleEventPublisherImpl( + new AuditEventSubscriberLoggingImpl(), + new StatusEventSubscriberLoggingImpl(), + new WorkerEventSubscriberLoggingImpl()); + + jobClustersManagerActor = system.actorOf( + JobClustersManagerActor.props( + new MantisJobStore(new FileBasedPersistenceProvider(stateDirectory, true)), + lifecycleEventPublisher, + CostsCalculator.noop(), + 0), + "jobClustersManager"); + + MantisSchedulerFactory mantisSchedulerFactory = mock(MantisSchedulerFactory.class); + MantisScheduler fakeScheduler = new FakeMantisScheduler(jobClustersManagerActor); + when(mantisSchedulerFactory.forJob(any())).thenReturn(fakeScheduler); + jobClustersManagerActor.tell( + new JobClusterManagerProto.JobClustersManagerInitialize( + mantisSchedulerFactory, false), ActorRef.noSender()); + + Thread.sleep(1000); + } + + @AfterClass + public static void tearDown() throws Exception { + TestKit.shutdownActorSystem(system); + system = null; + FileUtils.deleteDirectory(stateDirectory); + } + + @Test + public void testHealthCheckNoExtensions() throws Exception { + JobClusterRouteHandlerAkkaImpl handler = new JobClusterRouteHandlerAkkaImpl(jobClustersManagerActor); + + HealthCheckResponse response = handler.healthCheck("nonExistentCluster", null, Map.of()) + .toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertNotNull(response); + } + + @Test + public void testHealthCheckExtensionChainingStopsOnFailure() throws Exception { + HealthCheckExtension passingExtension = new HealthCheckExtension() { + @Override + public String contextId() { return "passing"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + return HealthCheckResponse.healthy(0); + } + }; + + HealthCheckExtension failingExtension = new HealthCheckExtension() { + @Override + public String contextId() { return "alertSystem"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + return HealthCheckResponse.unhealthyAlerts(0, List.of("alert1")); + } + }; + + HealthCheckExtension neverCalledExtension = new HealthCheckExtension() { + @Override + public String contextId() { return "neverCalled"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + throw new AssertionError("This extension should not be called"); + } + }; + + ActorRef mockActor = system.actorOf(Props.create(HealthyActorStub.class)); + JobClusterRouteHandlerAkkaImpl handler = new JobClusterRouteHandlerAkkaImpl( + mockActor, List.of(passingExtension, failingExtension, neverCalledExtension)); + + HealthCheckResponse response = handler.healthCheck("testCluster", null, Map.of()) + .toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertFalse(response.isHealthy()); + assertTrue(response.getFailureReason() instanceof HealthCheckResponse.AlertFailure); + HealthCheckResponse.AlertFailure alertFailure = (HealthCheckResponse.AlertFailure) response.getFailureReason(); + assertEquals(List.of("alert1"), alertFailure.alerts()); + } + + @Test + public void testHealthCheckExtensionReceivesScopedContext() throws Exception { + final Map[] capturedContext = new Map[1]; + + HealthCheckExtension contextCapturingExtension = new HealthCheckExtension() { + @Override + public String contextId() { return "myCheck"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + capturedContext[0] = context; + return HealthCheckResponse.healthy(0); + } + }; + + ActorRef mockActor = system.actorOf(Props.create(HealthyActorStub.class)); + JobClusterRouteHandlerAkkaImpl handler = new JobClusterRouteHandlerAkkaImpl( + mockActor, List.of(contextCapturingExtension)); + + Map fullContext = Map.of( + "myCheck.names", "foo,bar", + "myCheck.threshold", "5", + "otherCheck.something", "ignored", + "job-ids", "job-1" + ); + + HealthCheckResponse response = handler.healthCheck("testCluster", null, fullContext) + .toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertTrue(response.isHealthy()); + assertNotNull(capturedContext[0]); + assertEquals("foo,bar", capturedContext[0].get("names")); + assertEquals("5", capturedContext[0].get("threshold")); + assertNull(capturedContext[0].get("otherCheck.something")); + assertNull(capturedContext[0].get("job-ids")); + } + + @Test + public void testHealthCheckAllExtensionsPassReturnsHealthy() throws Exception { + HealthCheckExtension ext1 = new HealthCheckExtension() { + @Override + public String contextId() { return "ext1"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + return HealthCheckResponse.healthy(0); + } + }; + + HealthCheckExtension ext2 = new HealthCheckExtension() { + @Override + public String contextId() { return "ext2"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + return HealthCheckResponse.healthy(0); + } + }; + + ActorRef mockActor = system.actorOf(Props.create(HealthyActorStub.class)); + JobClusterRouteHandlerAkkaImpl handler = new JobClusterRouteHandlerAkkaImpl( + mockActor, List.of(ext1, ext2)); + + HealthCheckResponse response = handler.healthCheck("testCluster", null, Map.of()) + .toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertTrue(response.isHealthy()); + } + + @Test + public void testHealthCheckActorUnhealthySkipsExtensions() throws Exception { + HealthCheckExtension shouldNotBeCalled = new HealthCheckExtension() { + @Override + public String contextId() { return "ext"; } + + @Override + public HealthCheckResponse checkHealth(String clusterName, List jobIds, Map context) { + throw new AssertionError("Extension should not be called when actor reports unhealthy"); + } + }; + + ActorRef mockActor = system.actorOf(Props.create(UnhealthyActorStub.class)); + JobClusterRouteHandlerAkkaImpl handler = new JobClusterRouteHandlerAkkaImpl( + mockActor, List.of(shouldNotBeCalled)); + + HealthCheckResponse response = handler.healthCheck("testCluster", null, Map.of()) + .toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertFalse(response.isHealthy()); + } + + public static class HealthyActorStub extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(JobClusterManagerProto.HealthCheckRequest.class, req -> + getSender().tell(HealthCheckResponse.healthy(req.requestId), getSelf())) + .build(); + } + } + + public static class UnhealthyActorStub extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(JobClusterManagerProto.HealthCheckRequest.class, req -> + getSender().tell(HealthCheckResponse.unhealthyWorkers(req.requestId, + List.of(new FailedWorker(0, 1, "Accepted"))), getSelf())) + .build(); + } + } +} diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java index 076439a5d..39bc5e5ce 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -155,6 +156,8 @@ public void testIt() throws Exception { testJobClustersDelete(); testJobClustersPut(); testJobClusterInstanceGET(); + testJobClusterHealthCheck(); + testJobClusterHealthCheckNonExistent(); testNonExistentJobClusterInstanceGET(); testJobClusterInstancePOSTNotAllowed(); testJobClusterInstanceValidUpdate(); @@ -679,6 +682,29 @@ private void testJobClusterActionDisableDELETENotAllowed() throws InterruptedExc testDelete(getJobClusterDisableEp(JobClusterPayloads.CLUSTER_NAME), StatusCodes.METHOD_NOT_ALLOWED, null); } + private void testJobClusterHealthCheck() throws InterruptedException { + testGet( + getJobClusterInstanceEndpoint(JobClusterPayloads.CLUSTER_NAME) + "/healthcheck", + StatusCodes.OK, + response -> { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(response); + assertTrue("Expected 'healthy' field in response: " + response, node.has("healthy")); + assertTrue(node.get("healthy").asBoolean()); + } catch (IOException e) { + fail("Failed to parse health check response: " + e.getMessage()); + } + }); + } + + private void testJobClusterHealthCheckNonExistent() throws InterruptedException { + testGet( + getJobClusterInstanceEndpoint("nonExistentCluster") + "/healthcheck", + StatusCodes.NOT_FOUND, + null); + } + private void testJobClusterDeleteWithoutRequiredParam() throws InterruptedException { testDelete( getJobClusterInstanceEndpoint(JobClusterPayloads.CLUSTER_NAME), diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index 854b4dde3..428279bb0 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -91,6 +91,7 @@ import io.mantisrx.master.jobcluster.job.worker.WorkerState; import io.mantisrx.master.jobcluster.job.worker.WorkerTerminate; import io.mantisrx.master.jobcluster.proto.BaseResponse; +import io.mantisrx.master.jobcluster.proto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterLabelsRequest; @@ -3377,6 +3378,133 @@ public void testExpireOldJobs() { //TODO } + // HEALTH CHECK TESTS ////////////////////////////////////////////////////////////////////////// + @Test + public void testHealthCheckAllWorkersStarted() { + try { + TestKit probe = new TestKit(system); + String clusterName = "testHealthCheckAllWorkersStarted"; + MantisScheduler schedulerMock = JobTestHelper.createMockScheduler(); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + String jobId = clusterName + "-1"; + JobDefinition jobDefn = createJob(clusterName); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + JobTestHelper.submitJobAndVerifySuccess(probe, clusterName, jobClusterActor, jobDefn, jobId); + JobTestHelper.getJobDetailsAndVerify(probe, jobClusterActor, jobId, SUCCESS, JobState.Accepted); + JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobClusterActor, jobId, 1, new WorkerId(clusterName, jobId, 0, 1)); + JobTestHelper.getJobDetailsAndVerify(probe, jobClusterActor, jobId, SUCCESS, JobState.Launched); + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, null), probe.getRef()); + HealthCheckResponse healthResp = probe.expectMsgClass(HealthCheckResponse.class); + + assertTrue(healthResp.isHealthy()); + assertEquals(SUCCESS, healthResp.responseCode); + assertNull(healthResp.getFailureReason()); + + probe.getSystem().stop(jobClusterActor); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testHealthCheckWithUnstartedWorkers() { + try { + TestKit probe = new TestKit(system); + String clusterName = "testHealthCheckWithUnstartedWorkers"; + MantisScheduler schedulerMock = JobTestHelper.createMockScheduler(); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + String jobId = clusterName + "-1"; + JobDefinition jobDefn = createJob(clusterName); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + JobTestHelper.submitJobAndVerifySuccess(probe, clusterName, jobClusterActor, jobDefn, jobId); + JobTestHelper.getJobDetailsAndVerify(probe, jobClusterActor, jobId, SUCCESS, JobState.Accepted); + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, null), probe.getRef()); + HealthCheckResponse healthResp = probe.expectMsgClass(HealthCheckResponse.class); + + assertFalse(healthResp.isHealthy()); + assertEquals(SERVER_ERROR, healthResp.responseCode); + assertNotNull(healthResp.getFailureReason()); + assertTrue(healthResp.getFailureReason() instanceof HealthCheckResponse.WorkerFailure); + + HealthCheckResponse.WorkerFailure workerFailure = + (HealthCheckResponse.WorkerFailure) healthResp.getFailureReason(); + assertFalse(workerFailure.failedWorkers().isEmpty()); + + probe.getSystem().stop(jobClusterActor); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testHealthCheckWithJobIdFilter() { + try { + TestKit probe = new TestKit(system); + String clusterName = "testHealthCheckWithJobIdFilter"; + MantisScheduler schedulerMock = JobTestHelper.createMockScheduler(); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + String jobId = clusterName + "-1"; + JobDefinition jobDefn = createJob(clusterName); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + JobTestHelper.submitJobAndVerifySuccess(probe, clusterName, jobClusterActor, jobDefn, jobId); + JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobClusterActor, jobId, 1, new WorkerId(clusterName, jobId, 0, 1)); + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, ImmutableList.of(jobId)), probe.getRef()); + HealthCheckResponse healthResp = probe.expectMsgClass(HealthCheckResponse.class); + + assertTrue(healthResp.isHealthy()); + assertEquals(SUCCESS, healthResp.responseCode); + + probe.getSystem().stop(jobClusterActor); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testHealthCheckNoActiveJobs() throws Exception { + TestKit probe = new TestKit(system); + String clusterName = "testHealthCheckNoActiveJobs"; + MantisSchedulerFactory schedulerMock = mock(MantisSchedulerFactory.class); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, null), probe.getRef()); + HealthCheckResponse healthResp = probe.expectMsgClass(HealthCheckResponse.class); + + assertTrue(healthResp.isHealthy()); + assertEquals(SUCCESS, healthResp.responseCode); + + probe.getSystem().stop(jobClusterActor); + } }