1// Copyright 2018 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package spanner
16
17import (
18	"context"
19
20	"cloud.google.com/go/internal/trace"
21	"github.com/googleapis/gax-go/v2"
22	sppb "google.golang.org/genproto/googleapis/spanner/v1"
23	"google.golang.org/grpc/codes"
24)
25
26// PartitionedUpdate executes a DML statement in parallel across the database,
27// using separate, internal transactions that commit independently. The DML
28// statement must be fully partitionable: it must be expressible as the union
29// of many statements each of which accesses only a single row of the table. The
30// statement should also be idempotent, because it may be applied more than once.
31//
32// PartitionedUpdate returns an estimated count of the number of rows affected.
33// The actual number of affected rows may be greater than the estimate.
34func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (count int64, err error) {
35	return c.partitionedUpdate(ctx, statement, c.qo)
36}
37
38// PartitionedUpdateWithOptions executes a DML statement in parallel across the database,
39// using separate, internal transactions that commit independently. The sql
40// query execution will be optimized based on the given query options.
41func (c *Client) PartitionedUpdateWithOptions(ctx context.Context, statement Statement, opts QueryOptions) (count int64, err error) {
42	return c.partitionedUpdate(ctx, statement, c.qo.merge(opts))
43}
44
45func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, options QueryOptions) (count int64, err error) {
46	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.PartitionedUpdate")
47	defer func() { trace.EndSpan(ctx, err) }()
48	if err := checkNestedTxn(ctx); err != nil {
49		return 0, err
50	}
51	var (
52		s  *session
53		sh *sessionHandle
54	)
55	// Create session.
56	s, err = c.sc.createSession(ctx)
57	if err != nil {
58		return 0, toSpannerError(err)
59	}
60	// Delete the session at the end of the request. If the PDML statement
61	// timed out or was cancelled, the DeleteSession request might not succeed,
62	// but the session will eventually be garbage collected by the server.
63	defer s.delete(ctx)
64	sh = &sessionHandle{session: s}
65	// Create the parameters and the SQL request, but without a transaction.
66	// The transaction reference will be added by the executePdml method.
67	params, paramTypes, err := statement.convertParams()
68	if err != nil {
69		return 0, toSpannerError(err)
70	}
71	req := &sppb.ExecuteSqlRequest{
72		Session:      sh.getID(),
73		Sql:          statement.SQL,
74		Params:       params,
75		ParamTypes:   paramTypes,
76		QueryOptions: options.Options,
77	}
78
79	// Make a retryer for Aborted errors.
80	// TODO: use generic Aborted retryer when merged with master
81	retryer := gax.OnCodes([]codes.Code{codes.Aborted}, DefaultRetryBackoff)
82	// Execute the PDML and retry if the transaction is aborted.
83	executePdmlWithRetry := func(ctx context.Context) (int64, error) {
84		for {
85			count, err := executePdml(ctx, sh, req)
86			if err == nil {
87				return count, nil
88			}
89			delay, shouldRetry := retryer.Retry(err)
90			if !shouldRetry {
91				return 0, err
92			}
93			if err := gax.Sleep(ctx, delay); err != nil {
94				return 0, err
95			}
96		}
97	}
98	return executePdmlWithRetry(ctx)
99}
100
101// executePdml executes the following steps:
102// 1. Begin a PDML transaction
103// 2. Add the ID of the PDML transaction to the SQL request.
104// 3. Execute the update statement on the PDML transaction
105//
106// Note that PDML transactions cannot be committed or rolled back.
107func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
108	// Begin transaction.
109	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
110		Session: sh.getID(),
111		Options: &sppb.TransactionOptions{
112			Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
113		},
114	})
115	if err != nil {
116		return 0, toSpannerError(err)
117	}
118	// Add a reference to the PDML transaction on the ExecuteSql request.
119	req.Transaction = &sppb.TransactionSelector{
120		Selector: &sppb.TransactionSelector_Id{Id: res.Id},
121	}
122	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
123	if err != nil {
124		return 0, err
125	}
126	if resultSet.Stats == nil {
127		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql)
128	}
129	return extractRowCount(resultSet.Stats)
130}
131