diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index 00b2ebbd6..14716584f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -32,6 +32,8 @@ import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterRequest; +import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest; +import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse; import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; @@ -241,6 +243,7 @@ private Receive getInitializedBehavior() { .match(EnableJobClusterRequest.class, this::onJobClusterEnable) .match(DisableJobClusterRequest.class, this::onJobClusterDisable) .match(GetJobClusterRequest.class, this::onJobClusterGet) + .match(HealthCheckRequest.class, this::onHealthCheck) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) @@ -561,6 +564,7 @@ public void onJobClusterGet(GetJobClusterRequest r) { sender.tell(new GetJobClusterResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getJobClusterName(), empty()), getSelf()); } } + @Override public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) { Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName()); @@ -665,6 +669,17 @@ public void onGetJobScalerRuleStream(final JobClusterScalerRuleProto.GetJobScale } } + public void onHealthCheck(HealthCheckRequest request) { + Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(request.clusterName); + + if (jobClusterInfo.isPresent()) { + jobClusterInfo.get().jobClusterActor.forward(request, getContext()); + } else { + ActorRef sender = getSender(); + sender.tell(new HealthCheckResponse(request.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + request.clusterName, false, null), getSelf()); + } + } + private void onTerminated(final Terminated terminated) { logger.warn("onTerminated {}", terminated.actor()); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java index 6369478a9..70ba99b3a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandler.java @@ -16,11 +16,12 @@ package io.mantisrx.master.api.akka.route.handlers; + import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; - +import java.util.List; import java.util.concurrent.CompletionStage; public interface JobClusterRouteHandler { @@ -59,4 +60,6 @@ CompletionStage updateSchedulingInfo( CompletionStage getScalerRules(JobClusterScalerRuleProto.GetScalerRulesRequest request); CompletionStage deleteScalerRule(JobClusterScalerRuleProto.DeleteScalerRuleRequest request); + + CompletionStage healthCheck(String clusterName, List jobIds); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java index 0f2b311c7..a9174fa0b 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobClusterRouteHandlerAkkaImpl.java @@ -23,12 +23,14 @@ import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.master.JobClustersManagerActor.UpdateSchedulingInfo; + import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; import io.mantisrx.server.master.config.ConfigurationProvider; import java.time.Duration; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionStage; import org.slf4j.Logger; @@ -177,4 +179,10 @@ public CompletionStage .thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast); return response; } + + @Override + public CompletionStage healthCheck(String clusterName, List jobIds) { + JobClusterManagerProto.HealthCheckRequest request = new JobClusterManagerProto.HealthCheckRequest(clusterName, jobIds); + return ask(jobClustersManagerActor, request, timeout).thenApply(JobClusterManagerProto.HealthCheckResponse.class::cast); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java index 772f56610..c43dfb0d8 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/HttpRequestMetrics.java @@ -67,6 +67,7 @@ public static class Endpoints { public static final String RESOURCE_CLUSTERS = "api.v1.resourceClusters"; public static final String JOB_CLUSTER_SCALER_RULES = "api.v1.jobClusters.scalerRules"; + public static final String JOB_CLUSTER_INSTANCE_HEALTHCHECK = "api.v1.jobClusters.instance.healthcheck"; private static String[] endpoints = new String[]{ JOB_ARTIFACTS, @@ -99,7 +100,8 @@ public static class Endpoints { JOB_DISCOVERY_STREAM, LAST_SUBMITTED_JOB_ID_STREAM, RESOURCE_CLUSTERS, - JOB_CLUSTER_SCALER_RULES + JOB_CLUSTER_SCALER_RULES, + JOB_CLUSTER_INSTANCE_HEALTHCHECK }; private static Set endpointSet = Sets.newHashSet(endpoints); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java index 9c33738b6..cf7c6cd69 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute.java @@ -56,12 +56,16 @@ import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler; import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter; import io.mantisrx.master.jobcluster.proto.BaseResponse; + import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; import io.mantisrx.runtime.NamedJobDefinition; import io.mantisrx.server.master.config.ConfigurationProvider; import io.mantisrx.server.master.config.MasterConfiguration; -import io.mantisrx.shaded.com.google.common.base.Strings; +import java.util.Arrays; + +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -211,6 +215,14 @@ public Route constructRoutes() { )) ), + // api/v1/jobClusters/{}/healthcheck + path( + PathMatchers.segment().slash("healthcheck"), + (clusterName) -> pathEndOrSingleSlash(() -> + get(() -> healthCheckRoute(clusterName)) + ) + ), + // api/v1/jobClusters/{}/scalerRules path( PathMatchers.segment().slash("scalerRules"), @@ -785,4 +797,23 @@ private Route deleteScalerRuleRoute(String clusterName, String ruleId) { HttpRequestMetrics.HttpVerb.DELETE ); } + + private Route healthCheckRoute(String clusterName) { + logger.trace("GET /api/v1/jobClusters/{}/healthcheck called", clusterName); + + return parameterMap(params -> { + String jobIdsParam = params.get("job-ids"); + List jobIds = (jobIdsParam != null && !jobIdsParam.isBlank()) + ? Arrays.asList(jobIdsParam.split(",")) + : null; + + CompletionStage response = jobClusterRouteHandler.healthCheck(clusterName, jobIds); + return completeAsync( + response, + resp -> complete(StatusCodes.OK, resp, Jackson.marshaller()), + HttpRequestMetrics.Endpoints.JOB_CLUSTER_INSTANCE_HEALTHCHECK, + HttpRequestMetrics.HttpVerb.GET + ); + }); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java index d2c238e8f..1c2c21fe7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java @@ -54,6 +54,7 @@ import io.mantisrx.master.events.LifecycleEventPublisher; import io.mantisrx.master.events.LifecycleEventsProto; import io.mantisrx.master.jobcluster.job.CostsCalculator; +import io.mantisrx.master.jobcluster.job.FilterableMantisWorkerMetadataWritable; import io.mantisrx.master.jobcluster.job.IMantisJobMetadata; import io.mantisrx.master.jobcluster.job.JobActor; import io.mantisrx.master.jobcluster.job.JobHelper; @@ -61,8 +62,11 @@ import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl; import io.mantisrx.master.jobcluster.job.MantisJobMetadataView; import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata; +import io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UnreadyWorker; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest; @@ -110,14 +114,17 @@ import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse; import io.mantisrx.master.jobcluster.proto.JobClusterProto; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckResponse; import io.mantisrx.master.jobcluster.proto.JobClusterProto.JobStartedEvent; import io.mantisrx.master.jobcluster.proto.JobClusterProto.KillJobRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UnreadyWorkers; import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto; import io.mantisrx.master.jobcluster.proto.JobProto; import io.mantisrx.master.jobcluster.scaler.IJobClusterScalerRuleData; import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataFactory; import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritable; import io.mantisrx.runtime.JobSla; +import io.mantisrx.runtime.MantisJobState; import io.mantisrx.runtime.command.InvalidJobException; import io.mantisrx.server.core.JobCompletedReason; import io.mantisrx.server.master.InvalidJobRequestException; @@ -617,6 +624,7 @@ private Receive buildInitializedBehavior() { new EnableJobClusterResponse(x.requestId, SUCCESS, genUnexpectedMsg(x.toString(), this.name, state)), getSelf())) .match(GetJobClusterRequest.class, this::onJobClusterGet) + .match(HealthCheckRequest.class, this::onHealthCheck) .match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) @@ -2633,6 +2641,58 @@ public void onJobScalerRuleStream(JobClusterScalerRuleProto.GetJobScalerRuleStre } } + public void onHealthCheck(final HealthCheckRequest request) { + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + Set prefilteredJobIds = new HashSet<>(); + if (request.jobIds != null && !request.jobIds.isEmpty()) { + request.jobIds.stream() + .map(JobId::fromId) + .filter(Optional::isPresent) + .map(Optional::get) + .forEach(prefilteredJobIds::add); + } + + ListJobCriteria criteria = new ListJobCriteria(); + getFilteredNonTerminalJobList(criteria, prefilteredJobIds) + .collect(Lists::newArrayList, (unreadyWorkers, view) -> { + if (view.getWorkerMetadataList() != null) { + for (FilterableMantisWorkerMetadataWritable worker : view.getWorkerMetadataList()) { + if (worker.getState() != MantisJobState.Started) { + unreadyWorkers.add(new UnreadyWorker( + worker.getWorkerIndex(), + worker.getWorkerNumber(), + worker.getState().name())); + } + } + } + }) + .subscribe( + unreadyWorkers -> { + if (unreadyWorkers.isEmpty()) { + HealthCheckResponse response + = new HealthCheckResponse(request.requestId, ResponseCode.SUCCESS, "OK", true, null); + sender.tell(response, self); + } else { + HealthCheckResponse response + = new HealthCheckResponse( + request.requestId, + ResponseCode.SUCCESS, + "unready workers", + false, + new UnreadyWorkers(unreadyWorkers)); + sender.tell(response, self); + } + }, + error -> { + logger.error("Health check failed for cluster {}", name, error); + sender.tell(new JobClusterManagerProto.HealthCheckResponse(request.requestId, SERVER_ERROR, + "Health check failed: " + error.getMessage(), false, null), self); + } + ); + } + static final class JobInfo { final long submittedAt; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java index 6612b9c79..7ad367a29 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java @@ -2253,5 +2253,48 @@ public String toString() { } } + public static final class HealthCheckRequest extends BaseRequest { + public final String clusterName; + public final List jobIds; + public HealthCheckRequest(final String clusterName, final List jobIds) { + super(); + Preconditions.checkArg( + clusterName != null && !clusterName.isEmpty(), + "Cluster name cannot be null or empty"); + this.clusterName = clusterName; + this.jobIds = jobIds; + } + + @Override + public String toString() { + return "HealthCheckRequest{" + + "clusterName='" + clusterName + '\'' + + ", jobIds=" + jobIds + + ", requestId=" + requestId + + '}'; + } + } + + public static final class HealthCheckResponse extends BaseResponse { + public final boolean isHealthy; + public final UnreadyWorkers unreadyWorkers; + + @JsonCreator + @JsonIgnoreProperties(ignoreUnknown = true) + public HealthCheckResponse( + @JsonProperty("requestId") long requestId, + @JsonProperty("responseCode") ResponseCode responseCode, + @JsonProperty("message") String message, + @JsonProperty("isHealthy") boolean isHealthy, + @JsonProperty("unreadyWorkers") UnreadyWorkers unreadyWorkers) { + super(requestId, responseCode, message); + this.isHealthy = isHealthy; + this.unreadyWorkers = unreadyWorkers; + } + } + + public record UnreadyWorkers(List workers) {} + + public record UnreadyWorker(int workerIndex, int workerNumber, String state) {} } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java index 076439a5d..c6d7e9d66 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobClustersRouteTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -155,6 +156,8 @@ public void testIt() throws Exception { testJobClustersDelete(); testJobClustersPut(); testJobClusterInstanceGET(); + testJobClusterHealthCheck(); + testJobClusterHealthCheckNonExistent(); testNonExistentJobClusterInstanceGET(); testJobClusterInstancePOSTNotAllowed(); testJobClusterInstanceValidUpdate(); @@ -777,4 +780,27 @@ private void compareClustersPayload(String clusterListResponse) { assert ex == null; } } + + private void testJobClusterHealthCheck() throws InterruptedException { + testGet( + getJobClusterInstanceEndpoint(JobClusterPayloads.CLUSTER_NAME) + "/healthcheck", + StatusCodes.OK, + response -> { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(response); + assertTrue("Expected 'isHealthy' field in response: " + response, node.has("isHealthy")); + assertTrue(node.get("isHealthy").asBoolean()); + } catch (IOException e) { + fail("Failed to parse health check response: " + e.getMessage()); + } + }); + } + + private void testJobClusterHealthCheckNonExistent() throws InterruptedException { + testGet( + getJobClusterInstanceEndpoint("nonExistentCluster") + "/healthcheck", + StatusCodes.NOT_FOUND, + null); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index 854b4dde3..05ea1960a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -3377,6 +3377,88 @@ public void testExpireOldJobs() { //TODO } + // HEALTH CHECK TESTS ////////////////////////////////////////////////////////////////////////// + /** + * Submits a job, optionally starts workers, sends a health check request, and returns the response. + */ + private JobClusterManagerProto.HealthCheckResponse submitJobAndHealthCheck( + String clusterName, boolean startWorkers, List jobIds) throws Exception { + TestKit probe = new TestKit(system); + MantisScheduler schedulerMock = JobTestHelper.createMockScheduler(); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + String jobId = clusterName + "-1"; + JobDefinition jobDefinition = createJob(clusterName); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + JobTestHelper.submitJobAndVerifySuccess(probe, clusterName, jobClusterActor, jobDefinition, jobId); + if (startWorkers) { + JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobClusterActor, jobId, 1, new WorkerId(clusterName, jobId, 0, 1)); + } + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, jobIds), probe.getRef()); + JobClusterManagerProto.HealthCheckResponse healthResp = probe.expectMsgClass(JobClusterManagerProto.HealthCheckResponse.class); + + probe.getSystem().stop(jobClusterActor); + return healthResp; + } + + @Test + public void testHealthCheckAllWorkersStarted() throws Exception { + JobClusterManagerProto.HealthCheckResponse resp + = submitJobAndHealthCheck("testHealthCheckAllWorkersStarted", true, null); + + assertTrue(resp.isHealthy); + assertEquals(SUCCESS, resp.responseCode); + assertNull(resp.unreadyWorkers); + } + + @Test + public void testHealthCheckWithUnstartedWorkers() throws Exception { + JobClusterManagerProto.HealthCheckResponse resp = + submitJobAndHealthCheck("testHealthCheckWithUnstartedWorkers", false, null); + + assertFalse(resp.isHealthy); + assertEquals(SUCCESS, resp.responseCode); + assertNotNull(resp.unreadyWorkers); + assertFalse(resp.unreadyWorkers.workers().isEmpty()); + } + + @Test + public void testHealthCheckWithJobIdFilter() throws Exception { + String clusterName = "testHealthCheckWithJobIdFilter"; + JobClusterManagerProto.HealthCheckResponse resp = + submitJobAndHealthCheck(clusterName, true, ImmutableList.of(clusterName + "-1")); + + assertTrue(resp.isHealthy); + assertEquals(SUCCESS, resp.responseCode); + } + + @Test + public void testHealthCheckNoActiveJobs() throws Exception { + TestKit probe = new TestKit(system); + String clusterName = "testHealthCheckNoActiveJobs"; + MantisSchedulerFactory schedulerMock = mock(MantisSchedulerFactory.class); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + jobClusterActor.tell(new JobClusterManagerProto.HealthCheckRequest(clusterName, null), probe.getRef()); + JobClusterManagerProto.HealthCheckResponse healthResp = probe.expectMsgClass(JobClusterManagerProto.HealthCheckResponse.class); + + assertTrue(healthResp.isHealthy); + assertEquals(SUCCESS, healthResp.responseCode); + + probe.getSystem().stop(jobClusterActor); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index 880ccd4f3..aa069d284 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -223,7 +223,7 @@ private void setupRpcService() throws Exception { .getJobArtifactsToCache(CLUSTER_ID); } - private void setupActor() { + private void setupActor() throws ExecutionException, InterruptedException { MasterConfiguration masterConfig = mock(MasterConfiguration.class); when(masterConfig.getTimeoutSecondsToReportStart()).thenReturn(1); ExecuteStageRequestFactory executeStageRequestFactory = new ExecuteStageRequestFactory(masterConfig); @@ -253,6 +253,7 @@ private void setupActor() { Duration.ofSeconds(15), CLUSTER_ID, new LongDynamicProperty(propertiesLoader, "resourcecluster.gateway.maxConcurrentRequests.test", 100000L)); + resourceCluster.getRegisteredTaskExecutors().get(); } @Test