Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/dstack/_internal/server/services/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import heapq
import json
import time
from collections.abc import Iterable, Iterator
Expand Down Expand Up @@ -43,6 +42,7 @@
from dstack._internal.core.models.runs import Requirements
from dstack._internal.server import settings
from dstack._internal.server.models import BackendModel, DecryptedString, ProjectModel
from dstack._internal.server.services.offers import merge_offer_iterables
from dstack._internal.settings import LOCAL_BACKEND_ENABLED
from dstack._internal.utils.common import run_async
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -459,7 +459,7 @@ async def get_backend_offers(
backends: List[Backend],
requirements: Requirements,
exclude_not_available: bool = False,
) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]:
) -> Iterable[Tuple[Backend, InstanceOfferWithAvailability]]:
"""
Yields backend offers satisfying `requirements` sorted by price.
"""
Expand All @@ -474,7 +474,7 @@ def get_filtered_offers_with_backends(

logger.debug("Requesting instance offers from backends: %s", [b.TYPE.value for b in backends])
tasks = [run_async(get_offers_tracked, backend, requirements) for backend in backends]
offers_by_backend = []
offers_by_backend: list[Iterable[tuple[Backend, InstanceOfferWithAvailability]]] = []
for backend, result in zip(backends, await asyncio.gather(*tasks, return_exceptions=True)):
if isinstance(result, BackendError):
logger.warning(
Expand All @@ -491,9 +491,7 @@ def get_filtered_offers_with_backends(
)
continue
offers_by_backend.append(get_filtered_offers_with_backends(backend, result))
# Merge preserving order for every backend.
offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price)
return offers
return merge_offer_iterables(*offers_by_backend)


def check_backend_type_available(backend_type: BackendType):
Expand Down
18 changes: 17 additions & 1 deletion src/dstack/_internal/server/services/offers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import heapq
import itertools
from collections.abc import Container, Iterable, Iterator
from typing import List, Literal, Optional, Tuple, Union
from typing import List, Literal, Optional, Tuple, TypeVar, Union

import gpuhunt

Expand Down Expand Up @@ -116,6 +117,21 @@ async def get_offers_by_requirements(
return sorted(offers, key=lambda i: not i[1].availability.is_available())


T = TypeVar("T")


def merge_offer_iterables(
*iterables: Iterable[tuple[T, InstanceOfferWithAvailability]],
) -> Iterable[tuple[T, InstanceOfferWithAvailability]]:
"""
Merge offers from different sources (e.g., different backends, different fleets).

Some backends produce offers that are not sorted by price (e.g., `vastai` sorts by pod score).
That backend-specific order is preserved.
"""
return heapq.merge(*iterables, key=lambda i: i[1].price)


def is_divisible_into_blocks(
cpu_count: int, gpu_count: int, blocks: Union[int, Literal["auto"]]
) -> tuple[bool, int]:
Expand Down
25 changes: 13 additions & 12 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
is_multinode_job,
remove_job_spec_sensitive_info,
)
from dstack._internal.server.services.offers import get_offers_by_requirements
from dstack._internal.server.services.offers import (
get_offers_by_requirements,
merge_offer_iterables,
)
from dstack._internal.server.services.requirements.combine import (
combine_fleet_and_run_profiles,
combine_fleet_and_run_requirements,
Expand Down Expand Up @@ -711,11 +714,10 @@ async def get_backend_offers_in_run_candidate_fleets(
run_model=None,
run_spec=run_spec,
)
deduplicated_backend_offers: dict[
Hashable,
tuple[Backend, InstanceOfferWithAvailability],
] = {}
seen_offer_identities = set()
offers: list[tuple[Backend, InstanceOfferWithAvailability]] = []
for candidate_fleet_model in candidate_fleet_models:
offers_from_fleet = []
for backend, offer in await _get_backend_offers_in_fleet(
project=project,
fleet_model=candidate_fleet_model,
Expand All @@ -724,13 +726,12 @@ async def get_backend_offers_in_run_candidate_fleets(
volumes=volumes,
max_offers=max_offers_per_fleet,
):
deduplicated_backend_offers.setdefault(
_get_backend_offer_identity(offer),
(backend, offer),
)
backend_offers = list(deduplicated_backend_offers.values())
backend_offers.sort(key=lambda offer: offer[1].price)
return backend_offers
offer_identity = _get_backend_offer_identity(offer)
if offer_identity not in seen_offer_identities:
offers_from_fleet.append((backend, offer))
seen_offer_identities.add(offer_identity)
offers = list(merge_offer_iterables(offers, offers_from_fleet))
return offers


async def _get_offers_in_run_candidate_fleets(
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/testing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,11 +751,13 @@ def get_fleet_configuration(
name: str = "test-fleet",
nodes: FleetNodesSpec = FleetNodesSpec(min=1, target=1, max=1),
placement: Optional[InstanceGroupPlacement] = None,
backends: Optional[list[BackendType]] = None,
) -> FleetConfiguration:
return FleetConfiguration(
name=name,
nodes=nodes,
placement=placement,
backends=backends,
)


Expand Down
148 changes: 148 additions & 0 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ScalingSpec,
ServiceConfiguration,
TaskConfiguration,
parse_run_configuration,
)
from dstack._internal.core.models.fleets import FleetNodesSpec
from dstack._internal.core.models.gateways import GatewayStatus
Expand Down Expand Up @@ -66,6 +67,7 @@
create_run,
create_user,
get_auth_headers,
get_fleet_configuration,
get_fleet_spec,
get_instance_offer_with_availability,
get_job_provisioning_data,
Expand Down Expand Up @@ -1916,6 +1918,152 @@ async def test_returns_no_offers_if_imported_fleet_specified_without_project_pre
assert response_json["project_name"] == "importer"
assert len(response_json["job_plans"][0]["offers"]) == 0

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@pytest.mark.parametrize(
"configuration",
[
pytest.param({"type": "dev-environment"}, id="regular-configuration"),
pytest.param(
{"type": "task", "commands": [":"], "image": "scratch"},
id="special-configuration-used-by-dstack-offer-cli-command",
),
pytest.param(
{"type": "task", "commands": [":"], "image": "scratch", "fleets": ["test-fleet"]},
id="special-configuration-used-by-dstack-offer-cli-command-with-fleets", # --fleet
),
],
)
async def test_preserves_backend_specific_offer_order(
self,
test_db,
session: AsyncSession,
client: AsyncClient,
configuration: dict,
) -> None:
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session,
project=project,
user=user,
project_role=ProjectRole.USER,
)
repo = await create_repo(session=session, project_id=project.id)
await create_fleet(
session=session,
project=project,
spec=get_fleet_spec(conf=get_fleet_configuration(name="test-fleet")),
)

run_spec = get_run_spec(
repo_id=repo.name, configuration=parse_run_configuration(configuration)
)
body = {"run_spec": run_spec.dict()}

backend_mock_aws = Mock()
backend_mock_aws.TYPE = BackendType.AWS
backend_mock_aws.compute.return_value.get_offers.return_value = [
get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0),
get_instance_offer_with_availability(backend=BackendType.AWS, price=4.0),
]
backend_mock_vastai = Mock()
backend_mock_vastai.TYPE = BackendType.VASTAI
backend_mock_vastai.compute.return_value.get_offers.return_value = [
# not ordered by price - custom order should be preserved
get_instance_offer_with_availability(backend=BackendType.VASTAI, price=3.0),
get_instance_offer_with_availability(backend=BackendType.VASTAI, price=2.0),
]

with patch("dstack._internal.server.services.backends.get_project_backends") as m:
m.return_value = [backend_mock_aws, backend_mock_vastai]
response = await client.post(
f"/api/project/{project.name}/runs/get_plan",
headers=get_auth_headers(user.token),
json=body,
)

assert response.status_code == 200, response.json()
offers = [(o["backend"], o["price"]) for o in response.json()["job_plans"][0]["offers"]]
expected_offers = [
(BackendType.AWS.value, 1.0),
(BackendType.VASTAI.value, 3.0),
(BackendType.VASTAI.value, 2.0),
(BackendType.AWS.value, 4.0),
]
assert offers == expected_offers

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_offer_cli_preserves_backend_specific_offer_order_across_fleets(
self, test_db, session: AsyncSession, client: AsyncClient
) -> None:
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session,
project=project,
user=user,
project_role=ProjectRole.USER,
)
repo = await create_repo(session=session, project_id=project.id)
await create_fleet(
session=session,
project=project,
spec=get_fleet_spec(
conf=get_fleet_configuration(name="fleet-aws", backends=[BackendType.AWS])
),
)
await create_fleet(
session=session,
project=project,
spec=get_fleet_spec(
conf=get_fleet_configuration(name="fleet-vastai", backends=[BackendType.VASTAI])
),
)

