Skip to content
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/Azure/karpenter-provider-azure v1.5.1
github.com/crossplane/crossplane-runtime/v2 v2.1.0
github.com/evanphx/json-patch/v5 v5.9.11
github.com/gofrs/uuid v4.4.0+incompatible
github.com/go-logr/logr v1.4.3
github.com/gofrs/uuid v4.4.0+incompatible
github.com/google/go-cmp v0.7.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
github.com/onsi/ginkgo/v2 v2.23.4
Expand Down
13 changes: 9 additions & 4 deletions pkg/clients/azure/compute/vmsizerecommenderclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/gofrs/uuid"
"google.golang.org/protobuf/encoding/protojson"
"k8s.io/klog/v2"

computev1 "go.goms.io/fleet/apis/protos/azure/compute/v1"
"go.goms.io/fleet/pkg/clients/httputil"
"go.goms.io/fleet/pkg/utils/controller"
fleetErrors "go.goms.io/fleet/pkg/utils/errors"
)

const (
Expand Down Expand Up @@ -137,17 +137,22 @@ func (c *AttributeBasedVMSizeRecommenderClient) GenerateAttributeBasedRecommenda
return nil, fmt.Errorf("failed to read response body: %w", err)
}

// Check status code
// Check status code - categorize based on transient vs non-transient errors
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("request failed with status %d: %w", resp.StatusCode, runtime.NewResponseError(resp))
desc := fmt.Sprintf("request failed with status %d: %s %s", resp.StatusCode, httpReq.Method, url)
if httputil.IsTransientStatusCode(resp.StatusCode) {
return nil, fleetErrors.NewTransientError(nil, desc)
}
// Any other non-2xx status (e.g. 4xx client errors) is treated as non-transient and should not be retried.
return nil, fleetErrors.NewUnexpectedError(nil, desc)
}

// Unmarshal response using protojson for proper proto3 support
response = &computev1.GenerateAttributeBasedRecommendationsResponse{}
unmarshaler := protojson.UnmarshalOptions{
DiscardUnknown: true,
}
if err := unmarshaler.Unmarshal(respBody, response); err != nil {
if err = unmarshaler.Unmarshal(respBody, response); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

Expand Down
82 changes: 65 additions & 17 deletions pkg/clients/azure/compute/vmsizerecommenderclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/proto"

computev1 "go.goms.io/fleet/apis/protos/azure/compute/v1"
fleetErrors "go.goms.io/fleet/pkg/utils/errors"
"go.goms.io/fleet/test/utils/azure/compute"
)

