Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 17 additions & 64 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,77 +104,36 @@ async def get_job_plans(

job_plans = []

if run_spec.configuration.type == "service":
volumes = await get_job_configured_volumes(
session=session,
project=project,
run_spec=run_spec,
job_num=0,
)
volumes = await get_job_configured_volumes(
session=session,
project=project,
run_spec=run_spec,
job_num=0,
)

if _should_select_best_fleet_candidate(run_spec):
candidate_fleet_models = await _select_candidate_fleet_models(
session=session,
project=project,
run_model=None,
run_spec=run_spec,
)
for replica_group in run_spec.configuration.replica_groups:
jobs = await get_jobs_from_run_spec(
run_spec=run_spec,
secrets=secrets,
replica_num=0,
replica_group_name=replica_group.name,
)
fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers(
project=project,
fleet_models=candidate_fleet_models,
run_model=None,
run_spec=run_spec,
job=jobs[0],
master_job_provisioning_data=None,
volumes=volumes,
exclude_not_available=False,
)
if not _should_select_best_fleet_candidate(run_spec):
if profile.fleets is None:
instance_offers, backend_offers = await _get_non_fleet_offers(
session=session,
project=project,
profile=profile,
run_spec=run_spec,
job=jobs[0],
volumes=volumes,
)
else:
instance_offers, backend_offers = await _get_offers_in_run_candidate_fleets(
session=session,
project=project,
run_spec=run_spec,
job=jobs[0],
volumes=volumes,
)
else:
candidate_fleet_models = None

for job in jobs:
job_plan = _get_job_plan(
instance_offers=instance_offers,
backend_offers=backend_offers,
profile=profile,
job=job,
max_offers=max_offers,
)
job_plans.append(job_plan)
if run_spec.configuration.type == "service":
replica_group_names = [g.name for g in run_spec.configuration.replica_groups]
else:
replica_group_names = [None]

for replica_group_name in replica_group_names:
jobs = await get_jobs_from_run_spec(
run_spec=run_spec,
secrets=secrets,
replica_num=0,
replica_group_name=replica_group_name,
)
volumes = await get_job_configured_volumes(
session=session,
project=project,
run_spec=run_spec,
job_num=0,
)
if not _should_select_best_fleet_candidate(run_spec):
if candidate_fleet_models is None: # `dstack offer` path
if profile.fleets is None:
instance_offers, backend_offers = await _get_non_fleet_offers(
session=session,
Expand All @@ -193,12 +152,6 @@ async def get_job_plans(
volumes=volumes,
)
else:
candidate_fleet_models = await _select_candidate_fleet_models(
session=session,
project=project,
run_model=None,
run_spec=run_spec,
)
fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers(
project=project,
fleet_models=candidate_fleet_models,
Expand Down
141 changes: 139 additions & 2 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@
from dstack._internal.core.models.configurations import (
AnyRunConfiguration,
DevEnvironmentConfiguration,
ReplicaGroup,
ScalingSpec,
ServiceConfiguration,
TaskConfiguration,
)
from dstack._internal.core.models.fleets import FleetNodesSpec
from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement
from dstack._internal.core.models.gateways import GatewayStatus
from dstack._internal.core.models.instances import (
Gpu,
InstanceAvailability,
InstanceOfferWithAvailability,
InstanceStatus,
InstanceType,
Resources,
)
from dstack._internal.core.models.profiles import Profile, Schedule
from dstack._internal.core.models.resources import Range
from dstack._internal.core.models.resources import GPUSpec, Range, ResourcesSpec
from dstack._internal.core.models.runs import (
ApplyRunPlanInput,
JobSpec,
JobStatus,
Requirements,
Run,
RunSpec,
RunStatus,
Expand Down Expand Up @@ -1531,6 +1534,140 @@ async def test_returns_run_plan_privileged_true(
assert response.status_code == 200, response.json()
assert response.json() == run_plan_dict

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_task_with_two_nodes_returns_two_job_plans(
self, test_db, session: AsyncSession, client: AsyncClient
):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
offer = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="instance",
resources=Resources(cpus=4, memory_mib=16384, spot=False, gpus=[]),
),
region="us",
price=1.0,
availability=InstanceAvailability.AVAILABLE,
)
run_spec = get_run_spec(
repo_id=repo.name,
configuration=TaskConfiguration(commands=["echo hi"], nodes=2),
)
body = {"run_spec": json.loads(run_spec.json())}
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
backend_mock = Mock()
backend_mock.TYPE = BackendType.AWS
backend_mock.compute.return_value.get_offers.return_value = [offer]
m.return_value = [backend_mock]
response = await client.post(
f"/api/project/{project.name}/runs/get_plan",
headers=get_auth_headers(user.token),
json=body,
)
assert response.status_code == 200, response.json()
job_plans = response.json()["job_plans"]
assert len(job_plans) == 2
assert job_plans[0]["job_spec"]["job_num"] == 0
assert job_plans[1]["job_spec"]["job_num"] == 1
assert len(job_plans[0]["offers"]) == 1
assert job_plans[0]["offers"] == job_plans[1]["offers"]

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_service_with_two_replica_groups_returns_two_job_plans(
self, test_db, session: AsyncSession, client: AsyncClient
):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
gpu_offer = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="gpu-instance",
resources=Resources(
cpus=8,
memory_mib=32768,
spot=False,
gpus=[Gpu(name="A100", memory_mib=40960)],
),
),
region="us",
price=5.0,
availability=InstanceAvailability.AVAILABLE,
)
cpu_offer = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="cpu-instance",
resources=Resources(cpus=4, memory_mib=16384, spot=False, gpus=[]),
),
region="us",
price=1.0,
availability=InstanceAvailability.AVAILABLE,
)
run_spec = get_run_spec(
repo_id=repo.name,
configuration=ServiceConfiguration(
port=8080,
gateway=False,
replicas=[
ReplicaGroup(
name="gpu-group",
count=Range[int](min=2, max=2),
resources=ResourcesSpec(gpu=GPUSpec()),
commands=["python server.py"],
),
ReplicaGroup(
name="cpu-group",
count=Range[int](min=1, max=1),
resources=ResourcesSpec(gpu=None),
commands=["python router.py"],
),
],
),
)
body = {"run_spec": json.loads(run_spec.json())}

def offers_by_requirements(requirements: Requirements):
if (
requirements.resources.gpu is not None
and requirements.resources.gpu.count.min is not None
and requirements.resources.gpu.count.min > 0
):
return [gpu_offer]
return [cpu_offer]

with patch("dstack._internal.server.services.backends.get_project_backends") as m:
backend_mock = Mock()
backend_mock.TYPE = BackendType.AWS
backend_mock.compute.return_value.get_offers.side_effect = offers_by_requirements
m.return_value = [backend_mock]
response = await client.post(
f"/api/project/{project.name}/runs/get_plan",
headers=get_auth_headers(user.token),
json=body,
)
assert response.status_code == 200, response.json()
gpu_job_plan, cpu_job_plan = response.json()["job_plans"]
assert gpu_job_plan["offers"][0]["instance"]["resources"]["gpus"] != []
assert cpu_job_plan["offers"][0]["instance"]["resources"]["gpus"] == []

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_returns_run_plan_docker_true(
Expand Down
Loading