1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// +build cluster_proxy
16
17package integration
18
19import (
20	"sync"
21
22	"github.com/coreos/etcd/clientv3"
23	"github.com/coreos/etcd/clientv3/namespace"
24	"github.com/coreos/etcd/proxy/grpcproxy"
25	"github.com/coreos/etcd/proxy/grpcproxy/adapter"
26)
27
28var (
29	pmu     sync.Mutex
30	proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy)
31)
32
33const proxyNamespace = "proxy-namespace"
34
35type grpcClientProxy struct {
36	grpc    grpcAPI
37	wdonec  <-chan struct{}
38	kvdonec <-chan struct{}
39	lpdonec <-chan struct{}
40}
41
42func toGRPC(c *clientv3.Client) grpcAPI {
43	pmu.Lock()
44	defer pmu.Unlock()
45
46	if v, ok := proxies[c]; ok {
47		return v.grpc
48	}
49
50	// test namespacing proxy
51	c.KV = namespace.NewKV(c.KV, proxyNamespace)
52	c.Watcher = namespace.NewWatcher(c.Watcher, proxyNamespace)
53	c.Lease = namespace.NewLease(c.Lease, proxyNamespace)
54	// test coalescing/caching proxy
55	kvp, kvpch := grpcproxy.NewKvProxy(c)
56	wp, wpch := grpcproxy.NewWatchProxy(c)
57	lp, lpch := grpcproxy.NewLeaseProxy(c)
58	mp := grpcproxy.NewMaintenanceProxy(c)
59	clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs
60	authp := grpcproxy.NewAuthProxy(c)
61	lockp := grpcproxy.NewLockProxy(c)
62	electp := grpcproxy.NewElectionProxy(c)
63
64	grpc := grpcAPI{
65		adapter.ClusterServerToClusterClient(clp),
66		adapter.KvServerToKvClient(kvp),
67		adapter.LeaseServerToLeaseClient(lp),
68		adapter.WatchServerToWatchClient(wp),
69		adapter.MaintenanceServerToMaintenanceClient(mp),
70		adapter.AuthServerToAuthClient(authp),
71		adapter.LockServerToLockClient(lockp),
72		adapter.ElectionServerToElectionClient(electp),
73	}
74	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}
75	return grpc
76}
77
78type proxyCloser struct {
79	clientv3.Watcher
80	wdonec  <-chan struct{}
81	kvdonec <-chan struct{}
82	lclose  func()
83	lpdonec <-chan struct{}
84}
85
86func (pc *proxyCloser) Close() error {
87	// client ctx is canceled before calling close, so kv and lp will close out
88	<-pc.kvdonec
89	err := pc.Watcher.Close()
90	<-pc.wdonec
91	pc.lclose()
92	<-pc.lpdonec
93	return err
94}
95
96func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
97	c, err := clientv3.New(cfg)
98	if err != nil {
99		return nil, err
100	}
101	rpc := toGRPC(c)
102	c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
103	pmu.Lock()
104	lc := c.Lease
105	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)
106	c.Watcher = &proxyCloser{
107		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),
108		wdonec:  proxies[c].wdonec,
109		kvdonec: proxies[c].kvdonec,
110		lclose:  func() { lc.Close() },
111		lpdonec: proxies[c].lpdonec,
112	}
113	pmu.Unlock()
114	return c, nil
115}
116