Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/docs/concepts/exports.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ $ dstack fleet list

Imported fleets can be used for runs just like the project's own fleets.

<div editor-title=".dstack.yml">

```yaml
type: dev-environment
ide: vscode

fleets:
- my-local-fleet
- team-a/my-fleet
```

</div>

!!! info "Tenant isolation"
Exported fleets share the same access model as regular fleets. See [Tenant isolation](fleets.md#tenant-isolation) for details.

Expand Down
14 changes: 14 additions & 0 deletions src/dstack/_internal/core/compatibility/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dstack._internal.core.models.common import EntityReference
from dstack._internal.core.models.profiles import ProfileParams


def patch_profile_params(params: ProfileParams) -> None:
# If there are no project-prefixed fleets, replace all EntityReference with str
# for compatibility with pre-0.20.14 servers that don't support EntityReference.
if params.fleets is not None and all(
EntityReference.parse(f).project is None for f in params.fleets
):
params.fleets = [
fleet_ref.format() if isinstance(fleet_ref, EntityReference) else fleet_ref
for fleet_ref in params.fleets
]
10 changes: 9 additions & 1 deletion src/dstack/_internal/core/compatibility/fleets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional

from dstack._internal.core.models.common import IncludeExcludeDictType, IncludeExcludeSetType
from dstack._internal.core.compatibility.common import patch_profile_params
from dstack._internal.core.models.common import (
IncludeExcludeDictType,
IncludeExcludeSetType,
)
from dstack._internal.core.models.fleets import ApplyFleetPlanInput, FleetSpec


Expand Down Expand Up @@ -56,3 +60,7 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic
if spec_excludes:
return spec_excludes
return None


def patch_fleet_spec(spec: FleetSpec) -> None:
patch_profile_params(spec.profile)
12 changes: 11 additions & 1 deletion src/dstack/_internal/core/compatibility/runs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional

from dstack._internal.core.models.common import IncludeExcludeDictType, IncludeExcludeSetType
from dstack._internal.core.compatibility.common import patch_profile_params
from dstack._internal.core.models.common import (
IncludeExcludeDictType,
IncludeExcludeSetType,
)
from dstack._internal.core.models.configurations import ServiceConfiguration
from dstack._internal.core.models.routers import SGLangServiceRouterConfig
from dstack._internal.core.models.runs import (
Expand Down Expand Up @@ -138,3 +142,9 @@ def get_job_submission_excludes(job_submissions: list[JobSubmission]) -> Include
submission_excludes["job_runtime_data"] = jrd_excludes

return submission_excludes


def patch_run_spec(run_spec: RunSpec) -> None:
patch_profile_params(run_spec.configuration)
if run_spec.profile is not None:
patch_profile_params(run_spec.profile)
28 changes: 28 additions & 0 deletions src/dstack/_internal/core/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,31 @@ class ApplyAction(str, Enum):
class NetworkMode(str, Enum):
HOST = "host"
BRIDGE = "bridge"


class EntityReference(CoreModel):
"""
Cross-project entity reference.
"""

project: Annotated[
Optional[str],
Field(description="The project name. If unspecified, refers to the current project"),
]
name: Annotated[str, Field(description="The entity name")]

@classmethod
def parse(cls, v: Union[str, "EntityReference"]) -> "EntityReference":
if isinstance(v, EntityReference):
return v
parts = v.split("/")
if len(parts) == 1:
return cls(project=None, name=parts[0])
if len(parts) == 2:
return cls(project=parts[0], name=parts[1])
raise ValueError("Invalid entity reference. Only `<project>/<name>` format is allowed")

def format(self) -> str:
if self.project is None:
return self.name
return f"{self.project}/{self.name}"
18 changes: 17 additions & 1 deletion src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CoreConfig,
CoreModel,
Duration,
EntityReference,
generate_dual_core_model,
)
from dstack._internal.utils.common import list_enum_values_for_annotation
Expand Down Expand Up @@ -360,7 +361,21 @@ class ProfileParams(CoreModel):
Field(description=("The schedule for starting the run at specified time")),
] = None
fleets: Annotated[
Optional[list[str]], Field(description="The fleets considered for reuse")
Optional[
list[
Union[
EntityReference,
str, # For server response compatibility with pre-0.20.14 clients
]
]
],
Field(
description=(
"The fleets considered for reuse."
" For fleets owned by the current project, specify fleet names."
" For imported fleets, specify `<project name>/<fleet name>`"
),
),
] = None
tags: Annotated[
Optional[Dict[str, str]],
Expand All @@ -382,6 +397,7 @@ class ProfileParams(CoreModel):
_validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)(
parse_idle_duration
)
_validate_fleets = validator("fleets", allow_reuse=True, each_item=True)(EntityReference.parse)
_validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ async def _refetch_fleet_models_with_instances(
) -> list[FleetModel]:
res = await session.execute(
select(FleetModel)
.join(FleetModel.project) # can be referenced by fleet_filters
.outerjoin(FleetModel.instances)
.where(
FleetModel.id.in_(fleets_ids),
Expand Down
13 changes: 13 additions & 0 deletions src/dstack/_internal/server/compatibility/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@

from packaging.version import Version

from dstack._internal.core.models.common import EntityReference
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
)
from dstack._internal.core.models.profiles import ProfileParams


def patch_profile_params(params: ProfileParams, client_version: Optional[Version]) -> None:
if client_version is None:
return
# Clients prior to 0.20.14 only support `list[str]` in `fleets`
if client_version < Version("0.20.14") and params.fleets is not None:
params.fleets = [
fleet_ref.format() if isinstance(fleet_ref, EntityReference) else fleet_ref
for fleet_ref in params.fleets
]


def patch_offers_list(
Expand Down
23 changes: 23 additions & 0 deletions src/dstack/_internal/server/compatibility/fleets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Optional

from packaging.version import Version

from dstack._internal.core.models.fleets import Fleet, FleetPlan, FleetSpec
from dstack._internal.server.compatibility.common import patch_offers_list, patch_profile_params


def patch_fleet_plan(fleet_plan: FleetPlan, client_version: Optional[Version]) -> None:
patch_fleet_spec(fleet_plan.spec, client_version)
if fleet_plan.effective_spec is not None:
patch_fleet_spec(fleet_plan.effective_spec, client_version)
if fleet_plan.current_resource is not None:
patch_fleet(fleet_plan.current_resource, client_version)
patch_offers_list(fleet_plan.offers, client_version)


def patch_fleet(fleet: Fleet, client_version: Optional[Version]) -> None:
patch_fleet_spec(fleet.spec, client_version)


def patch_fleet_spec(fleet_spec: FleetSpec, client_version: Optional[Version]) -> None:
patch_profile_params(fleet_spec.profile, client_version)
5 changes: 4 additions & 1 deletion src/dstack/_internal/server/compatibility/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from dstack._internal.core.models.configurations import SERVICE_HTTPS_DEFAULT, ServiceConfiguration
from dstack._internal.core.models.runs import Run, RunPlan, RunSpec
from dstack._internal.server.compatibility.common import patch_offers_list
from dstack._internal.server.compatibility.common import patch_offers_list, patch_profile_params


def patch_run_plan(run_plan: RunPlan, client_version: Optional[Version]) -> None:
Expand Down Expand Up @@ -41,3 +41,6 @@ def patch_run_spec(run_spec: RunSpec, client_version: Optional[Version]) -> None
and run_spec.configuration.https is None
):
run_spec.configuration.https = SERVICE_HTTPS_DEFAULT
patch_profile_params(run_spec.configuration, client_version)
if run_spec.profile is not None:
patch_profile_params(run_spec.profile, client_version)
82 changes: 45 additions & 37 deletions src/dstack/_internal/server/routers/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import dstack._internal.server.services.fleets as fleets_services
from dstack._internal.core.errors import ResourceNotExistsError
from dstack._internal.core.models.fleets import Fleet, FleetPlan
from dstack._internal.server.compatibility.common import patch_offers_list
from dstack._internal.server.compatibility.fleets import patch_fleet, patch_fleet_plan
from dstack._internal.server.db import get_session
from dstack._internal.server.deps import Project
from dstack._internal.server.models import ProjectModel, UserModel
Expand Down Expand Up @@ -50,6 +50,7 @@ async def list_fleets(
body: ListFleetsRequest,
session: AsyncSession = Depends(get_session),
user: UserModel = Depends(Authenticated()),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Returns all fleets and instances within them visible to user sorted by descending `created_at`.
Expand All @@ -59,26 +60,28 @@ async def list_fleets(
The results are paginated. To get the next page, pass `created_at` and `id` of
the last fleet from the previous page as `prev_created_at` and `prev_id`.
"""
return CustomORJSONResponse(
await fleets_services.list_fleets(
session=session,
user=user,
project_name=body.project_name,
only_active=body.only_active,
include_imported=body.include_imported,
prev_created_at=body.prev_created_at,
prev_id=body.prev_id,
limit=body.limit,
ascending=body.ascending,
)
fleet_list = await fleets_services.list_fleets(
session=session,
user=user,
project_name=body.project_name,
only_active=body.only_active,
include_imported=body.include_imported,
prev_created_at=body.prev_created_at,
prev_id=body.prev_id,
limit=body.limit,
ascending=body.ascending,
)
for fleet in fleet_list:
patch_fleet(fleet, client_version)
return CustomORJSONResponse(fleet_list)


@project_router.post("/list", response_model=List[Fleet])
async def list_project_fleets(
body: Optional[ListProjectFleetsRequest] = None,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Returns all fleets in the project.
Expand All @@ -87,13 +90,14 @@ async def list_project_fleets(
_, project = user_project
if body is None:
body = ListProjectFleetsRequest()
return CustomORJSONResponse(
await fleets_services.list_project_fleets(
session=session,
project=project,
include_imported=body.include_imported,
)
fleet_list = await fleets_services.list_project_fleets(
session=session,
project=project,
include_imported=body.include_imported,
)
for fleet in fleet_list:
patch_fleet(fleet, client_version)
return CustomORJSONResponse(fleet_list)


@project_router.post("/get", response_model=Fleet)
Expand All @@ -102,6 +106,7 @@ async def get_fleet(
session: AsyncSession = Depends(get_session),
user: UserModel = Depends(Authenticated()),
project: ProjectModel = Depends(Project()),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Returns a fleet given `name` or `id`.
Expand All @@ -116,6 +121,7 @@ async def get_fleet(
)
if fleet is None:
raise ResourceNotExistsError()
patch_fleet(fleet, client_version)
return CustomORJSONResponse(fleet)


Expand All @@ -136,7 +142,7 @@ async def get_plan(
user=user,
spec=body.spec,
)
patch_offers_list(plan.offers, client_version)
patch_fleet_plan(plan, client_version)
return CustomORJSONResponse(plan)


Expand All @@ -146,23 +152,24 @@ async def apply_plan(
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Creates a new fleet or updates an existing fleet.
Errors if the expected current resource from the plan does not match the current resource.
Use `force: true` to apply even if the current resource does not match.
"""
user, project = user_project
return CustomORJSONResponse(
await fleets_services.apply_plan(
session=session,
user=user,
project=project,
plan=body.plan,
force=body.force,
pipeline_hinter=pipeline_hinter,
)
fleet = await fleets_services.apply_plan(
session=session,
user=user,
project=project,
plan=body.plan,
force=body.force,
pipeline_hinter=pipeline_hinter,
)
patch_fleet(fleet, client_version)
return CustomORJSONResponse(fleet)


@project_router.post("/create", response_model=Fleet, deprecated=True)
Expand All @@ -171,20 +178,21 @@ async def create_fleet(
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Creates a fleet given a fleet configuration.
"""
user, project = user_project
return CustomORJSONResponse(
await fleets_services.create_fleet(
session=session,
project=project,
user=user,
spec=body.spec,
pipeline_hinter=pipeline_hinter,
)
fleet = await fleets_services.create_fleet(
session=session,
project=project,
user=user,
spec=body.spec,
pipeline_hinter=pipeline_hinter,
)
patch_fleet(fleet, client_version)
return CustomORJSONResponse(fleet)


@project_router.post("/delete")
Expand Down
Loading
Loading