1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"encoding/json"
19	"fmt"
20	"path"
21	"strconv"
22	"time"
23
24	"github.com/coreos/go-semver/semver"
25	"go.etcd.io/etcd/pkg/v3/pbutil"
26	"go.etcd.io/etcd/server/v3/etcdserver/api"
27	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
28	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
29
30	"go.uber.org/zap"
31)
32
33const v2Version = "v2"
34
35// ApplierV2 is the interface for processing V2 raft messages
36type ApplierV2 interface {
37	Delete(r *RequestV2) Response
38	Post(r *RequestV2) Response
39	Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
40	QGet(r *RequestV2) Response
41	Sync(r *RequestV2) Response
42}
43
44func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
45	if lg == nil {
46		lg = zap.NewNop()
47	}
48	return &applierV2store{lg: lg, store: s, cluster: c}
49}
50
51type applierV2store struct {
52	lg      *zap.Logger
53	store   v2store.Store
54	cluster *membership.RaftCluster
55}
56
57func (a *applierV2store) Delete(r *RequestV2) Response {
58	switch {
59	case r.PrevIndex > 0 || r.PrevValue != "":
60		return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
61	default:
62		return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
63	}
64}
65
66func (a *applierV2store) Post(r *RequestV2) Response {
67	return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
68}
69
70func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
71	ttlOptions := r.TTLOptions()
72	exists, existsSet := pbutil.GetBool(r.PrevExist)
73	switch {
74	case existsSet:
75		if exists {
76			if r.PrevIndex == 0 && r.PrevValue == "" {
77				return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
78			}
79			return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
80		}
81		return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
82	case r.PrevIndex > 0 || r.PrevValue != "":
83		return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
84	default:
85		if storeMemberAttributeRegexp.MatchString(r.Path) {
86			id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
87			var attr membership.Attributes
88			if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
89				a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
90			}
91			if a.cluster != nil {
92				a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
93			}
94			// return an empty response since there is no consumer.
95			return Response{}
96		}
97		// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
98		if r.Path == membership.StoreClusterVersionKey() {
99			if a.cluster != nil {
100				// persist to backend given v2store can be very stale
101				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
102			}
103			return Response{}
104		}
105		return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
106	}
107}
108
109func (a *applierV2store) QGet(r *RequestV2) Response {
110	return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
111}
112
113func (a *applierV2store) Sync(r *RequestV2) Response {
114	a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
115	return Response{}
116}
117
118// applyV2Request interprets r as a call to v2store.X
119// and returns a Response interpreted from v2store.Event
120func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
121	stringer := panicAlternativeStringer{
122		stringer:    r,
123		alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
124	}
125	defer func(start time.Time) {
126		success := resp.Err == nil
127		applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
128		warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
129	}(time.Now())
130
131	switch r.Method {
132	case "POST":
133		return s.applyV2.Post(r)
134	case "PUT":
135		return s.applyV2.Put(r, shouldApplyV3)
136	case "DELETE":
137		return s.applyV2.Delete(r)
138	case "QGET":
139		return s.applyV2.QGet(r)
140	case "SYNC":
141		return s.applyV2.Sync(r)
142	default:
143		// This should never be reached, but just in case:
144		return Response{Err: ErrUnknownMethod}
145	}
146}
147
148func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
149	refresh, _ := pbutil.GetBool(r.Refresh)
150	ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
151	if r.Expiration != 0 {
152		ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
153	}
154	return ttlOptions
155}
156
157func toResponse(ev *v2store.Event, err error) Response {
158	return Response{Event: ev, Err: err}
159}
160