Skip to content

Commit 355dc3d

Browse files
authored
Hotfix: Fix log_path bug for jobs with multiple attempts (#1430)
* Log errors that occur in LifecycledCallable; previously they were swallowed.
* Ensure that DefaultJobPersistence always returns a "full" job, i.e. the job with all of its attempts, not just some of them.
* Fix a bug where the same log path was used for multiple attempts, which caused jobs to get stuck.
1 parent c4ae615 commit 355dc3d

File tree

3 files changed

+89
-24
lines changed

3 files changed

+89
-24
lines changed

airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java

Lines changed: 4 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,6 @@
3838
import io.airbyte.config.persistence.ConfigRepository;
3939
import io.airbyte.scheduler.persistence.JobPersistence;
4040
import io.airbyte.validation.json.JsonValidationException;
41-
import io.airbyte.workers.OutputAndStatus;
4241
import io.airbyte.workers.WorkerConstants;
4342
import java.io.IOException;
4443
import java.nio.file.Path;
@@ -75,9 +74,9 @@ public void run() {
7574
try {
7675
LOGGER.info("Running job-submitter...");
7776

78-
final Optional<Job> oldestPendingJob = persistence.getNextJob();
77+
final Optional<Job> nextJob = persistence.getNextJob();
7978

80-
oldestPendingJob.ifPresent(job -> {
79+
nextJob.ifPresent(job -> {
8180
trackSubmission(job);
8281
submitJob(job);
8382
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
@@ -120,7 +119,8 @@ void submitJob(Job job) {
120119
}
121120
trackCompletion(job, output.getStatus());
122121
})
123-
.setOnException(noop -> {
122+
.setOnException(e -> {
123+
LOGGER.error("Exception thrown in Job Submission: ", e);
124124
persistence.failAttempt(job.getId(), attemptNumber);
125125
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
126126
})
@@ -223,12 +223,4 @@ private ImmutableMap.Builder<String, Object> generateMetadata(Job job) throws Co
223223
return metadata;
224224
}
225225

226-
private static JobStatus getStatus(OutputAndStatus<?> output) {
227-
return switch (output.getStatus()) {
228-
case SUCCEEDED -> JobStatus.SUCCEEDED;
229-
case FAILED -> JobStatus.FAILED;
230-
default -> throw new IllegalStateException("Unknown state " + output.getStatus());
231-
};
232-
}
233-
234226
}

airbyte-scheduler/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java

Lines changed: 25 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import com.google.common.annotations.VisibleForTesting;
2828
import io.airbyte.commons.json.Jsons;
2929
import io.airbyte.config.JobConfig;
30+
import io.airbyte.config.JobConfig.ConfigType;
3031
import io.airbyte.config.JobOutput;
3132
import io.airbyte.config.StandardSyncOutput;
3233
import io.airbyte.config.State;
@@ -116,7 +117,7 @@ public void resetJob(long jobId) throws IOException {
116117
final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
117118
database.query(ctx -> {
118119
updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.PENDING, now,
119-
new IllegalStateException(String.format("Attempt to reset a job that is in a terminate state. job id: %s", jobId)));
120+
new IllegalStateException(String.format("Attempt to reset a job that is in a terminal state. job id: %s", jobId)));
120121
return null;
121122
});
122123
}
@@ -246,8 +247,11 @@ public Job getJob(long jobId) throws IOException {
246247
}
247248

248249
private Job getJob(DSLContext ctx, long jobId) {
249-
return getJobFromResult(ctx.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.id = ?", jobId))
250-
.orElseThrow(() -> new RuntimeException("Could not find job with id: " + jobId));
250+
return getJobOptional(ctx, jobId).orElseThrow(() -> new RuntimeException("Could not find job with id: " + jobId));
251+
}
252+
253+
private Optional<Job> getJobOptional(DSLContext ctx, long jobId) {
254+
return getJobFromResult(ctx.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.id = ?", jobId));
251255
}
252256

253257
@Override
@@ -269,21 +273,27 @@ public List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus s
269273

270274
@Override
271275
public Optional<Job> getLastSyncJob(UUID connectionId) throws IOException {
272-
return database.query(ctx -> getJobFromResult(ctx
276+
return database.query(ctx -> ctx
273277
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE scope = ? AND CAST(jobs.status AS VARCHAR) <> ? ORDER BY jobs.created_at DESC LIMIT 1",
274-
ScopeHelper.createScope(JobConfig.ConfigType.SYNC, connectionId.toString()),
275-
JobStatus.CANCELLED.toString().toLowerCase())));
278+
ScopeHelper.createScope(ConfigType.SYNC, connectionId.toString()),
279+
JobStatus.CANCELLED.toString().toLowerCase())
280+
.stream()
281+
.findFirst()
282+
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
276283
}
277284

278285
@Override
279286
public Optional<State> getCurrentState(UUID connectionId) throws IOException {
280-
return database.query(ctx -> getJobFromResult(ctx
287+
return database.query(ctx -> ctx
281288
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE scope = ? AND CAST(jobs.status AS VARCHAR) = ? ORDER BY attempts.created_at DESC LIMIT 1",
282-
ScopeHelper.createScope(JobConfig.ConfigType.SYNC, connectionId.toString()),
283-
JobStatus.SUCCEEDED.toString().toLowerCase())))
289+
ScopeHelper.createScope(ConfigType.SYNC, connectionId.toString()),
290+
JobStatus.SUCCEEDED.toString().toLowerCase())
291+
.stream()
292+
.findFirst()
293+
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class)))
284294
.flatMap(Job::getSuccessOutput)
285295
.map(JobOutput::getSync)
286-
.map(StandardSyncOutput::getState);
296+
.map(StandardSyncOutput::getState));
287297
}
288298