Expand Down Expand Up @@ -81,13 +82,14 @@ func TestNewAttributeBasedVMSizeRecommenderClient(t *testing.T) {

func TestClient_GenerateAttributeBasedRecommendations(t *testing.T) {
tests := []struct {
name string
request *computev1.GenerateAttributeBasedRecommendationsRequest
mockStatusCode int
mockResponse string
wantResponse *computev1.GenerateAttributeBasedRecommendationsResponse
wantErr bool
wantErrMsg string
name string
request *computev1.GenerateAttributeBasedRecommendationsRequest
mockStatusCode int
mockResponse string
wantResponse *computev1.GenerateAttributeBasedRecommendationsResponse
wantErr bool
wantErrMsg string
wantIsTransient bool // true if error should be a transient HTTPError
}{
{
name: "successful request with regular priority profile",
Expand Down Expand Up @@ -169,32 +171,64 @@ func TestClient_GenerateAttributeBasedRecommendations(t *testing.T) {
wantErrMsg: "either regular priority profile or spot priority profile must be provided",
},
{
name: "HTTP 400 error",
name: "HTTP 400 error is not transient",
request: &computev1.GenerateAttributeBasedRecommendationsRequest{
SubscriptionId: "sub-123",
Location: "eastus",
RegularPriorityProfile: &computev1.RegularPriorityProfile{
TargetCapacity: 5,
},
},
mockStatusCode: http.StatusBadRequest,
mockResponse: `{"error":"invalid request"}`,
wantErr: true,
wantErrMsg: "request failed with status 400",
mockStatusCode: http.StatusBadRequest,
mockResponse: `{"error":"invalid request"}`,
wantErr: true,
wantErrMsg: "request failed with status 400: POST",
wantIsTransient: false, // 400 is NOT transient
},
{
name: "HTTP 500 error",
name: "HTTP 500 error is transient",
request: &computev1.GenerateAttributeBasedRecommendationsRequest{
SubscriptionId: "sub-123",
Location: "eastus",
RegularPriorityProfile: &computev1.RegularPriorityProfile{
TargetCapacity: 5,
},
},
mockStatusCode: http.StatusInternalServerError,
mockResponse: `{"error":"internal server error"}`,
wantErr: true,
wantErrMsg: "request failed with status 500",
mockStatusCode: http.StatusInternalServerError,
mockResponse: `{"error":"internal server error"}`,
wantErr: true,
wantErrMsg: "request failed with status 500: POST",
wantIsTransient: true, // 500 IS transient
},
{
name: "HTTP 503 error is transient",
request: &computev1.GenerateAttributeBasedRecommendationsRequest{
SubscriptionId: "sub-123",
Location: "eastus",
RegularPriorityProfile: &computev1.RegularPriorityProfile{
TargetCapacity: 5,
},
},
mockStatusCode: http.StatusServiceUnavailable,
mockResponse: `{"error":"service unavailable"}`,
wantErr: true,
wantErrMsg: "request failed with status 503: POST",
wantIsTransient: true, // 503 IS transient
},
{
name: "HTTP 429 error is transient",
request: &computev1.GenerateAttributeBasedRecommendationsRequest{
SubscriptionId: "sub-123",
Location: "eastus",
RegularPriorityProfile: &computev1.RegularPriorityProfile{
TargetCapacity: 5,
},
},
mockStatusCode: http.StatusTooManyRequests,
mockResponse: `{"error":"too many requests"}`,
wantErr: true,
wantErrMsg: "request failed with status 429: POST",
wantIsTransient: true, // 429 IS transient
},
{
name: "invalid JSON response",
Expand Down Expand Up @@ -240,6 +274,20 @@ func TestClient_GenerateAttributeBasedRecommendations(t *testing.T) {
return
}

// Check if error has retry policy using fleetErrors.IsRetryable.
if tt.wantErr && tt.wantIsTransient {
isRetryable, hasRetryPolicy := fleetErrors.IsRetryable(err)
if !hasRetryPolicy || !isRetryable {
t.Errorf("GenerateAttributeBasedRecommendations() error = %v, want retryable error", err)
}
}
if tt.wantErr && !tt.wantIsTransient && tt.mockStatusCode >= 400 && tt.mockStatusCode < 500 {
isRetryable, hasRetryPolicy := fleetErrors.IsRetryable(err)
if hasRetryPolicy && isRetryable {
t.Errorf("GenerateAttributeBasedRecommendations() error = %v, should NOT be retryable for 4xx errors", err)
}
}

// Compare response.
if !proto.Equal(tt.wantResponse, got) {
t.Errorf("GenerateAttributeBasedRecommendations() = %+v, want %+v", got, tt.wantResponse)
Expand Down
16 changes: 16 additions & 0 deletions pkg/clients/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,19 @@ var (
// DefaultClientForAzure is the default HTTP client to access Azure services.
DefaultClientForAzure = &http.Client{Timeout: HTTPTimeoutAzure}
)

// transientHTTPStatusCodes is the set of HTTP status codes treated as transient,
// i.e. failures for which a retry may succeed. Codes that are absent from the map
// are considered non-transient.
var transientHTTPStatusCodes = map[int]bool{
	http.StatusTooManyRequests:     true, // 429 - Rate limiting
	http.StatusInternalServerError: true, // 500 - Server error
	http.StatusBadGateway:          true, // 502 - Bad gateway
	http.StatusServiceUnavailable:  true, // 503 - Service unavailable
	http.StatusGatewayTimeout:      true, // 504 - Gateway timeout
}

// IsTransientStatusCode reports whether statusCode is one of the codes listed in
// transientHTTPStatusCodes (429, 500, 502, 503, 504). Every other code, including
// the remaining 5xx codes, yields false.
func IsTransientStatusCode(statusCode int) bool {
	transient, listed := transientHTTPStatusCodes[statusCode]
	return listed && transient
}
37 changes: 37 additions & 0 deletions pkg/clients/httputil/httputil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package httputil

import (
"net/http"
"testing"
)

// TestIsTransientStatusCode verifies that IsTransientStatusCode returns true for
// 429, 500, 502, 503 and 504, and false for the 400, 401, 403 and 404 client errors.
func TestIsTransientStatusCode(t *testing.T) {
	type testCase struct {
		label    string
		code     int
		expected bool
	}

	cases := []testCase{
		{label: "400 Bad Request", code: http.StatusBadRequest, expected: false},
		{label: "401 Unauthorized", code: http.StatusUnauthorized, expected: false},
		{label: "403 Forbidden", code: http.StatusForbidden, expected: false},
		{label: "404 Not Found", code: http.StatusNotFound, expected: false},
		{label: "429 Too Many Requests", code: http.StatusTooManyRequests, expected: true},
		{label: "500 Internal Server Error", code: http.StatusInternalServerError, expected: true},
		{label: "502 Bad Gateway", code: http.StatusBadGateway, expected: true},
		{label: "503 Service Unavailable", code: http.StatusServiceUnavailable, expected: true},
		{label: "504 Gateway Timeout", code: http.StatusGatewayTimeout, expected: true},
	}

	for i := range cases {
		tc := cases[i]
		// Each status code runs as its own named subtest so failures are reported per code.
		t.Run(tc.label, func(t *testing.T) {
			actual := IsTransientStatusCode(tc.code)
			if actual == tc.expected {
				return
			}
			t.Errorf("IsTransientStatusCode(%d) = %v, want %v", tc.code, actual, tc.expected)
		})
	}
}
23 changes: 19 additions & 4 deletions pkg/propertychecker/azure/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
computev1 "go.goms.io/fleet/apis/protos/azure/compute/v1"
"go.goms.io/fleet/pkg/clients/azure/compute"
fleetErrors "go.goms.io/fleet/pkg/utils/errors"
"go.goms.io/fleet/pkg/utils/labels"
)

Expand Down Expand Up @@ -58,25 +59,35 @@ func NewPropertyChecker(vmSizeRecommenderClient compute.AttributeBasedVMSizeReco
//
// The cluster must have both Azure location and subscription ID labels configured.
// Returns true if the SKU capacity requirement can be met, false otherwise.
//
// Errors returned implement the RetryableError interface to indicate whether the operation
// can be retried. Configuration errors (missing labels, invalid capacity) are non-retryable,
// while Azure API errors preserve the retryability of the underlying HTTP error.
func (s *PropertyChecker) CheckIfMeetSKUCapacityRequirement(
cluster *clusterv1beta1.MemberCluster,
req placementv1beta1.PropertySelectorRequirement,
sku string,
) (bool, error) {
location, err := labels.ExtractLabelFromMemberCluster(cluster, labels.AzureLocationLabel)
if err != nil {
return false, fmt.Errorf("failed to extract Azure location label from cluster %s: %w", cluster.Name, err)
// Missing label is a configuration error; not retryable.
return false, fleetErrors.NewUserError(err,
fmt.Sprintf("failed to extract Azure location label from cluster %s", cluster.Name))
}

subID, err := labels.ExtractLabelFromMemberCluster(cluster, labels.AzureSubscriptionIDLabel)
if err != nil {
return false, fmt.Errorf("failed to extract Azure subscription ID label from cluster %s: %w", cluster.Name, err)
// Missing label is a configuration error; not retryable.
return false, fleetErrors.NewUserError(err,
fmt.Sprintf("failed to extract Azure subscription ID label from cluster %s", cluster.Name))
}

// Extract capacity requirements from the property selector requirement.
capacity, err := extractCapacityRequirements(req)
if err != nil {
return false, fmt.Errorf("failed to extract capacity requirements from property selector requirement: %w", err)
// Invalid capacity specification is a user error; not retryable.
return false, fleetErrors.NewUserError(err,
"failed to extract capacity requirements from property selector requirement")
}

// Request VM size recommendations to validate SKU availability and capacity.
Expand All @@ -101,7 +112,11 @@ func (s *PropertyChecker) CheckIfMeetSKUCapacityRequirement(

respObj, err := s.vmSizeRecommenderClient.GenerateAttributeBasedRecommendations(context.Background(), request)
if err != nil {
return false, fmt.Errorf("failed to generate VM size recommendations from Azure: %w", err)
// Wrap the error with context. HTTP status failures returned by the client are
// already categorized (transient for 429/500/502/503/504, unexpected for any other
// non-2xx status), so fleetErrors.IsRetryable() will detect the category in the error chain.
return false, fleetErrors.Wraps(err,
fmt.Sprintf("failed to generate VM size recommendations from Azure for SKU %s in cluster %s", sku, cluster.Name))
}

// This check is a defense mechanism; vmSizeRecommenderClient should return a VM size recommendation
Expand Down
26 changes: 26 additions & 0 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -45,6 +46,7 @@ import (
"go.goms.io/fleet/pkg/utils/annotations"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
fleetErrors "go.goms.io/fleet/pkg/utils/errors"
"go.goms.io/fleet/pkg/utils/parallelizer"
)

Expand All @@ -57,6 +59,10 @@ const (
// NotFullyScheduledReason is the reason string of placement condition when the placement policy cannot be fully satisfied.
NotFullyScheduledReason = "SchedulingPolicyUnfulfilled"

// Event reasons for scheduling errors.
// SchedulingErrorReason is used when the scheduler encounters an error during scheduling.
SchedulingErrorReason = "SchedulingError"

fullyScheduledMessage = "found all cluster needed as specified by the scheduling policy, found %d cluster(s)"
notFullyScheduledMessage = "could not find all clusters needed as specified by the scheduling policy, found %d cluster(s) instead"

Expand Down Expand Up @@ -539,6 +545,16 @@ func (f *framework) runAllPluginsForPickAllPlacementType(
passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
if err != nil {
klog.ErrorS(err, "Failed to run filter plugins", "policySnapshot", policyRef)
// Emit an event to inform the user about the scheduling error.
f.eventRecorder.Event(policy, corev1.EventTypeWarning, SchedulingErrorReason,
fmt.Sprintf("Failed to run filter plugins: %v", err))
// Check if the error has a retry policy configured.
// If the error (or any error in its chain) implements ErrorWithRetryPolicy and indicates
// it's retryable, return it as-is so the scheduler can requeue.
// Otherwise, wrap it as unexpected behavior.
if isRetryable, hasRetryPolicy := fleetErrors.IsRetryable(err); hasRetryPolicy && isRetryable {
return nil, nil, err
}
return nil, nil, controller.NewUnexpectedBehaviorError(err)
}

Expand Down Expand Up @@ -1159,6 +1175,16 @@ func (f *framework) runAllPluginsForPickNPlacementType(
passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
if err != nil {
klog.ErrorS(err, "Failed to run filter plugins", "policySnapshot", policyRef)
// Emit an event to inform the user about the scheduling error.
f.eventRecorder.Event(policy, corev1.EventTypeWarning, SchedulingErrorReason,
fmt.Sprintf("Failed to run filter plugins: %v", err))
// Check if the error has a retry policy configured.
// If the error (or any error in its chain) implements ErrorWithRetryPolicy and indicates
// it's retryable, return it as-is so the scheduler can requeue.
// Otherwise, wrap it as unexpected behavior.
if isRetryable, hasRetryPolicy := fleetErrors.IsRetryable(err); hasRetryPolicy && isRetryable {
return nil, nil, err
}
return nil, nil, controller.NewUnexpectedBehaviorError(err)
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/scheduler/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

Expand Down Expand Up @@ -1323,8 +1324,9 @@ func TestRunAllPluginsForPickAllPlacementType(t *testing.T) {
profile.WithFilterPlugin(p)
}
f := &framework{
profile: profile,
parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
profile: profile,
parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
eventRecorder: record.NewFakeRecorder(10),
}

ctx := context.Background()
Expand Down Expand Up @@ -6253,8 +6255,9 @@ func TestRunAllPluginsForPickNPlacementType(t *testing.T) {
}

f := &framework{
profile: profile,
parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
profile: profile,
parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
eventRecorder: record.NewFakeRecorder(10),
}

ctx := context.Background()
Expand Down
Loading
Loading