1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package membership
16
17import (
18	"encoding/json"
19	"fmt"
20	"path"
21
22	"go.etcd.io/etcd/pkg/v3/types"
23	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
24	"go.etcd.io/etcd/server/v3/mvcc/backend"
25
26	"github.com/coreos/go-semver/semver"
27	"go.uber.org/zap"
28)
29
const (
	// attributesSuffix and raftAttributesSuffix are the final path elements,
	// joined onto a member's store key, of the nodes that hold the member's
	// JSON-encoded Attributes and RaftAttributes respectively.
	attributesSuffix     = "attributes"
	raftAttributesSuffix = "raftAttributes"

	// storePrefix is the prefix for storing membership related information in
	// the store provided by the store pkg.
	storePrefix = "/0"
)
37
var (
	// Backend bucket names used by this file: current members, removed
	// members, and cluster-wide values (cluster version, downgrade info).
	membersBucketName        = []byte("members")
	membersRemovedBucketName = []byte("members_removed")
	clusterBucketName        = []byte("cluster")

	// StoreMembersPrefix is the store path under which per-member keys are
	// built (see MemberStoreKey). storeRemovedMembersPrefix is the store path
	// under which removed-member keys are built (see RemovedMemberStoreKey).
	StoreMembersPrefix        = path.Join(storePrefix, "members")
	storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
46
47func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
48	mkey := backendMemberKey(m.ID)
49	mvalue, err := json.Marshal(m)
50	if err != nil {
51		lg.Panic("failed to marshal member", zap.Error(err))
52	}
53
54	tx := be.BatchTx()
55	tx.Lock()
56	defer tx.Unlock()
57	tx.UnsafePut(membersBucketName, mkey, mvalue)
58}
59
60func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
61	mkey := backendMemberKey(id)
62
63	tx := be.BatchTx()
64	tx.Lock()
65	defer tx.Unlock()
66	tx.UnsafeDelete(membersBucketName, mkey)
67	tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
68}
69
70func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
71	ckey := backendClusterVersionKey()
72
73	tx := be.BatchTx()
74	tx.Lock()
75	defer tx.Unlock()
76	tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
77}
78
79func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
80	dkey := backendDowngradeKey()
81	dvalue, err := json.Marshal(downgrade)
82	if err != nil {
83		lg.Panic("failed to marshal downgrade information", zap.Error(err))
84	}
85	tx := be.BatchTx()
86	tx.Lock()
87	defer tx.Unlock()
88	tx.UnsafePut(clusterBucketName, dkey, dvalue)
89}
90
91func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
92	b, err := json.Marshal(m.RaftAttributes)
93	if err != nil {
94		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
95	}
96	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
97	if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
98		lg.Panic(
99			"failed to save member to store",
100			zap.String("path", p),
101			zap.Error(err),
102		)
103	}
104}
105
106func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
107	if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
108		lg.Panic(
109			"failed to delete member from store",
110			zap.String("path", MemberStoreKey(id)),
111			zap.Error(err),
112		)
113	}
114	if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
115		lg.Panic(
116			"failed to create removedMember",
117			zap.String("path", RemovedMemberStoreKey(id)),
118			zap.Error(err),
119		)
120	}
121}
122
123func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
124	b, err := json.Marshal(m.RaftAttributes)
125	if err != nil {
126		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
127	}
128	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
129	if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
130		lg.Panic(
131			"failed to update raftAttributes",
132			zap.String("path", p),
133			zap.Error(err),
134		)
135	}
136}
137
138func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
139	b, err := json.Marshal(m.Attributes)
140	if err != nil {
141		lg.Panic("failed to marshal attributes", zap.Error(err))
142	}
143	p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
144	if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
145		lg.Panic(
146			"failed to update attributes",
147			zap.String("path", p),
148			zap.Error(err),
149		)
150	}
151}
152
153func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
154	if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
155		lg.Panic(
156			"failed to save cluster version to store",
157			zap.String("path", StoreClusterVersionKey()),
158			zap.Error(err),
159		)
160	}
161}
162
163// nodeToMember builds member from a key value node.
164// the child nodes of the given node MUST be sorted by key.
165func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
166	m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
167	attrs := make(map[string][]byte)
168	raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
169	attrKey := path.Join(n.Key, attributesSuffix)
170	for _, nn := range n.Nodes {
171		if nn.Key != raftAttrKey && nn.Key != attrKey {
172			return nil, fmt.Errorf("unknown key %q", nn.Key)
173		}
174		attrs[nn.Key] = []byte(*nn.Value)
175	}
176	if data := attrs[raftAttrKey]; data != nil {
177		if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
178			return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
179		}
180	} else {
181		return nil, fmt.Errorf("raftAttributes key doesn't exist")
182	}
183	if data := attrs[attrKey]; data != nil {
184		if err := json.Unmarshal(data, &m.Attributes); err != nil {
185			return m, fmt.Errorf("unmarshal attributes error: %v", err)
186		}
187	}
188	return m, nil
189}
190
191func backendMemberKey(id types.ID) []byte {
192	return []byte(id.String())
193}
194
// backendClusterVersionKey returns the backend key "clusterVersion" as a
// freshly allocated byte slice.
func backendClusterVersionKey() []byte {
	const key = "clusterVersion"
	return []byte(key)
}
198
// backendDowngradeKey returns the backend key "downgrade" as a freshly
// allocated byte slice.
func backendDowngradeKey() []byte {
	const key = "downgrade"
	return []byte(key)
}
202
203func mustCreateBackendBuckets(be backend.Backend) {
204	tx := be.BatchTx()
205	tx.Lock()
206	defer tx.Unlock()
207	tx.UnsafeCreateBucket(membersBucketName)
208	tx.UnsafeCreateBucket(membersRemovedBucketName)
209	tx.UnsafeCreateBucket(clusterBucketName)
210}
211
212func MemberStoreKey(id types.ID) string {
213	return path.Join(StoreMembersPrefix, id.String())
214}
215
216func StoreClusterVersionKey() string {
217	return path.Join(storePrefix, "version")
218}
219
220func MemberAttributesStorePath(id types.ID) string {
221	return path.Join(MemberStoreKey(id), attributesSuffix)
222}
223
224func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
225	id, err := types.IDFromString(path.Base(key))
226	if err != nil {
227		lg.Panic("failed to parse memver id from key", zap.Error(err))
228	}
229	return id
230}
231
232func RemovedMemberStoreKey(id types.ID) string {
233	return path.Join(storeRemovedMembersPrefix, id.String())
234}
235