289299
@Override
@@ -292,9 +302,12 @@ public Optional<Job> getNextJob() throws IOException {
292302
// 1. get oldest, pending job
293303
// 2. job is excluded if another job of the same scope is already running
294304
// 3. job is excluded if another job of the same scope is already incomplete
295-
return database.query(ctx -> getJobFromResult(ctx
305+
return database.query(ctx -> ctx
296306
.fetch(BASE_JOB_SELECT_AND_JOIN
297-
+ "WHERE CAST(jobs.status AS VARCHAR) = 'pending' AND jobs.scope NOT IN ( SELECT scope FROM jobs WHERE status = 'running' OR status = 'incomplete' ) ORDER BY jobs.created_at ASC LIMIT 1")));
307+
+ "WHERE CAST(jobs.status AS VARCHAR) = 'pending' AND jobs.scope NOT IN ( SELECT scope FROM jobs WHERE status = 'running' OR status = 'incomplete' ) ORDER BY jobs.created_at ASC LIMIT 1")
308+
.stream()
309+
.findFirst()
310+
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
298311
}
299312

300313
private static List<Job> getJobsFromResult(Result<Record> result) {

airbyte-scheduler/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java

Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import io.airbyte.config.JobConfig;
3939
import io.airbyte.config.JobConfig.ConfigType;
4040
import io.airbyte.config.JobOutput;
41+
import io.airbyte.config.JobOutput.OutputType;
4142
import io.airbyte.config.JobSyncConfig;
4243
import io.airbyte.config.StandardSyncOutput;
4344
import io.airbyte.config.State;
@@ -544,6 +545,50 @@ public void testGetOldestPendingJobOnlyPendingJobs() throws IOException {
544545
assertTrue(actual.isEmpty());
545546
}
546547

548+
@Test
549+
void testGetNextJobWithMultipleAttempts() throws IOException {
550+
final long jobId = jobPersistence.createJob(SCOPE, JOB_CONFIG);
551+
jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));
552+
jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));
553+
jobPersistence.resetJob(jobId);
554+
555+
final Optional<Job> actual = jobPersistence.getNextJob();
556+
final Job expected = getExpectedJobTwoAttempts(jobId, JobStatus.PENDING, AttemptStatus.FAILED);
557+
558+
assertTrue(actual.isPresent());
559+
assertEquals(expected, actual.get());
560+
}
561+
562+
@Test
563+
void testGetCurrentStateWithMultipleAttempts() throws IOException {
564+
final State state = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 4)));
565+
final JobOutput jobOutput = new JobOutput().withOutputType(OutputType.SYNC).withSync(new StandardSyncOutput().withState(state));
566+
567+
final long jobId = jobPersistence.createJob(SCOPE, JOB_CONFIG);
568+
jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));
569+
final int attemptId = jobPersistence.createAttempt(jobId, LOG_PATH);
570+
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
571+
jobPersistence.succeedAttempt(jobId, attemptId);
572+
573+
final Optional<State> actual = jobPersistence.getCurrentState(UUID.fromString(ScopeHelper.getConfigId(SCOPE)));
574+
575+
assertTrue(actual.isPresent());
576+
assertEquals(state, actual.get());
577+
}
578+
579+
@Test
580+
void testGetLastSyncJobWithMultipleAttempts() throws IOException {
581+
final long jobId = jobPersistence.createJob(SCOPE, JOB_CONFIG);
582+
jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));
583+
jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));
584+
585+
final Optional<Job> actual = jobPersistence.getLastSyncJob(UUID.fromString(ScopeHelper.getConfigId(SCOPE)));
586+
final Job expected = getExpectedJobTwoAttempts(jobId, JobStatus.INCOMPLETE, AttemptStatus.FAILED);
587+
588+
assertTrue(actual.isPresent());
589+
assertEquals(expected, actual.get());
590+
}
591+
547592
@Test
548593
public void testGetJobFromRecord() throws IOException, SQLException {
549594
final long jobId = jobPersistence.createJob(SCOPE, JOB_CONFIG);
@@ -559,6 +604,21 @@ private Job getExpectedJobNoAttempts(long jobId, JobStatus jobStatus) {
559604
return getExpectedJob(jobId, jobStatus, Collections.emptyList());
560605
}
561606

607+
private Job getExpectedJobTwoAttempts(long jobId, JobStatus jobStatus, AttemptStatus attemptStatus) {
608+
final Job job = getExpectedJobOneAttempt(jobId, jobStatus, attemptStatus);
609+
job.getAttempts().add(new Attempt(
610+
1L,
611+
jobId,
612+
LOG_PATH,
613+
null,
614+
attemptStatus,
615+
NOW.getEpochSecond(),
616+
NOW.getEpochSecond(),
617+
null));
618+
619+
return job;
620+
}
621+
562622
private Job getExpectedJobOneAttempt(long jobId, JobStatus jobStatus, AttemptStatus attemptStatus) {
563623
final Attempt attempt = new Attempt(
564624
0L,

0 commit comments

Comments
 (0)