1// Copyright 2018 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package spanner 16 17import ( 18 "context" 19 20 "cloud.google.com/go/internal/trace" 21 "github.com/googleapis/gax-go/v2" 22 sppb "google.golang.org/genproto/googleapis/spanner/v1" 23 "google.golang.org/grpc/codes" 24) 25 26// PartitionedUpdate executes a DML statement in parallel across the database, 27// using separate, internal transactions that commit independently. The DML 28// statement must be fully partitionable: it must be expressible as the union 29// of many statements each of which accesses only a single row of the table. The 30// statement should also be idempotent, because it may be applied more than once. 31// 32// PartitionedUpdate returns an estimated count of the number of rows affected. 33// The actual number of affected rows may be greater than the estimate. 34func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (count int64, err error) { 35 return c.partitionedUpdate(ctx, statement, c.qo) 36} 37 38// PartitionedUpdateWithOptions executes a DML statement in parallel across the database, 39// using separate, internal transactions that commit independently. The sql 40// query execution will be optimized based on the given query options. 41func (c *Client) PartitionedUpdateWithOptions(ctx context.Context, statement Statement, opts QueryOptions) (count int64, err error) { 42 return c.partitionedUpdate(ctx, statement, c.qo.merge(opts)) 43} 44 45func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, options QueryOptions) (count int64, err error) { 46 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.PartitionedUpdate") 47 defer func() { trace.EndSpan(ctx, err) }() 48 if err := checkNestedTxn(ctx); err != nil { 49 return 0, err 50 } 51 var ( 52 s *session 53 sh *sessionHandle 54 ) 55 // Create session. 56 s, err = c.sc.createSession(ctx) 57 if err != nil { 58 return 0, toSpannerError(err) 59 } 60 // Delete the session at the end of the request. If the PDML statement 61 // timed out or was cancelled, the DeleteSession request might not succeed, 62 // but the session will eventually be garbage collected by the server. 63 defer s.delete(ctx) 64 sh = &sessionHandle{session: s} 65 // Create the parameters and the SQL request, but without a transaction. 66 // The transaction reference will be added by the executePdml method. 67 params, paramTypes, err := statement.convertParams() 68 if err != nil { 69 return 0, toSpannerError(err) 70 } 71 req := &sppb.ExecuteSqlRequest{ 72 Session: sh.getID(), 73 Sql: statement.SQL, 74 Params: params, 75 ParamTypes: paramTypes, 76 QueryOptions: options.Options, 77 } 78 79 // Make a retryer for Aborted errors. 80 // TODO: use generic Aborted retryer when merged with master 81 retryer := gax.OnCodes([]codes.Code{codes.Aborted}, DefaultRetryBackoff) 82 // Execute the PDML and retry if the transaction is aborted. 83 executePdmlWithRetry := func(ctx context.Context) (int64, error) { 84 for { 85 count, err := executePdml(ctx, sh, req) 86 if err == nil { 87 return count, nil 88 } 89 delay, shouldRetry := retryer.Retry(err) 90 if !shouldRetry { 91 return 0, err 92 } 93 if err := gax.Sleep(ctx, delay); err != nil { 94 return 0, err 95 } 96 } 97 } 98 return executePdmlWithRetry(ctx) 99} 100 101// executePdml executes the following steps: 102// 1. Begin a PDML transaction 103// 2. Add the ID of the PDML transaction to the SQL request. 104// 3. Execute the update statement on the PDML transaction 105// 106// Note that PDML transactions cannot be committed or rolled back. 107func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { 108 // Begin transaction. 109 res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 110 Session: sh.getID(), 111 Options: &sppb.TransactionOptions{ 112 Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, 113 }, 114 }) 115 if err != nil { 116 return 0, toSpannerError(err) 117 } 118 // Add a reference to the PDML transaction on the ExecuteSql request. 119 req.Transaction = &sppb.TransactionSelector{ 120 Selector: &sppb.TransactionSelector_Id{Id: res.Id}, 121 } 122 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 123 if err != nil { 124 return 0, err 125 } 126 if resultSet.Stats == nil { 127 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql) 128 } 129 return extractRowCount(resultSet.Stats) 130} 131