run_spec = get_run_spec(
repo_id=repo.name,
configuration=TaskConfiguration(
commands=[":"],
image="scratch",
fleets=["fleet-aws", "fleet-vastai"],
),
)
body = {"run_spec": run_spec.dict()}

backend_mock_aws = Mock()
backend_mock_aws.TYPE = BackendType.AWS
backend_mock_aws.compute.return_value.get_offers.return_value = [
get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0),
get_instance_offer_with_availability(backend=BackendType.AWS, price=4.0),
]
backend_mock_vastai = Mock()
backend_mock_vastai.TYPE = BackendType.VASTAI
backend_mock_vastai.compute.return_value.get_offers.return_value = [
# not ordered by price - custom order should be preserved
get_instance_offer_with_availability(backend=BackendType.VASTAI, price=3.0),
get_instance_offer_with_availability(backend=BackendType.VASTAI, price=2.0),
]

with patch("dstack._internal.server.services.backends.get_project_backends") as m:
m.return_value = [backend_mock_aws, backend_mock_vastai]
response = await client.post(
f"/api/project/{project.name}/runs/get_plan",
headers=get_auth_headers(user.token),
json=body,
)

assert response.status_code == 200, response.json()
offers = [(o["backend"], o["price"]) for o in response.json()["job_plans"][0]["offers"]]
expected_offers = [
(BackendType.AWS.value, 1.0),
(BackendType.VASTAI.value, 3.0),
(BackendType.VASTAI.value, 2.0),
(BackendType.AWS.value, 4.0),
]
assert offers == expected_offers

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_offer_cli_returns_offers_from_all_specified_fleets(
Expand Down
Loading