Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions cmd/erasure-object-conditional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,160 @@ func TestPutObjectConditionalWithReadQuorumFailure(t *testing.T) {
}
})
}

// TestDeleteObjectConditional tests that conditional DeleteObject
// operations (with if-match via CheckPrecondFn) behave correctly
// at the erasure storage layer.
func TestDeleteObjectConditional(t *testing.T) {
ctx := context.Background()

obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)

bucket := "test-bucket"
object := "test-object"

err = obj.MakeBucket(ctx, bucket, MakeBucketOptions{})
if err != nil {
t.Fatal(err)
}

// Put an initial object so it exists
_, err = obj.PutObject(ctx, bucket, object,
mustGetPutObjReader(t, bytes.NewReader([]byte("test-value")),
int64(len("test-value")), "", ""), ObjectOptions{})
if err != nil {
t.Fatal(err)
}

// Get object info to capture the ETag
objInfo, err := obj.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if err != nil {
t.Fatal(err)
}
existingETag := objInfo.ETag

t.Run("if-match with wrong ETag should fail", func(t *testing.T) {
opts := ObjectOptions{
CheckPrecondFn: func(oi ObjectInfo) bool {
return oi.ETag != "wrong-etag"
},
}

_, err := obj.DeleteObject(ctx, bucket, object, opts)
if _, ok := err.(PreConditionFailed); !ok {
t.Errorf("Expected PreConditionFailed error, got: %v", err)
}

// Verify object still exists after failed conditional delete
_, err = obj.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if err != nil {
t.Errorf("Object should still exist after failed conditional delete, got: %v", err)
}
})

t.Run("if-match with correct ETag should succeed", func(t *testing.T) {
opts := ObjectOptions{
CheckPrecondFn: func(oi ObjectInfo) bool {
return oi.ETag != existingETag
},
}

_, err := obj.DeleteObject(ctx, bucket, object, opts)
if err != nil {
t.Errorf("Expected successful delete with matching ETag, got: %v", err)
}
})

t.Run("if-match on non-existent object should return error", func(t *testing.T) {
opts := ObjectOptions{
HasIfMatch: true,
CheckPrecondFn: func(oi ObjectInfo) bool {
return oi.ETag != "some-etag"
},
}

_, err := obj.DeleteObject(ctx, bucket, "non-existent-object", opts)
if err == nil {
t.Error("Expected error for If-Match on non-existent object, got nil")
}
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// This is the expected behavior — the object doesn't exist,
// so the If-Match precondition cannot be satisfied.
} else {
t.Errorf("Expected ObjectNotFound/VersionNotFound error, got: %v", err)
}
})
}

// TestDeleteObjectConditionalWithReadQuorumFailure tests that conditional
// DeleteObject operations (with if-match) do NOT proceed
// when read quorum cannot be reached, because we cannot verify the ETag.
func TestDeleteObjectConditionalWithReadQuorumFailure(t *testing.T) {
ctx := context.Background()

obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)

z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]

bucket := "test-bucket"
object := "test-object"

err = obj.MakeBucket(ctx, bucket, MakeBucketOptions{})
if err != nil {
t.Fatal(err)
}

// Put an initial object so it exists
_, err = obj.PutObject(ctx, bucket, object,
mustGetPutObjReader(t, bytes.NewReader([]byte("test-value")),
int64(len("test-value")), "", ""), ObjectOptions{})
if err != nil {
t.Fatal(err)
}

objInfo, err := obj.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if err != nil {
t.Fatal(err)
}
existingETag := objInfo.ETag

// Simulate read quorum failure by taking enough disks offline.
// With 16 disks (EC 8+8), read quorum is 9. Taking 8 disks offline leaves only 8,
// which is below read quorum but still has write quorum — triggering the tryDel path.
erasureDisks := xl.getDisks()
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
for i := range erasureDisks[:8] {
erasureDisks[i] = nil
}
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()

t.Run("if-match should fail when read quorum lost", func(t *testing.T) {
// Even with the correct ETag, we must NOT delete because we can't
// verify the current object state due to read quorum failure.
opts := ObjectOptions{
HasIfMatch: true,
CheckPrecondFn: func(oi ObjectInfo) bool {
return oi.ETag != existingETag
},
}

_, err := obj.DeleteObject(ctx, bucket, object, opts)
if err == nil {
t.Error("Expected error when if-match is used with quorum failure, got nil (object may have been deleted without ETag verification)")
}
})
}
17 changes: 17 additions & 0 deletions cmd/erasure-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,23 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
}

if opts.CheckPrecondFn != nil {
if gerr != nil {
if isErrObjectNotFound(gerr) || isErrVersionNotFound(gerr) {
// If-Match on a missing object cannot be satisfied.
if opts.HasIfMatch {
return objInfo, gerr
}
} else {
// For any other error (e.g. quorum errors), do not proceed with
// conditional delete without a verified ObjectInfo/ETag.
return objInfo, gerr
}
} else if opts.CheckPrecondFn(goi) {
return objInfo, PreConditionFailed{}
}
}

