1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package clientv3 16 17import ( 18 "context" 19 20 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 21 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 22 23 "google.golang.org/grpc" 24 "google.golang.org/grpc/codes" 25 "google.golang.org/grpc/status" 26) 27 28type retryPolicy uint8 29 30const ( 31 repeatable retryPolicy = iota 32 nonRepeatable 33) 34 35func (rp retryPolicy) String() string { 36 switch rp { 37 case repeatable: 38 return "repeatable" 39 case nonRepeatable: 40 return "nonRepeatable" 41 default: 42 return "UNKNOWN" 43 } 44} 45 46// isSafeRetryImmutableRPC returns "true" when an immutable request is safe for retry. 47// 48// immutable requests (e.g. Get) should be retried unless it's 49// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge). 50// 51// Returning "false" means retry should stop, since client cannot 52// handle itself even with retries. 53func isSafeRetryImmutableRPC(err error) bool { 54 eErr := rpctypes.Error(err) 55 if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable { 56 // interrupted by non-transient server-side or gRPC-side error 57 // client cannot handle itself (e.g. rpctypes.ErrCompacted) 58 return false 59 } 60 // only retry if unavailable 61 ev, ok := status.FromError(err) 62 if !ok { 63 // all errors from RPC is typed "grpc/status.(*statusError)" 64 // (ref. https://github.com/grpc/grpc-go/pull/1782) 65 // 66 // if the error type is not "grpc/status.(*statusError)", 67 // it could be from "Dial" 68 // TODO: do not retry for now 69 // ref. https://github.com/grpc/grpc-go/issues/1581 70 return false 71 } 72 return ev.Code() == codes.Unavailable 73} 74 75// isSafeRetryMutableRPC returns "true" when a mutable request is safe for retry. 76// 77// mutable requests (e.g. Put, Delete, Txn) should only be retried 78// when the status code is codes.Unavailable when initial connection 79// has not been established (no endpoint is up). 80// 81// Returning "false" means retry should stop, otherwise it violates 82// write-at-most-once semantics. 83func isSafeRetryMutableRPC(err error) bool { 84 if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable { 85 // not safe for mutable RPCs 86 // e.g. interrupted by non-transient error that client cannot handle itself, 87 // or transient error while the connection has already been established 88 return false 89 } 90 desc := rpctypes.ErrorDesc(err) 91 return desc == "there is no address available" || desc == "there is no connection available" 92} 93 94type retryKVClient struct { 95 kc pb.KVClient 96} 97 98// RetryKVClient implements a KVClient. 99func RetryKVClient(c *Client) pb.KVClient { 100 return &retryKVClient{ 101 kc: pb.NewKVClient(c.conn), 102 } 103} 104func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { 105 return rkv.kc.Range(ctx, in, append(opts, withRetryPolicy(repeatable))...) 106} 107 108func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { 109 return rkv.kc.Put(ctx, in, opts...) 110} 111 112func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { 113 return rkv.kc.DeleteRange(ctx, in, opts...) 114} 115 116func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { 117 return rkv.kc.Txn(ctx, in, opts...) 118} 119 120func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { 121 return rkv.kc.Compact(ctx, in, opts...) 122} 123 124type retryLeaseClient struct { 125 lc pb.LeaseClient 126} 127 128// RetryLeaseClient implements a LeaseClient. 129func RetryLeaseClient(c *Client) pb.LeaseClient { 130 return &retryLeaseClient{ 131 lc: pb.NewLeaseClient(c.conn), 132 } 133} 134 135func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { 136 return rlc.lc.LeaseTimeToLive(ctx, in, append(opts, withRetryPolicy(repeatable))...) 137} 138 139func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) { 140 return rlc.lc.LeaseLeases(ctx, in, append(opts, withRetryPolicy(repeatable))...) 141} 142 143func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { 144 return rlc.lc.LeaseGrant(ctx, in, append(opts, withRetryPolicy(repeatable))...) 145} 146 147func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { 148 return rlc.lc.LeaseRevoke(ctx, in, append(opts, withRetryPolicy(repeatable))...) 149} 150 151func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) { 152 return rlc.lc.LeaseKeepAlive(ctx, append(opts, withRetryPolicy(repeatable))...) 153} 154 155type retryClusterClient struct { 156 cc pb.ClusterClient 157} 158 159// RetryClusterClient implements a ClusterClient. 160func RetryClusterClient(c *Client) pb.ClusterClient { 161 return &retryClusterClient{ 162 cc: pb.NewClusterClient(c.conn), 163 } 164} 165 166func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) { 167 return rcc.cc.MemberList(ctx, in, append(opts, withRetryPolicy(repeatable))...) 168} 169 170func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { 171 return rcc.cc.MemberAdd(ctx, in, opts...) 172} 173 174func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { 175 return rcc.cc.MemberRemove(ctx, in, opts...) 176} 177 178func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { 179 return rcc.cc.MemberUpdate(ctx, in, opts...) 180} 181 182func (rcc *retryClusterClient) MemberPromote(ctx context.Context, in *pb.MemberPromoteRequest, opts ...grpc.CallOption) (resp *pb.MemberPromoteResponse, err error) { 183 return rcc.cc.MemberPromote(ctx, in, opts...) 184} 185 186type retryMaintenanceClient struct { 187 mc pb.MaintenanceClient 188} 189 190// RetryMaintenanceClient implements a Maintenance. 191func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient { 192 return &retryMaintenanceClient{ 193 mc: pb.NewMaintenanceClient(conn), 194 } 195} 196 197func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) { 198 return rmc.mc.Alarm(ctx, in, append(opts, withRetryPolicy(repeatable))...) 199} 200 201func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) { 202 return rmc.mc.Status(ctx, in, append(opts, withRetryPolicy(repeatable))...) 203} 204 205func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) { 206 return rmc.mc.Hash(ctx, in, append(opts, withRetryPolicy(repeatable))...) 207} 208 209func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) { 210 return rmc.mc.HashKV(ctx, in, append(opts, withRetryPolicy(repeatable))...) 211} 212 213func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) { 214 return rmc.mc.Snapshot(ctx, in, append(opts, withRetryPolicy(repeatable))...) 215} 216 217func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) { 218 return rmc.mc.MoveLeader(ctx, in, append(opts, withRetryPolicy(repeatable))...) 219} 220 221func (rmc *retryMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) { 222 return rmc.mc.Defragment(ctx, in, opts...) 223} 224 225func (rmc *retryMaintenanceClient) Downgrade(ctx context.Context, in *pb.DowngradeRequest, opts ...grpc.CallOption) (resp *pb.DowngradeResponse, err error) { 226 return rmc.mc.Downgrade(ctx, in, opts...) 227} 228 229type retryAuthClient struct { 230 ac pb.AuthClient 231} 232 233// RetryAuthClient implements a AuthClient. 234func RetryAuthClient(c *Client) pb.AuthClient { 235 return &retryAuthClient{ 236 ac: pb.NewAuthClient(c.conn), 237 } 238} 239 240func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) { 241 return rac.ac.UserList(ctx, in, append(opts, withRetryPolicy(repeatable))...) 242} 243 244func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) { 245 return rac.ac.UserGet(ctx, in, append(opts, withRetryPolicy(repeatable))...) 246} 247 248func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) { 249 return rac.ac.RoleGet(ctx, in, append(opts, withRetryPolicy(repeatable))...) 250} 251 252func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) { 253 return rac.ac.RoleList(ctx, in, append(opts, withRetryPolicy(repeatable))...) 254} 255 256func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { 257 return rac.ac.AuthEnable(ctx, in, opts...) 258} 259 260func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { 261 return rac.ac.AuthDisable(ctx, in, opts...) 262} 263 264func (rac *retryAuthClient) AuthStatus(ctx context.Context, in *pb.AuthStatusRequest, opts ...grpc.CallOption) (resp *pb.AuthStatusResponse, err error) { 265 return rac.ac.AuthStatus(ctx, in, opts...) 266} 267 268func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { 269 return rac.ac.UserAdd(ctx, in, opts...) 270} 271 272func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { 273 return rac.ac.UserDelete(ctx, in, opts...) 274} 275 276func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { 277 return rac.ac.UserChangePassword(ctx, in, opts...) 278} 279 280func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { 281 return rac.ac.UserGrantRole(ctx, in, opts...) 282} 283 284func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { 285 return rac.ac.UserRevokeRole(ctx, in, opts...) 286} 287 288func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { 289 return rac.ac.RoleAdd(ctx, in, opts...) 290} 291 292func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { 293 return rac.ac.RoleDelete(ctx, in, opts...) 294} 295 296func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { 297 return rac.ac.RoleGrantPermission(ctx, in, opts...) 298} 299 300func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { 301 return rac.ac.RoleRevokePermission(ctx, in, opts...) 302} 303 304func (rac *retryAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) { 305 return rac.ac.Authenticate(ctx, in, opts...) 306} 307