diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 1fcf3e7bd..25736d3e2 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -104,77 +104,36 @@ async def get_job_plans( job_plans = [] - if run_spec.configuration.type == "service": - volumes = await get_job_configured_volumes( - session=session, - project=project, - run_spec=run_spec, - job_num=0, - ) + volumes = await get_job_configured_volumes( + session=session, + project=project, + run_spec=run_spec, + job_num=0, + ) + + if _should_select_best_fleet_candidate(run_spec): candidate_fleet_models = await _select_candidate_fleet_models( session=session, project=project, run_model=None, run_spec=run_spec, ) - for replica_group in run_spec.configuration.replica_groups: - jobs = await get_jobs_from_run_spec( - run_spec=run_spec, - secrets=secrets, - replica_num=0, - replica_group_name=replica_group.name, - ) - fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers( - project=project, - fleet_models=candidate_fleet_models, - run_model=None, - run_spec=run_spec, - job=jobs[0], - master_job_provisioning_data=None, - volumes=volumes, - exclude_not_available=False, - ) - if not _should_select_best_fleet_candidate(run_spec): - if profile.fleets is None: - instance_offers, backend_offers = await _get_non_fleet_offers( - session=session, - project=project, - profile=profile, - run_spec=run_spec, - job=jobs[0], - volumes=volumes, - ) - else: - instance_offers, backend_offers = await _get_offers_in_run_candidate_fleets( - session=session, - project=project, - run_spec=run_spec, - job=jobs[0], - volumes=volumes, - ) + else: + candidate_fleet_models = None - for job in jobs: - job_plan = _get_job_plan( - instance_offers=instance_offers, - backend_offers=backend_offers, - profile=profile, - job=job, - max_offers=max_offers, - ) - job_plans.append(job_plan) + if run_spec.configuration.type == "service": + replica_group_names = [g.name for g in run_spec.configuration.replica_groups] else: + replica_group_names = [None] + + for replica_group_name in replica_group_names: jobs = await get_jobs_from_run_spec( run_spec=run_spec, secrets=secrets, replica_num=0, + replica_group_name=replica_group_name, ) - volumes = await get_job_configured_volumes( - session=session, - project=project, - run_spec=run_spec, - job_num=0, - ) - if not _should_select_best_fleet_candidate(run_spec): + if candidate_fleet_models is None: # `dstack offer` path if profile.fleets is None: instance_offers, backend_offers = await _get_non_fleet_offers( session=session, @@ -193,12 +152,6 @@ async def get_job_plans( volumes=volumes, ) else: - candidate_fleet_models = await _select_candidate_fleet_models( - session=session, - project=project, - run_model=None, - run_spec=run_spec, - ) fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers( project=project, fleet_models=candidate_fleet_models, diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index e13e20853..6bdf8b7a2 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -18,13 +18,15 @@ from dstack._internal.core.models.configurations import ( AnyRunConfiguration, DevEnvironmentConfiguration, + ReplicaGroup, ScalingSpec, ServiceConfiguration, TaskConfiguration, ) -from dstack._internal.core.models.fleets import FleetNodesSpec +from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.gateways import GatewayStatus from dstack._internal.core.models.instances import ( + Gpu, InstanceAvailability, InstanceOfferWithAvailability, InstanceStatus, @@ -32,11 +34,12 @@ Resources, ) from dstack._internal.core.models.profiles import Profile, Schedule -from dstack._internal.core.models.resources import Range +from dstack._internal.core.models.resources import GPUSpec, Range, ResourcesSpec from dstack._internal.core.models.runs import ( ApplyRunPlanInput, JobSpec, JobStatus, + Requirements, Run, RunSpec, RunStatus, @@ -1531,6 +1534,140 @@ async def test_returns_run_plan_privileged_true( assert response.status_code == 200, response.json() assert response.json() == run_plan_dict + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_task_with_two_nodes_returns_two_job_plans( + self, test_db, session: AsyncSession, client: AsyncClient + ): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER + await create_fleet(session=session, project=project, spec=fleet_spec) + repo = await create_repo(session=session, project_id=project.id) + offer = InstanceOfferWithAvailability( + backend=BackendType.AWS, + instance=InstanceType( + name="instance", + resources=Resources(cpus=4, memory_mib=16384, spot=False, gpus=[]), + ), + region="us", + price=1.0, + availability=InstanceAvailability.AVAILABLE, + ) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(commands=["echo hi"], nodes=2), + ) + body = {"run_spec": json.loads(run_spec.json())} + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + m.return_value = [backend_mock] + response = await client.post( + f"/api/project/{project.name}/runs/get_plan", + headers=get_auth_headers(user.token), + json=body, + ) + assert response.status_code == 200, response.json() + job_plans = response.json()["job_plans"] + assert len(job_plans) == 2 + assert job_plans[0]["job_spec"]["job_num"] == 0 + assert job_plans[1]["job_spec"]["job_num"] == 1 + assert len(job_plans[0]["offers"]) == 1 + assert job_plans[0]["offers"] == job_plans[1]["offers"] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_service_with_two_replica_groups_returns_two_job_plans( + self, test_db, session: AsyncSession, client: AsyncClient + ): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + await create_fleet(session=session, project=project, spec=fleet_spec) + repo = await create_repo(session=session, project_id=project.id) + gpu_offer = InstanceOfferWithAvailability( + backend=BackendType.AWS, + instance=InstanceType( + name="gpu-instance", + resources=Resources( + cpus=8, + memory_mib=32768, + spot=False, + gpus=[Gpu(name="A100", memory_mib=40960)], + ), + ), + region="us", + price=5.0, + availability=InstanceAvailability.AVAILABLE, + ) + cpu_offer = InstanceOfferWithAvailability( + backend=BackendType.AWS, + instance=InstanceType( + name="cpu-instance", + resources=Resources(cpus=4, memory_mib=16384, spot=False, gpus=[]), + ), + region="us", + price=1.0, + availability=InstanceAvailability.AVAILABLE, + ) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=ServiceConfiguration( + port=8080, + gateway=False, + replicas=[ + ReplicaGroup( + name="gpu-group", + count=Range[int](min=2, max=2), + resources=ResourcesSpec(gpu=GPUSpec()), + commands=["python server.py"], + ), + ReplicaGroup( + name="cpu-group", + count=Range[int](min=1, max=1), + resources=ResourcesSpec(gpu=None), + commands=["python router.py"], + ), + ], + ), + ) + body = {"run_spec": json.loads(run_spec.json())} + + def offers_by_requirements(requirements: Requirements): + if ( + requirements.resources.gpu is not None + and requirements.resources.gpu.count.min is not None + and requirements.resources.gpu.count.min > 0 + ): + return [gpu_offer] + return [cpu_offer] + + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.side_effect = offers_by_requirements + m.return_value = [backend_mock] + response = await client.post( + f"/api/project/{project.name}/runs/get_plan", + headers=get_auth_headers(user.token), + json=body, + ) + assert response.status_code == 200, response.json() + gpu_job_plan, cpu_job_plan = response.json()["job_plans"] + assert gpu_job_plan["offers"][0]["instance"]["resources"]["gpus"] != [] + assert cpu_job_plan["offers"][0]["instance"]["resources"]["gpus"] == [] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_run_plan_docker_true(