feat: support emulated SAVEPOINTs#832
Conversation
… using emulated savepoints
There was a problem hiding this comment.
Code Review
This pull request implements support for SQL savepoints (SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT) in the Spanner driver by parsing these statements and tracking statement and mutation counts for rollback replay. Feedback focuses on properly emulating standard SQL savepoint behavior by tracking creation order and destroying subsequent savepoints upon rollback or release. Additionally, it is recommended to avoid shadowing the named return value err in recreateAndReplay, add a deferred rollback to prevent leaking transactions on replay failure, and add tests verifying savepoint cleanup.
| // savepoints maps a savepoint name to the number of statements and mutations | ||
| // that were executed before the savepoint was created. | ||
| savepoints map[string]savepoint | ||
| } | ||
|
|
||
| type savepoint struct { | ||
| statementCount int | ||
| mutationCount int | ||
| } |
There was a problem hiding this comment.
To correctly emulate standard SQL savepoint behavior, savepoints created after a rolled back or released savepoint must be destroyed. Without tracking the creation order, rolling back to an earlier savepoint while keeping later savepoints in the map can lead to slice out-of-bounds panics when attempting to roll back to those later (now invalid) savepoints. Adding a sequential ID to each savepoint allows us to easily identify and clean up any savepoints created after the target savepoint.
| // savepoints maps a savepoint name to the number of statements and mutations | |
| // that were executed before the savepoint was created. | |
| savepoints map[string]savepoint | |
| } | |
| type savepoint struct { | |
| statementCount int | |
| mutationCount int | |
| } | |
| // savepoints maps a savepoint name to the number of statements and mutations | |
| // that were executed before the savepoint was created. | |
| savepoints map[string]savepoint | |
| savepointID int | |
| } | |
| type savepoint struct { | |
| id int | |
| statementCount int | |
| mutationCount int | |
| } |
| func (tx *readWriteTransaction) Savepoint(name string) error { | ||
| tx.logger.Debug("creating savepoint", "name", name) | ||
| if tx.savepoints == nil { | ||
| tx.savepoints = make(map[string]savepoint) | ||
| } | ||
| tx.savepoints[name] = savepoint{ | ||
| statementCount: len(tx.statements), | ||
| mutationCount: len(tx.mutations), | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Increment and assign a sequential ID to each savepoint upon creation to track their relative order.
func (tx *readWriteTransaction) Savepoint(name string) error {
tx.logger.Debug("creating savepoint", "name", name)
if tx.savepoints == nil {
tx.savepoints = make(map[string]savepoint)
}
tx.savepointID++
tx.savepoints[name] = savepoint{
id: tx.savepointID,
statementCount: len(tx.statements),
mutationCount: len(tx.mutations),
}
return nil
}| func (tx *readWriteTransaction) RollbackToSavepoint(ctx context.Context, name string) error { | ||
| tx.logger.Debug("rolling back to savepoint", "name", name) | ||
| if tx.savepoints == nil { | ||
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | ||
| } | ||
| sp, ok := tx.savepoints[name] | ||
| if !ok { | ||
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | ||
| } | ||
| tx.statements = tx.statements[:sp.statementCount] | ||
| tx.mutations = tx.mutations[:sp.mutationCount] | ||
|
|
||
| tx.rwTx.Rollback(context.Background()) | ||
|
|
||
| return tx.recreateAndReplay(ctx) | ||
| } |
There was a problem hiding this comment.
When rolling back to a savepoint, destroy all savepoints created after it to match standard SQL behavior and prevent potential slice out-of-bounds panics.
func (tx *readWriteTransaction) RollbackToSavepoint(ctx context.Context, name string) error {
tx.logger.Debug("rolling back to savepoint", "name", name)
if tx.savepoints == nil {
return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name))
}
sp, ok := tx.savepoints[name]
if !ok {
return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name))
}
tx.statements = tx.statements[:sp.statementCount]
tx.mutations = tx.mutations[:sp.mutationCount]
for k, v := range tx.savepoints {
if v.id > sp.id {
delete(tx.savepoints, k)
}
}
tx.rwTx.Rollback(context.Background())
return tx.recreateAndReplay(ctx)
}| func (tx *readWriteTransaction) ReleaseSavepoint(name string) error { | ||
| tx.logger.Debug("releasing savepoint", "name", name) | ||
| if tx.savepoints == nil { | ||
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | ||
| } | ||
| if _, ok := tx.savepoints[name]; !ok { | ||
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | ||
| } | ||
| delete(tx.savepoints, name) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
When releasing a savepoint, also release/destroy all savepoints created after it to match standard SQL behavior.
| func (tx *readWriteTransaction) ReleaseSavepoint(name string) error { | |
| tx.logger.Debug("releasing savepoint", "name", name) | |
| if tx.savepoints == nil { | |
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | |
| } | |
| if _, ok := tx.savepoints[name]; !ok { | |
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | |
| } | |
| delete(tx.savepoints, name) | |
| return nil | |
| } | |
| func (tx *readWriteTransaction) ReleaseSavepoint(name string) error { | |
| tx.logger.Debug("releasing savepoint", "name", name) | |
| if tx.savepoints == nil { | |
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | |
| } | |
| sp, ok := tx.savepoints[name] | |
| if !ok { | |
| return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "savepoint %q does not exist", name)) | |
| } | |
| for k, v := range tx.savepoints { | |
| if v.id >= sp.id { | |
| delete(tx.savepoints, k) | |
| } | |
| } | |
| return nil | |
| } |
| func (tx *readWriteTransaction) recreateAndReplay(ctx context.Context) (err error) { | ||
| tx.logger.Log(ctx, LevelNotice, "starting transaction retry for savepoint rollback") | ||
| tx.rwTx, err = spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, tx.conn.client, tx.txOptions, tx.txOptionsCallback) | ||
| if err != nil { | ||
| tx.logger.Log(ctx, LevelNotice, "failed to recreate transaction") | ||
| return err | ||
| } | ||
| if err := tx.rwTx.BufferWrite(tx.mutations); err != nil { | ||
| return err | ||
| } | ||
| for _, stmt := range tx.statements { | ||
| tx.logger.Log(ctx, slog.LevelDebug, "retrying statement", "stmt", stmt) | ||
| err = stmt.retry(ctx, tx.rwTx) | ||
| if err != nil { | ||
| tx.logger.Log(ctx, slog.LevelDebug, "retrying statement failed", "stmt", stmt) | ||
| return err | ||
| } | ||
| } | ||
| tx.logger.Log(ctx, LevelNotice, "finished transaction retry for savepoint rollback") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Avoid shadowing the named return value err to prevent potential confusion or bugs, and add a deferred rollback of the newly created transaction if the replay fails to prevent leaking active Spanner transactions.
func (tx *readWriteTransaction) recreateAndReplay(ctx context.Context) (err error) {
tx.logger.Log(ctx, LevelNotice, "starting transaction retry for savepoint rollback")
tx.rwTx, err = spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, tx.conn.client, tx.txOptions, tx.txOptionsCallback)
if err != nil {
tx.logger.Log(ctx, LevelNotice, "failed to recreate transaction")
return err
}
defer func() {
if err != nil {
tx.rwTx.Rollback(context.Background())
}
}()
err = tx.rwTx.BufferWrite(tx.mutations)
if err != nil {
return err
}
for _, stmt := range tx.statements {
tx.logger.Log(ctx, slog.LevelDebug, "retrying statement", "stmt", stmt)
err = stmt.retry(ctx, tx.rwTx)
if err != nil {
tx.logger.Log(ctx, slog.LevelDebug, "retrying statement failed", "stmt", stmt)
return err
}
}
tx.logger.Log(ctx, LevelNotice, "finished transaction retry for savepoint rollback")
return nil
}| if err := tx.Rollback(); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } |
There was a problem hiding this comment.
Add a test case to verify that savepoints created after a rolled back or released savepoint are correctly destroyed and can no longer be rolled back to.
if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
}
func TestSavepointCleanup(t *testing.T) {
t.Parallel()
db, _, teardown := setupTestDBConnection(t)
defer teardown()
ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatal(err)
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, "SAVEPOINT s1"); err != nil {
t.Fatal(err)
}
if _, err := tx.ExecContext(ctx, "SAVEPOINT s2"); err != nil {
t.Fatal(err)
}
// Rolling back to s1 should destroy s2
if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT s1"); err != nil {
t.Fatal(err)
}
if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT s2"); err == nil {
t.Error("expected error for rolling back to destroyed savepoint s2")
}
if _, err := tx.ExecContext(ctx, "SAVEPOINT s3"); err != nil {
t.Fatal(err)
}
if _, err := tx.ExecContext(ctx, "SAVEPOINT s4"); err != nil {
t.Fatal(err)
}
// Releasing s3 should destroy s4
if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT s3"); err != nil {
t.Fatal(err)
}
if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT s4"); err == nil {
t.Error("expected error for rolling back to destroyed savepoint s4")
}
}|
Impressive work implementing emulated SAVEPOINTs. Replaying transaction statements from the beginning in a new transaction is the correct way to achieve savepoint rollback on Spanner. However, I have one recommendation for robustness: |
Implements local emulation of SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT. Savepoint rollback is achieved by rolling back the underlying transaction, discarding statement and mutation history from the savepoint marker onward, and starting a new transaction to replay the remaining statement history.
Fixes #605