Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse;
import static io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest;
Expand Down Expand Up @@ -241,6 +243,7 @@ private Receive getInitializedBehavior() {
.match(EnableJobClusterRequest.class, this::onJobClusterEnable)
.match(DisableJobClusterRequest.class, this::onJobClusterDisable)
.match(GetJobClusterRequest.class, this::onJobClusterGet)
.match(HealthCheckRequest.class, this::onHealthCheck)
.match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
.match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
.match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
Expand Down Expand Up @@ -561,6 +564,7 @@ public void onJobClusterGet(GetJobClusterRequest r) {
sender.tell(new GetJobClusterResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getJobClusterName(), empty()), getSelf());
}
}

@Override
public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) {
Optional<JobClusterInfo> jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName());
Expand Down Expand Up @@ -665,6 +669,17 @@ public void onGetJobScalerRuleStream(final JobClusterScalerRuleProto.GetJobScale
}
}

/**
 * Routes a health-check request to the actor owning the target job cluster.
 * Replies directly with NOT_FOUND when no such cluster is registered.
 */
public void onHealthCheck(HealthCheckRequest request) {
    final Optional<JobClusterInfo> clusterInfo =
        jobClusterInfoManager.getJobClusterInfo(request.clusterName);

    if (!clusterInfo.isPresent()) {
        // Unknown cluster: answer the caller ourselves instead of forwarding.
        getSender().tell(
            new HealthCheckResponse(
                request.requestId,
                CLIENT_ERROR_NOT_FOUND,
                "No such Job cluster " + request.clusterName,
                false,
                null),
            getSelf());
        return;
    }

    // Preserve the original sender so the cluster actor replies to the caller.
    clusterInfo.get().jobClusterActor.forward(request, getContext());
}

