1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package grpcproxy
16
17import (
18	"context"
19	"io"
20
21	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22	"go.etcd.io/etcd/client/v3"
23)
24
25type maintenanceProxy struct {
26	client *clientv3.Client
27}
28
29func NewMaintenanceProxy(c *clientv3.Client) pb.MaintenanceServer {
30	return &maintenanceProxy{
31		client: c,
32	}
33}
34
35func (mp *maintenanceProxy) Defragment(ctx context.Context, dr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
36	conn := mp.client.ActiveConnection()
37	return pb.NewMaintenanceClient(conn).Defragment(ctx, dr)
38}
39
40func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenance_SnapshotServer) error {
41	conn := mp.client.ActiveConnection()
42	ctx, cancel := context.WithCancel(stream.Context())
43	defer cancel()
44
45	ctx = withClientAuthToken(ctx, stream.Context())
46
47	sc, err := pb.NewMaintenanceClient(conn).Snapshot(ctx, sr)
48	if err != nil {
49		return err
50	}
51
52	for {
53		rr, err := sc.Recv()
54		if err != nil {
55			if err == io.EOF {
56				return nil
57			}
58			return err
59		}
60		err = stream.Send(rr)
61		if err != nil {
62			return err
63		}
64	}
65}
66
67func (mp *maintenanceProxy) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
68	conn := mp.client.ActiveConnection()
69	return pb.NewMaintenanceClient(conn).Hash(ctx, r)
70}
71
72func (mp *maintenanceProxy) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
73	conn := mp.client.ActiveConnection()
74	return pb.NewMaintenanceClient(conn).HashKV(ctx, r)
75}
76
77func (mp *maintenanceProxy) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
78	conn := mp.client.ActiveConnection()
79	return pb.NewMaintenanceClient(conn).Alarm(ctx, r)
80}
81
82func (mp *maintenanceProxy) Status(ctx context.Context, r *pb.StatusRequest) (*pb.StatusResponse, error) {
83	conn := mp.client.ActiveConnection()
84	return pb.NewMaintenanceClient(conn).Status(ctx, r)
85}
86
87func (mp *maintenanceProxy) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
88	conn := mp.client.ActiveConnection()
89	return pb.NewMaintenanceClient(conn).MoveLeader(ctx, r)
90}
91
92func (mp *maintenanceProxy) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
93	conn := mp.client.ActiveConnection()
94	return pb.NewMaintenanceClient(conn).Downgrade(ctx, r)
95}
96