if opts.EvalMetadataFn != nil {
dsc, err := opts.EvalMetadataFn(&goi, gerr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/object-api-interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type ObjectOptions struct {
MaxParts int // used in GetObjectAttributes. Signals how many parts we should return
PartNumberMarker int // used in GetObjectAttributes. Signals the part number after which results should be returned
PartNumber int // only useful in case of GetObject/HeadObject
CheckPrecondFn CheckPreconditionFn // only set during GetObject/HeadObject/CopyObjectPart preconditional valuation
CheckPrecondFn CheckPreconditionFn // optional precondition check for conditional requests (If-Match, If-None-Match, etc.)
EvalMetadataFn EvalMetadataFn // only set for retention settings, meant to be used only when updating metadata in-place.
DeleteReplication ReplicationState // Represents internal replication state needed for Delete replication
Transition TransitionOptions
Expand Down
24 changes: 24 additions & 0 deletions cmd/object-handlers-common.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ func writeHeadersPrecondition(w http.ResponseWriter, objInfo ObjectInfo) {
}
}

// Validates the preconditions for DELETE. Returns true if DELETE operation should not proceed.
// Preconditions supported are:
//
// If-Match
func checkPreconditionsDELETE(ctx context.Context, w http.ResponseWriter, r *http.Request, objInfo ObjectInfo, opts ObjectOptions) bool {
// Return false for methods other than DELETE.
if r.Method != http.MethodDelete {
return false
}

// If-Match : Delete the object only if its entity tag (ETag) is the same as the one specified;
// otherwise return a 412 (precondition failed).
ifMatchETagHeader := r.Header.Get(xhttp.IfMatch)
if ifMatchETagHeader != "" {
if !isETagEqual(objInfo.ETag, ifMatchETagHeader) {
writeHeadersPrecondition(w, objInfo)
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
return true
}
}

return false
}

// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
// Preconditions supported are:
//
Expand Down
77 changes: 77 additions & 0 deletions cmd/object-handlers-common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,80 @@ func TestCheckPreconditions(t *testing.T) {
})
}
}

// Tests - checkPreconditionsDELETE()
func TestCheckPreconditionsDELETE(t *testing.T) {
objInfo := ObjectInfo{ETag: "abc123"}

testCases := []struct {
name string
method string
ifMatch string
expectedFlag bool
expectedCode int
}{
// Non-DELETE method should always return false
{
name: "Non-DELETE method ignored",
method: http.MethodGet,
ifMatch: "abc123",
expectedFlag: false,
expectedCode: 200,
},
// If-Match with matching ETag — precondition passes, delete proceeds
{
name: "If-Match matching ETag",
method: http.MethodDelete,
ifMatch: "abc123",
expectedFlag: false,
expectedCode: 200,
},
// If-Match with non-matching ETag — precondition fails, 412
{
name: "If-Match non-matching ETag",
method: http.MethodDelete,
ifMatch: "wrong-etag",
expectedFlag: true,
expectedCode: 412,
},
// If-Match with wildcard — always matches
{
name: "If-Match wildcard",
method: http.MethodDelete,
ifMatch: "*",
expectedFlag: false,
expectedCode: 200,
},
// If-Match with quoted ETag — should still match
{
name: "If-Match quoted ETag",
method: http.MethodDelete,
ifMatch: "\"abc123\"",
expectedFlag: false,
expectedCode: 200,
},
// No conditional headers — no precondition check
{
name: "No conditional headers",
method: http.MethodDelete,
expectedFlag: false,
expectedCode: 200,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
recorder := httptest.NewRecorder()
request := httptest.NewRequest(tc.method, "/bucket/a", bytes.NewReader([]byte{}))
if tc.ifMatch != "" {
request.Header.Set(xhttp.IfMatch, tc.ifMatch)
}
actualFlag := checkPreconditionsDELETE(t.Context(), recorder, request, objInfo, ObjectOptions{})
if tc.expectedFlag != actualFlag {
t.Errorf("test: %s, got flag: %v, want: %v", tc.name, actualFlag, tc.expectedFlag)
}
if tc.expectedCode != recorder.Code {
t.Errorf("test: %s, got code: %d, want: %d", tc.name, recorder.Code, tc.expectedCode)
}
})
}
}
23 changes: 23 additions & 0 deletions cmd/object-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2611,6 +2611,19 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return
}

ifMatch := r.Header.Get(xhttp.IfMatch)

if ifMatch != "" {
opts.HasIfMatch = true
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
return checkPreconditionsDELETE(ctx, w, r, oi, opts)
}
}

rcfg, _ := globalBucketObjectLockSys.Get(bucket)
if rcfg.LockEnabled && opts.DeletePrefix {
apiErr := toAPIError(ctx, errInvalidArgument)
Expand Down Expand Up @@ -2675,7 +2688,17 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if _, ok := err.(PreConditionFailed); ok {
// Request was aborted by CheckPrecondFn (e.g. precondition, auth/decrypt).
// Response has already been written by the closure; do not write another.
return
}
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
if opts.HasIfMatch {
// If-Match conditional delete on a non-existent object should fail.
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Send an event when the object is not found
objInfo.Name = object
objInfo.VersionID = opts.VersionID
Expand Down
Loading