From 76db7af713bd1e96c9f6da2b300066cbe70e7f6e Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Mon, 16 Mar 2026 12:40:20 +0100 Subject: [PATCH 1/5] Add per-job hourly log quota enforced on runner Prevents runaway log costs by limiting log output per job per calendar hour. Default quota is 50MB/hour, configurable via DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR (0 disables). Jobs exceeding the quota are terminated with reason log_quota_exceeded. Co-Authored-By: Claude Opus 4.6 --- runner/internal/common/types/types.go | 1 + runner/internal/runner/executor/executor.go | 53 +++++++++++++++++-- .../internal/runner/executor/executor_test.go | 22 ++++++++ runner/internal/runner/executor/logs.go | 40 ++++++++++++-- runner/internal/runner/schemas/schemas.go | 1 + src/dstack/_internal/core/models/runs.py | 6 +++ src/dstack/_internal/server/schemas/runner.py | 1 + .../services/jobs/configurators/base.py | 6 +++ src/dstack/_internal/server/settings.py | 5 ++ .../_internal/server/routers/test_runs.py | 2 + 10 files changed, 130 insertions(+), 7 deletions(-) diff --git a/runner/internal/common/types/types.go b/runner/internal/common/types/types.go index b7f6c6fd3..057c0248c 100644 --- a/runner/internal/common/types/types.go +++ b/runner/internal/common/types/types.go @@ -10,4 +10,5 @@ const ( TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" + TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" ) diff --git a/runner/internal/runner/executor/executor.go b/runner/internal/runner/executor/executor.go index 98289eb4e..a6655147b 100644 --- a/runner/internal/runner/executor/executor.go +++ b/runner/internal/runner/executor/executor.go @@ -261,6 +261,17 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { default: } + if errors.Is(err, ErrLogQuotaExceeded) { + log.Error(ctx, "Log quota exceeded", "quota", ex.jobSpec.LogQuotaHour) + ex.SetJobStateWithTerminationReason( + ctx, + schemas.JobStateFailed, + types.TerminationReasonLogQuotaExceeded, + fmt.Sprintf("Job log output exceeded the hourly quota of %d bytes", ex.jobSpec.LogQuotaHour), + ) + return fmt.Errorf("log quota exceeded: %w", err) + } + // todo fail reason? log.Error(ctx, "Exec failed", "err", err) var exitError *exec.ExitError @@ -283,6 +294,7 @@ func (ex *RunExecutor) SetJob(body schemas.SubmitBody) { ex.clusterInfo = body.ClusterInfo ex.secrets = body.Secrets ex.repoCredentials = body.RepoCredentials + ex.jobLogs.SetQuota(body.JobSpec.LogQuotaHour) ex.state = WaitCode } @@ -586,11 +598,10 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error defer func() { _ = cmd.Wait() }() // release resources if copy fails stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize) - defer func() { _ = stripper.Close() }() logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper) - _, err = io.Copy(logger, ptm) - if err != nil && !isPtyError(err) { - return fmt.Errorf("copy command output: %w", err) + + if err := ex.copyOutputWithQuota(cmd, ptm, stripper, logger); err != nil { + return err } if err = cmd.Wait(); err != nil { return fmt.Errorf("wait for command: %w", err) @@ -598,6 +609,40 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error return nil } +// copyOutputWithQuota streams process output through the log pipeline and +// monitors for log quota exceeded. The quota signal is out-of-band (via channel) +// because the ansistrip writer is async and swallows downstream write errors. +func (ex *RunExecutor) copyOutputWithQuota(cmd *exec.Cmd, ptm io.Reader, stripper io.Closer, logger io.Writer) error { + copyDone := make(chan error, 1) + go func() { + _, err := io.Copy(logger, ptm) + copyDone <- err + }() + + // Wait for either io.Copy to finish or quota to be exceeded. + var copyErr error + select { + case copyErr = <-copyDone: + case <-ex.jobLogs.QuotaExceeded(): + _ = cmd.Process.Kill() + <-copyDone + } + + // Flush the ansistrip buffer — may also trigger quota exceeded. + _ = stripper.Close() + + select { + case <-ex.jobLogs.QuotaExceeded(): + return ErrLogQuotaExceeded + default: + } + + if copyErr != nil && !isPtyError(copyErr) { + return fmt.Errorf("copy command output: %w", copyErr) + } + return nil +} + // setupGitCredentials must be called from Run after setJobUser func (ex *RunExecutor) setupGitCredentials(ctx context.Context) (func(), error) { if ex.repoCredentials == nil { diff --git a/runner/internal/runner/executor/executor_test.go b/runner/internal/runner/executor/executor_test.go index 915cca35a..f4330ec76 100644 --- a/runner/internal/runner/executor/executor_test.go +++ b/runner/internal/runner/executor/executor_test.go @@ -141,6 +141,28 @@ func TestExecutor_MaxDuration(t *testing.T) { assert.ErrorContains(t, err, "killed") } +func TestExecutor_LogQuota(t *testing.T) { + if testing.Short() { + t.Skip() + } + + ex := makeTestExecutor(t) + ex.killDelay = 500 * time.Millisecond + // Output >100 bytes to trigger the quota + ex.jobSpec.Commands = append(ex.jobSpec.Commands, "for i in $(seq 1 20); do echo 'This line is long enough to exceed the quota easily'; done") + ex.jobSpec.LogQuotaHour = 100 // 100 bytes + ex.jobLogs.SetQuota(100) + makeCodeTar(t, ex) + + err := ex.Run(t.Context()) + assert.ErrorContains(t, err, "log quota exceeded") + + // Verify the termination state was set + history := ex.GetHistory(0) + lastState := history.JobStates[len(history.JobStates)-1] + assert.Equal(t, schemas.JobStateFailed, lastState.State) +} + func TestExecutor_RemoteRepo(t *testing.T) { if testing.Short() { t.Skip() diff --git a/runner/internal/runner/executor/logs.go b/runner/internal/runner/executor/logs.go index 808fc84b1..670c9a4ab 100644 --- a/runner/internal/runner/executor/logs.go +++ b/runner/internal/runner/executor/logs.go @@ -1,29 +1,63 @@ package executor import ( + "errors" "sync" + "time" "github.com/dstackai/dstack/runner/internal/runner/schemas" ) +var ErrLogQuotaExceeded = errors.New("log quota exceeded") + type appendWriter struct { mu *sync.RWMutex // shares with executor history []schemas.LogEvent timestamp *MonotonicTimestamp // shares with executor + + quota int // bytes per hour, 0 = unlimited + bytesInHour int // bytes written in current hour bucket + hourStart int64 // unix timestamp (seconds) of current hour bucket start + quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal) + exceededOnce sync.Once } func newAppendWriter(mu *sync.RWMutex, timestamp *MonotonicTimestamp) *appendWriter { return &appendWriter{ - mu: mu, - history: make([]schemas.LogEvent, 0), - timestamp: timestamp, + mu: mu, + history: make([]schemas.LogEvent, 0), + timestamp: timestamp, + quotaExceeded: make(chan struct{}), } } +func (w *appendWriter) SetQuota(quota int) { + w.quota = quota +} + +// QuotaExceeded returns a channel that is closed when the log quota is exceeded. +func (w *appendWriter) QuotaExceeded() <-chan struct{} { + return w.quotaExceeded +} + func (w *appendWriter) Write(p []byte) (n int, err error) { w.mu.Lock() defer w.mu.Unlock() + if w.quota > 0 { + now := time.Now().Unix() + currentHour := (now / 3600) * 3600 + if currentHour != w.hourStart { + w.bytesInHour = 0 + w.hourStart = currentHour + } + if w.bytesInHour+len(p) > w.quota { + w.exceededOnce.Do(func() { close(w.quotaExceeded) }) + return 0, ErrLogQuotaExceeded + } + w.bytesInHour += len(p) + } + pCopy := make([]byte, len(p)) copy(pCopy, p) w.history = append(w.history, schemas.LogEvent{Message: pCopy, Timestamp: w.timestamp.Next()}) diff --git a/runner/internal/runner/schemas/schemas.go b/runner/internal/runner/schemas/schemas.go index ca707db76..e8a37f5d9 100644 --- a/runner/internal/runner/schemas/schemas.go +++ b/runner/internal/runner/schemas/schemas.go @@ -79,6 +79,7 @@ type JobSpec struct { Env map[string]string `json:"env"` SingleBranch bool `json:"single_branch"` MaxDuration int `json:"max_duration"` + LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited SSHKey *SSHKey `json:"ssh_key"` WorkingDir *string `json:"working_dir"` RepoDir *string `json:"repo_dir"` diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index bd1307df7..f70281f90 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -151,6 +151,7 @@ class JobTerminationReason(str, Enum): CREATING_CONTAINER_ERROR = "creating_container_error" EXECUTOR_ERROR = "executor_error" MAX_DURATION_EXCEEDED = "max_duration_exceeded" + LOG_QUOTA_EXCEEDED = "log_quota_exceeded" def to_status(self) -> JobStatus: mapping = { @@ -173,6 +174,7 @@ def to_status(self) -> JobStatus: self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, self.EXECUTOR_ERROR: JobStatus.FAILED, self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED, + self.LOG_QUOTA_EXCEEDED: JobStatus.FAILED, } return mapping[self] @@ -205,6 +207,7 @@ def to_error(self) -> Optional[str]: JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error", JobTerminationReason.EXECUTOR_ERROR: "executor error", JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded", + JobTerminationReason.LOG_QUOTA_EXCEEDED: "log quota exceeded", } return error_mapping.get(self) @@ -275,6 +278,9 @@ class JobSpec(CoreModel): privileged: bool = False single_branch: Optional[bool] = None max_duration: Optional[int] + log_quota_hour: Optional[int] = None + """`log_quota_hour` is the maximum number of bytes of log output per calendar hour. + `None` means unlimited. Set from `DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR`.""" stop_duration: Optional[int] = None utilization_policy: Optional[UtilizationPolicy] = None registry_auth: Optional[RegistryAuth] diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 43e9ddbb8..38274124e 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -83,6 +83,7 @@ class SubmitBody(CoreModel): "gateway", "single_branch", "max_duration", + "log_quota_hour", "ssh_key", "working_dir", "repo_dir", diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index b73c9bbe6..7231104cb 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -48,6 +48,7 @@ from dstack._internal.core.models.volumes import MountPoint, VolumeMountPoint from dstack._internal.core.services.profiles import get_retry from dstack._internal.core.services.ssh.ports import filter_reserved_ports +from dstack._internal.server import settings as server_settings from dstack._internal.server.services.docker import ImageConfig, get_image_config from dstack._internal.utils import crypto from dstack._internal.utils.common import run_async @@ -169,6 +170,7 @@ async def _get_job_spec( privileged=self._privileged(), single_branch=self._single_branch(), max_duration=self._max_duration(), + log_quota_hour=self._log_quota_hour(), stop_duration=self._stop_duration(), utilization_policy=self._utilization_policy(), registry_auth=self._registry_auth(), @@ -304,6 +306,10 @@ def _max_duration(self) -> Optional[int]: return None return self.run_spec.merged_profile.max_duration + def _log_quota_hour(self) -> Optional[int]: + quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR + return quota if quota > 0 else None + def _stop_duration(self) -> Optional[int]: if self.run_spec.merged_profile.stop_duration is None: return DEFAULT_STOP_DURATION diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 71a43a30b..01216cff3 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -133,6 +133,11 @@ SERVER_TEMPLATES_REPO = os.getenv("DSTACK_SERVER_TEMPLATES_REPO") +# Per-job log quota: maximum bytes of log output per calendar hour. 0 = unlimited. +SERVER_LOG_QUOTA_PER_JOB_HOUR = int( + os.getenv("DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR", 50 * 1024 * 1024) # 50 MB +) + # Development settings SQL_ECHO_ENABLED = os.getenv("DSTACK_SQL_ECHO_ENABLED") is not None diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 1d72ba552..32b1cd912 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -269,6 +269,7 @@ def get_dev_env_run_plan_dict( "replica_group": "0", "single_branch": False, "max_duration": None, + "log_quota_hour": 52428800, "stop_duration": 300, "utilization_policy": None, "registry_auth": None, @@ -506,6 +507,7 @@ def get_dev_env_run_dict( "replica_group": "0", "single_branch": False, "max_duration": None, + "log_quota_hour": 52428800, "stop_duration": 300, "utilization_policy": None, "registry_auth": None, From c1c8150514ed80057868dc966117acdaa5a9fd3b Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Mon, 16 Mar 2026 12:57:47 +0100 Subject: [PATCH 2/5] Fix gci formatting in logs.go Co-Authored-By: Claude Opus 4.6 --- runner/internal/runner/executor/logs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runner/internal/runner/executor/logs.go b/runner/internal/runner/executor/logs.go index 670c9a4ab..febe2348f 100644 --- a/runner/internal/runner/executor/logs.go +++ b/runner/internal/runner/executor/logs.go @@ -15,10 +15,10 @@ type appendWriter struct { history []schemas.LogEvent timestamp *MonotonicTimestamp // shares with executor - quota int // bytes per hour, 0 = unlimited - bytesInHour int // bytes written in current hour bucket - hourStart int64 // unix timestamp (seconds) of current hour bucket start - quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal) + quota int // bytes per hour, 0 = unlimited + bytesInHour int // bytes written in current hour bucket + hourStart int64 // unix timestamp (seconds) of current hour bucket start + quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal) exceededOnce sync.Once } From 6247739b98749e145a0ab1e4e5f5ff71bace2bba Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Tue, 17 Mar 2026 03:38:15 -0700 Subject: [PATCH 3/5] Use monotonic time for log quota hour buckets Avoids susceptibility to wall clock adjustments (e.g., NTP sync) by tracking elapsed hours from a monotonic start time instead of calendar-hour boundaries. Co-Authored-By: Claude Opus 4.6 --- runner/internal/runner/executor/logs.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/runner/internal/runner/executor/logs.go b/runner/internal/runner/executor/logs.go index febe2348f..54b087e32 100644 --- a/runner/internal/runner/executor/logs.go +++ b/runner/internal/runner/executor/logs.go @@ -2,6 +2,7 @@ package executor import ( "errors" + "math" "sync" "time" @@ -17,7 +18,8 @@ type appendWriter struct { quota int // bytes per hour, 0 = unlimited bytesInHour int // bytes written in current hour bucket - hourStart int64 // unix timestamp (seconds) of current hour bucket start + currentHour int // monotonic hour bucket index since timeStarted + timeStarted time.Time // monotonic reference point for hour buckets quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal) exceededOnce sync.Once } @@ -33,6 +35,7 @@ func newAppendWriter(mu *sync.RWMutex, timestamp *MonotonicTimestamp) *appendWri func (w *appendWriter) SetQuota(quota int) { w.quota = quota + w.timeStarted = time.Now() } // QuotaExceeded returns a channel that is closed when the log quota is exceeded. @@ -45,11 +48,10 @@ func (w *appendWriter) Write(p []byte) (n int, err error) { defer w.mu.Unlock() if w.quota > 0 { - now := time.Now().Unix() - currentHour := (now / 3600) * 3600 - if currentHour != w.hourStart { + hour := int(math.Floor(time.Since(w.timeStarted).Hours())) + if hour != w.currentHour { w.bytesInHour = 0 - w.hourStart = currentHour + w.currentHour = hour } if w.bytesInHour+len(p) > w.quota { w.exceededOnce.Do(func() { close(w.quotaExceeded) }) From 9bed3388167e5489fce38e0ae2dd14576a4935be Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Tue, 17 Mar 2026 03:51:49 -0700 Subject: [PATCH 4/5] Move log_quota_hour from JobSpec to SubmitBody for backward compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JobSpec is sent client→server as part of RunPlan.current_resource, so new fields break older servers. log_quota_hour is only needed server→runner, so it belongs on SubmitBody instead. Co-Authored-By: Claude Opus 4.6 --- runner/internal/runner/executor/executor.go | 6 +++--- runner/internal/runner/executor/executor_test.go | 1 - runner/internal/runner/schemas/schemas.go | 2 +- src/dstack/_internal/core/models/runs.py | 3 --- src/dstack/_internal/server/schemas/runner.py | 3 ++- .../_internal/server/services/jobs/configurators/base.py | 6 ------ src/dstack/_internal/server/services/runner/client.py | 3 +++ src/tests/_internal/server/routers/test_runs.py | 2 -- 8 files changed, 9 insertions(+), 17 deletions(-) diff --git a/runner/internal/runner/executor/executor.go b/runner/internal/runner/executor/executor.go index a6655147b..3662a45aa 100644 --- a/runner/internal/runner/executor/executor.go +++ b/runner/internal/runner/executor/executor.go @@ -262,12 +262,12 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) { } if errors.Is(err, ErrLogQuotaExceeded) { - log.Error(ctx, "Log quota exceeded", "quota", ex.jobSpec.LogQuotaHour) + log.Error(ctx, "Log quota exceeded", "quota", ex.jobLogs.quota) ex.SetJobStateWithTerminationReason( ctx, schemas.JobStateFailed, types.TerminationReasonLogQuotaExceeded, - fmt.Sprintf("Job log output exceeded the hourly quota of %d bytes", ex.jobSpec.LogQuotaHour), + fmt.Sprintf("Job log output exceeded the hourly quota of %d bytes", ex.jobLogs.quota), ) return fmt.Errorf("log quota exceeded: %w", err) } @@ -294,7 +294,7 @@ func (ex *RunExecutor) SetJob(body schemas.SubmitBody) { ex.clusterInfo = body.ClusterInfo ex.secrets = body.Secrets ex.repoCredentials = body.RepoCredentials - ex.jobLogs.SetQuota(body.JobSpec.LogQuotaHour) + ex.jobLogs.SetQuota(body.LogQuotaHour) ex.state = WaitCode } diff --git a/runner/internal/runner/executor/executor_test.go b/runner/internal/runner/executor/executor_test.go index f4330ec76..2330cd6f3 100644 --- a/runner/internal/runner/executor/executor_test.go +++ b/runner/internal/runner/executor/executor_test.go @@ -150,7 +150,6 @@ func TestExecutor_LogQuota(t *testing.T) { ex.killDelay = 500 * time.Millisecond // Output >100 bytes to trigger the quota ex.jobSpec.Commands = append(ex.jobSpec.Commands, "for i in $(seq 1 20); do echo 'This line is long enough to exceed the quota easily'; done") - ex.jobSpec.LogQuotaHour = 100 // 100 bytes ex.jobLogs.SetQuota(100) makeCodeTar(t, ex) diff --git a/runner/internal/runner/schemas/schemas.go b/runner/internal/runner/schemas/schemas.go index e8a37f5d9..47706228c 100644 --- a/runner/internal/runner/schemas/schemas.go +++ b/runner/internal/runner/schemas/schemas.go @@ -36,6 +36,7 @@ type SubmitBody struct { ClusterInfo ClusterInfo `json:"cluster_info"` Secrets map[string]string `json:"secrets"` RepoCredentials *RepoCredentials `json:"repo_credentials"` + LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited } type PullResponse struct { @@ -79,7 +80,6 @@ type JobSpec struct { Env map[string]string `json:"env"` SingleBranch bool `json:"single_branch"` MaxDuration int `json:"max_duration"` - LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited SSHKey *SSHKey `json:"ssh_key"` WorkingDir *string `json:"working_dir"` RepoDir *string `json:"repo_dir"` diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index f70281f90..fdb7b58cd 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -278,9 +278,6 @@ class JobSpec(CoreModel): privileged: bool = False single_branch: Optional[bool] = None max_duration: Optional[int] - log_quota_hour: Optional[int] = None - """`log_quota_hour` is the maximum number of bytes of log output per calendar hour. - `None` means unlimited. Set from `DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR`.""" stop_duration: Optional[int] = None utilization_policy: Optional[UtilizationPolicy] = None registry_auth: Optional[RegistryAuth] diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 38274124e..852421251 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -83,7 +83,6 @@ class SubmitBody(CoreModel): "gateway", "single_branch", "max_duration", - "log_quota_hour", "ssh_key", "working_dir", "repo_dir", @@ -104,6 +103,8 @@ class SubmitBody(CoreModel): cluster_info: Annotated[Optional[ClusterInfo], Field(include=True)] secrets: Annotated[Optional[Dict[str, str]], Field(include=True)] repo_credentials: Annotated[Optional[RemoteRepoCreds], Field(include=True)] + log_quota_hour: Optional[int] = None + """Maximum bytes of log output per hour. None means unlimited.""" # TODO: remove `run_spec` once instances deployed with 0.19.8 or earlier are no longer supported. run_spec: Annotated[ RunSpec, diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index 7231104cb..b73c9bbe6 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -48,7 +48,6 @@ from dstack._internal.core.models.volumes import MountPoint, VolumeMountPoint from dstack._internal.core.services.profiles import get_retry from dstack._internal.core.services.ssh.ports import filter_reserved_ports -from dstack._internal.server import settings as server_settings from dstack._internal.server.services.docker import ImageConfig, get_image_config from dstack._internal.utils import crypto from dstack._internal.utils.common import run_async @@ -170,7 +169,6 @@ async def _get_job_spec( privileged=self._privileged(), single_branch=self._single_branch(), max_duration=self._max_duration(), - log_quota_hour=self._log_quota_hour(), stop_duration=self._stop_duration(), utilization_policy=self._utilization_policy(), registry_auth=self._registry_auth(), @@ -306,10 +304,6 @@ def _max_duration(self) -> Optional[int]: return None return self.run_spec.merged_profile.max_duration - def _log_quota_hour(self) -> Optional[int]: - quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR - return quota if quota > 0 else None - def _stop_duration(self) -> Optional[int]: if self.run_spec.merged_profile.stop_duration is None: return DEFAULT_STOP_DURATION diff --git a/src/dstack/_internal/server/services/runner/client.py b/src/dstack/_internal/server/services/runner/client.py index c31726e76..4b78eefee 100644 --- a/src/dstack/_internal/server/services/runner/client.py +++ b/src/dstack/_internal/server/services/runner/client.py @@ -15,6 +15,7 @@ from dstack._internal.core.models.resources import Memory from dstack._internal.core.models.runs import ClusterInfo, Job, Run from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint +from dstack._internal.server import settings as server_settings from dstack._internal.server.schemas.instances import InstanceCheck from dstack._internal.server.schemas.runner import ( ComponentInfo, @@ -93,6 +94,7 @@ def submit_job( merged_env.update(job_spec.env) job_spec = job_spec.copy(deep=True) job_spec.env = merged_env + quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR body = SubmitBody( run=run, job_spec=job_spec, @@ -100,6 +102,7 @@ def submit_job( cluster_info=cluster_info, secrets=secrets, repo_credentials=repo_credentials, + log_quota_hour=quota if quota > 0 else None, run_spec=run.run_spec, ) resp = requests.post( diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 32b1cd912..1d72ba552 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -269,7 +269,6 @@ def get_dev_env_run_plan_dict( "replica_group": "0", "single_branch": False, "max_duration": None, - "log_quota_hour": 52428800, "stop_duration": 300, "utilization_policy": None, "registry_auth": None, @@ -507,7 +506,6 @@ def get_dev_env_run_dict( "replica_group": "0", "single_branch": False, "max_duration": None, - "log_quota_hour": 52428800, "stop_duration": 300, "utilization_policy": None, "registry_auth": None, From 7d127f0bf9ce450465ac1f6860d1715a13c19450 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Tue, 17 Mar 2026 04:15:23 -0700 Subject: [PATCH 5/5] Fix SubmitBody.log_quota_hour serialization with Field(include=True) Without Field(include=True), pydantic's .json() drops fields that lack an include annotation when other fields in the model use Field(include=...). Co-Authored-By: Claude Opus 4.6 --- src/dstack/_internal/server/schemas/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 852421251..549ff7914 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -103,7 +103,7 @@ class SubmitBody(CoreModel): cluster_info: Annotated[Optional[ClusterInfo], Field(include=True)] secrets: Annotated[Optional[Dict[str, str]], Field(include=True)] repo_credentials: Annotated[Optional[RemoteRepoCreds], Field(include=True)] - log_quota_hour: Optional[int] = None + log_quota_hour: Annotated[Optional[int], Field(include=True)] = None """Maximum bytes of log output per hour. None means unlimited.""" # TODO: remove `run_spec` once instances deployed with 0.19.8 or earlier are no longer supported. run_spec: Annotated[