// Logs watched actors observed terminating; no restart/resurrection is attempted here.
private void onTerminated(final Terminated terminated) {
    logger.warn("onTerminated {}", terminated.actor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package io.mantisrx.master.api.akka.route.handlers;


import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;

import java.util.List;
import java.util.concurrent.CompletionStage;

public interface JobClusterRouteHandler {
Expand Down Expand Up @@ -59,4 +60,6 @@ CompletionStage<UpdateSchedulingInfoResponse> updateSchedulingInfo(
CompletionStage<JobClusterScalerRuleProto.GetScalerRulesResponse> getScalerRules(JobClusterScalerRuleProto.GetScalerRulesRequest request);

CompletionStage<JobClusterScalerRuleProto.DeleteScalerRuleResponse> deleteScalerRule(JobClusterScalerRuleProto.DeleteScalerRuleRequest request);

CompletionStage<JobClusterManagerProto.HealthCheckResponse> healthCheck(String clusterName, List<String> jobIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.master.JobClustersManagerActor.UpdateSchedulingInfo;

import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.server.master.config.ConfigurationProvider;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
Expand Down Expand Up @@ -177,4 +179,10 @@ public CompletionStage<JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse>
.thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast);
return response;
}

/**
 * Asks the job clusters manager actor to health-check the given cluster,
 * optionally restricted to the supplied job ids.
 */
@Override
public CompletionStage<JobClusterManagerProto.HealthCheckResponse> healthCheck(String clusterName, List<String> jobIds) {
    final JobClusterManagerProto.HealthCheckRequest healthCheckRequest =
        new JobClusterManagerProto.HealthCheckRequest(clusterName, jobIds);
    // Untyped actor ask: narrow the reply to the expected response type.
    return ask(jobClustersManagerActor, healthCheckRequest, timeout)
        .thenApply(JobClusterManagerProto.HealthCheckResponse.class::cast);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static class Endpoints {

public static final String RESOURCE_CLUSTERS = "api.v1.resourceClusters";
public static final String JOB_CLUSTER_SCALER_RULES = "api.v1.jobClusters.scalerRules";
public static final String JOB_CLUSTER_INSTANCE_HEALTHCHECK = "api.v1.jobClusters.instance.healthcheck";

private static String[] endpoints = new String[]{
JOB_ARTIFACTS,
Expand Down Expand Up @@ -99,7 +100,8 @@ public static class Endpoints {
JOB_DISCOVERY_STREAM,
LAST_SUBMITTED_JOB_ID_STREAM,
RESOURCE_CLUSTERS,
JOB_CLUSTER_SCALER_RULES
JOB_CLUSTER_SCALER_RULES,
JOB_CLUSTER_INSTANCE_HEALTHCHECK
};

private static Set<String> endpointSet = Sets.newHashSet(endpoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@
import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter;
import io.mantisrx.master.jobcluster.proto.BaseResponse;

import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.runtime.NamedJobDefinition;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.shaded.com.google.common.base.Strings;
import java.util.Arrays;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -211,6 +215,14 @@ public Route constructRoutes() {
))
),

// api/v1/jobClusters/{}/healthcheck
path(
PathMatchers.segment().slash("healthcheck"),
(clusterName) -> pathEndOrSingleSlash(() ->
get(() -> healthCheckRoute(clusterName))
)
),

// api/v1/jobClusters/{}/scalerRules
path(
PathMatchers.segment().slash("scalerRules"),
Expand Down Expand Up @@ -785,4 +797,23 @@ private Route deleteScalerRuleRoute(String clusterName, String ruleId) {
HttpRequestMetrics.HttpVerb.DELETE
);
}

private Route healthCheckRoute(String clusterName) {
logger.trace("GET /api/v1/jobClusters/{}/healthcheck called", clusterName);

return parameterMap(params -> {
String jobIdsParam = params.get("job-ids");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about we have users to provide the region, and we will determine the health for all jobs in that region?

here it seems we are expecting users to provide list of job ids, do we have use case where we only need inspect subset of jobs within a given region?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, it checks any active jobs. The job-ids param is optional, it just does the default check if it's null. But if you do specify it, it only checks the job-ids that were inserted.

This probably won't be used right away, it's more of a forward looking change. In the future, where we have a fully managed Mantis CD system, this will allow us to check just jobs that were recently deployed. It's also an additional knob that gives some of the power users more flexibility.

List<String> jobIds = (jobIdsParam != null && !jobIdsParam.isBlank())
? Arrays.asList(jobIdsParam.split(","))
: null;

CompletionStage<JobClusterManagerProto.HealthCheckResponse> response = jobClusterRouteHandler.healthCheck(clusterName, jobIds);
return completeAsync(
response,
resp -> complete(StatusCodes.OK, resp, Jackson.marshaller()),
HttpRequestMetrics.Endpoints.JOB_CLUSTER_INSTANCE_HEALTHCHECK,
HttpRequestMetrics.HttpVerb.GET
);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.job.CostsCalculator;
import io.mantisrx.master.jobcluster.job.FilterableMantisWorkerMetadataWritable;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.JobActor;
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataView;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UnreadyWorker;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest;
Expand Down Expand Up @@ -110,14 +114,17 @@
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateSchedulingInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.HealthCheckResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterProto.JobStartedEvent;
import io.mantisrx.master.jobcluster.proto.JobClusterProto.KillJobRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UnreadyWorkers;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.master.jobcluster.proto.JobProto;
import io.mantisrx.master.jobcluster.scaler.IJobClusterScalerRuleData;
import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataFactory;
import io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritable;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.command.InvalidJobException;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.master.InvalidJobRequestException;
Expand Down Expand Up @@ -617,6 +624,7 @@ private Receive buildInitializedBehavior() {
new EnableJobClusterResponse(x.requestId, SUCCESS, genUnexpectedMsg(x.toString(),
this.name, state)), getSelf()))
.match(GetJobClusterRequest.class, this::onJobClusterGet)
.match(HealthCheckRequest.class, this::onHealthCheck)
.match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete)
.match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
.match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
Expand Down Expand Up @@ -2633,6 +2641,58 @@ public void onJobScalerRuleStream(JobClusterScalerRuleProto.GetJobScalerRuleStre
}
}

public void onHealthCheck(final HealthCheckRequest request) {
final ActorRef sender = getSender();
final ActorRef self = getSelf();

Set<JobId> prefilteredJobIds = new HashSet<>();
if (request.jobIds != null && !request.jobIds.isEmpty()) {
request.jobIds.stream()
.map(JobId::fromId)
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(prefilteredJobIds::add);
}

ListJobCriteria criteria = new ListJobCriteria();
getFilteredNonTerminalJobList(criteria, prefilteredJobIds)
.collect(Lists::<UnreadyWorker>newArrayList, (unreadyWorkers, view) -> {
if (view.getWorkerMetadataList() != null) {
for (FilterableMantisWorkerMetadataWritable worker : view.getWorkerMetadataList()) {
if (worker.getState() != MantisJobState.Started) {
unreadyWorkers.add(new UnreadyWorker(
worker.getWorkerIndex(),
worker.getWorkerNumber(),
worker.getState().name()));
Comment on lines +2662 to +2666
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is categorizing pending workers as failed?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo the health logic is something like a composite of (all workers for each index have started, failed/resubmit workers count in past x duration is below a threshold; alert pack etc).

Copy link
Copy Markdown
Collaborator Author

@james-lubin james-lubin Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe I can improve the naming here, but the behavior is intentional, the idea is that after a deploy you can call this endpoint with n amount of retries until the workers have started. So if there are some stuck in accepted/pending the healthcheck will fail.

Alternatively, since we're using SSE, we could just keep the connection open and send an event when the workers start or a timeout is reached. But I think the way its implemented is easier to build on top of.

Edit: Didn't see your second comment until refreshing. Agreed on the composite, I'm just planning to add the rest of that within nfmantis.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe change the name to something like "notReady"?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, went with unready

}
}
}
})
.subscribe(
unreadyWorkers -> {
if (unreadyWorkers.isEmpty()) {
HealthCheckResponse response
= new HealthCheckResponse(request.requestId, ResponseCode.SUCCESS, "OK", true, null);
sender.tell(response, self);
} else {
HealthCheckResponse response
= new HealthCheckResponse(
request.requestId,
ResponseCode.SUCCESS,
"unready workers",
false,
new UnreadyWorkers(unreadyWorkers));
sender.tell(response, self);
}
},
error -> {
logger.error("Health check failed for cluster {}", name, error);
sender.tell(new JobClusterManagerProto.HealthCheckResponse(request.requestId, SERVER_ERROR,
"Health check failed: " + error.getMessage(), false, null), self);
}
);
}

static final class JobInfo {

final long submittedAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2253,5 +2253,48 @@ public String toString() {
}
}

/**
 * Request to health-check a job cluster, optionally restricted to specific jobs.
 */
public static final class HealthCheckRequest extends BaseRequest {
    // Target job cluster; validated non-null and non-empty in the constructor.
    public final String clusterName;
    // Optional subset of job ids to inspect; null or empty means "check all active jobs".
    public final List<String> jobIds;

    public HealthCheckRequest(final String clusterName, final List<String> jobIds) {
        super();
        Preconditions.checkArg(
            clusterName != null && !clusterName.isEmpty(),
            "Cluster name cannot be null or empty");
        this.clusterName = clusterName;
        // Defensive copy: the field is public, so don't alias the caller's (possibly
        // mutable) list. Null is preserved to keep the "check all jobs" semantics.
        this.jobIds = jobIds == null ? null : List.copyOf(jobIds);
    }

    @Override
    public String toString() {
        return "HealthCheckRequest{" +
            "clusterName='" + clusterName + '\'' +
            ", jobIds=" + jobIds +
            ", requestId=" + requestId +
            '}';
    }
}

/**
 * Reply to {@link HealthCheckRequest}: overall health plus any unready workers.
 */
// ignoreUnknown must be declared at class level to affect deserialization; on the
// @JsonCreator constructor (as originally written) it has no effect for that purpose.
@JsonIgnoreProperties(ignoreUnknown = true)
public static final class HealthCheckResponse extends BaseResponse {
    // True only when every worker of every checked job has reached Started.
    public final boolean isHealthy;
    // Populated when isHealthy is false due to unready workers; null otherwise
    // (including error and cluster-not-found replies).
    public final UnreadyWorkers unreadyWorkers;

    @JsonCreator
    public HealthCheckResponse(
        @JsonProperty("requestId") long requestId,
        @JsonProperty("responseCode") ResponseCode responseCode,
        @JsonProperty("message") String message,
        @JsonProperty("isHealthy") boolean isHealthy,
        @JsonProperty("unreadyWorkers") UnreadyWorkers unreadyWorkers) {
        super(requestId, responseCode, message);
        this.isHealthy = isHealthy;
        this.unreadyWorkers = unreadyWorkers;
    }
}

// Wrapper for the list of workers that have not yet reached the Started state.
public record UnreadyWorkers(List<UnreadyWorker> workers) {}

// Identifies a single not-yet-started worker; state carries the MantisJobState name.
public record UnreadyWorker(int workerIndex, int workerNumber, String state) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -155,6 +156,8 @@ public void testIt() throws Exception {
testJobClustersDelete();
testJobClustersPut();
testJobClusterInstanceGET();
testJobClusterHealthCheck();
testJobClusterHealthCheckNonExistent();
testNonExistentJobClusterInstanceGET();
testJobClusterInstancePOSTNotAllowed();
testJobClusterInstanceValidUpdate();
Expand Down Expand Up @@ -777,4 +780,27 @@ private void compareClustersPayload(String clusterListResponse) {
assert ex == null;
}
}

// Happy path: a cluster whose jobs are all running must report 200 with isHealthy=true.
private void testJobClusterHealthCheck() throws InterruptedException {
    final String url =
        getJobClusterInstanceEndpoint(JobClusterPayloads.CLUSTER_NAME) + "/healthcheck";
    testGet(
        url,
        StatusCodes.OK,
        response -> {
            try {
                JsonNode node = new ObjectMapper().readTree(response);
                assertTrue("Expected 'isHealthy' field in response: " + response,
                    node.has("isHealthy"));
                assertTrue(node.get("isHealthy").asBoolean());
            } catch (IOException e) {
                fail("Failed to parse health check response: " + e.getMessage());
            }
        });
}

// A health check against an unknown cluster must surface as HTTP 404 (no body assertions).
private void testJobClusterHealthCheckNonExistent() throws InterruptedException {
    testGet(
        getJobClusterInstanceEndpoint("nonExistentCluster") + "/healthcheck",
        StatusCodes.NOT_FOUND,
        null);
}
}
Loading
Loading