Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
write-manifest.
- Perform case insensitive matching of the configured Snowflake connection authenticator.
- New `login` and `logout` subcommands for authenticating to Connect via OAuth.
- New `environment` subcommand for managing execution environments on Posit Connect.

## [1.29.0] - 2026-04-29

Expand Down
3 changes: 3 additions & 0 deletions docs/commands/environment.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
::: mkdocs-click
:module: rsconnect.main
:command: environment
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nav:
- content: commands/content.md
- deploy: commands/deploy.md
- details: commands/details.md
- environment: commands/environment.md
- info: commands/info.md
- list: commands/list.md
- login: commands/login.md
Expand Down
123 changes: 123 additions & 0 deletions rsconnect/actions_environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Public API for managing execution environments on Posit Connect.
"""

from __future__ import annotations

from typing import Optional, Union

from .api import RSConnectClient, RSConnectServer, SPCSConnectServer
from .models import (
EnvironmentCreateInput,
EnvironmentPermissionInput,
EnvironmentPermissionV1,
EnvironmentUpdateInput,
EnvironmentV1,
)


def list_environments(
connect_server: Union[RSConnectServer, SPCSConnectServer],
) -> list[EnvironmentV1]:
with RSConnectClient(connect_server) as client:
return client.environment_list()


def get_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
) -> EnvironmentV1:
with RSConnectClient(connect_server) as client:
return client.environment_get(guid)


def create_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
image: str,
title: Optional[str] = None,
description: Optional[str] = None,
matching: Optional[str] = None,
supervisor: Optional[str] = None,
user_guids: Optional[list[str]] = None,
group_guids: Optional[list[str]] = None,
) -> EnvironmentV1:
body: EnvironmentCreateInput = {
"cluster_name": "Kubernetes",
"name": image,
}
if title is not None:
body["title"] = title
if description is not None:
body["description"] = description
if matching is not None:
body["matching"] = matching
if supervisor is not None:
body["supervisor"] = supervisor

with RSConnectClient(connect_server) as client:
result = client.environment_create(body)
if user_guids is not None or group_guids is not None:
_sync_permissions(client, result["guid"], user_guids, group_guids)
return client.environment_get(result["guid"])


def update_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
title: Optional[str] = None,
description: Optional[str] = None,
matching: Optional[str] = None,
supervisor: Optional[str] = None,
user_guids: Optional[list[str]] = None,
group_guids: Optional[list[str]] = None,
) -> EnvironmentV1:
with RSConnectClient(connect_server) as client:
existing = client.environment_get(guid)

body: EnvironmentUpdateInput = {
"title": title if title is not None else existing["title"],
"description": description if description is not None else existing["description"],
"matching": matching if matching is not None else existing["matching"],
"supervisor": supervisor if supervisor is not None else existing["supervisor"],
"python": existing["python"],
"quarto": existing["quarto"],
"r": existing["r"],
"tensorflow": existing["tensorflow"],
"volume_mounts": existing["volume_mounts"],
}

result = client.environment_update(guid, body)

if user_guids is not None or group_guids is not None:
_sync_permissions(client, guid, user_guids, group_guids)
return client.environment_get(guid)

return result


def delete_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
) -> None:
with RSConnectClient(connect_server) as client:
client.environment_delete(guid)


def _sync_permissions(
client: RSConnectClient,
env_guid: str,
user_guids: Optional[list[str]],
group_guids: Optional[list[str]],
) -> list[EnvironmentPermissionV1]:
existing = client.environment_permission_list(env_guid)
for perm in existing:
client.environment_permission_delete(env_guid, perm["guid"])

results: list[EnvironmentPermissionV1] = []
for g in user_guids or []:
body: EnvironmentPermissionInput = {"user_guid": g}
results.append(client.environment_permission_add(env_guid, body))
for g in group_guids or []:
body = {"group_guid": g}
results.append(client.environment_permission_add(env_guid, body))
return results
52 changes: 52 additions & 0 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@
ContentItemV1,
DeleteInputDTO,
DeleteOutputDTO,
EnvironmentCreateInput,
EnvironmentPermissionInput,
EnvironmentPermissionV1,
EnvironmentUpdateInput,
EnvironmentV1,
ListEntryOutputDTO,
PyInfo,
ServerSettings,
Expand Down Expand Up @@ -730,6 +735,53 @@ def system_caches_runtime_delete(self, target: DeleteInputDTO) -> DeleteOutputDT
response = self._server.handle_bad_response(response)
return response

def environment_list(self) -> list[EnvironmentV1]:
response = cast(Union[List[EnvironmentV1], HTTPResponse], self.get("v1/environments"))
response = self._server.handle_bad_response(response)
return response

def environment_get(self, guid: str) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.get(f"v1/environments/{guid}"))
response = self._server.handle_bad_response(response)
return response

def environment_create(self, body: EnvironmentCreateInput) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.post("v1/environments", body=body))
response = self._server.handle_bad_response(response)
return response

def environment_update(self, guid: str, body: EnvironmentUpdateInput) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.put(f"v1/environments/{guid}", body=body))
response = self._server.handle_bad_response(response)
return response

def environment_delete(self, guid: str) -> None:
response = cast(HTTPResponse, self.delete(f"v1/environments/{guid}", decode_response=False))
self._server.handle_bad_response(response, is_httpresponse=True)

def environment_permission_list(self, env_guid: str) -> list[EnvironmentPermissionV1]:
response = cast(
Union[List[EnvironmentPermissionV1], HTTPResponse],
self.get(f"v1/environments/{env_guid}/permissions"),
)
response = self._server.handle_bad_response(response)
return response

def environment_permission_add(self, env_guid: str, body: EnvironmentPermissionInput) -> EnvironmentPermissionV1:
response = cast(
Union[EnvironmentPermissionV1, HTTPResponse],
self.post(f"v1/environments/{env_guid}/permissions", body=body),
)
response = self._server.handle_bad_response(response)
return response

def environment_permission_delete(self, env_guid: str, permission_guid: str) -> None:
response = cast(
HTTPResponse,
self.delete(f"v1/environments/{env_guid}/permissions/{permission_guid}", decode_response=False),
)
self._server.handle_bad_response(response, is_httpresponse=True)

def task_get(
self,
task_id: str,
Expand Down
Loading
Loading