1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package guestbook
18
19import (
20	"encoding/json"
21	"fmt"
22	"io/ioutil"
23	"log"
24	"net"
25	"net/http"
26	"net/url"
27	"strings"
28	"time"
29
30	"github.com/spf13/cobra"
31
32	utilnet "k8s.io/apimachinery/pkg/util/net"
33)
34
35// CmdGuestbook is used by agnhost Cobra.
36var CmdGuestbook = &cobra.Command{
37	Use:   "guestbook",
38	Short: "Creates a HTTP server with various endpoints representing a guestbook app",
39	Long: `Starts a HTTP server on the given --http-port (default: 80), serving various endpoints representing a guestbook app. The endpoints and their purpose are:
40
41- /register: A guestbook replica will subscribe to a primary, to its given --replicaof endpoint. The primary will then push any updates it receives to its registered replicas through the --backend-port.
42- /get: Returns '{"data": value}', where the value is the stored value for the given key if non-empty, or the entire store.
43- /set: Will set the given key-value pair in its own store and propagate it to its replicas, if any. Will return '{"data": "Updated"}' to the caller on success.
44- /guestbook: Will proxy the request to agnhost-primary if the given cmd is 'set', or agnhost-replica if the given cmd is 'get'.`,
45	Args: cobra.MaximumNArgs(0),
46	Run:  main,
47}
48
// Flag values and in-memory state shared by the guestbook handlers.
//
// NOTE(review): nothing visible here guards replicas or store with a lock,
// while net/http runs handlers on separate goroutines; confirm whether
// synchronization is required.
var (
	// httpPort, backendPort and replicaOf receive the values of the
	// --http-port, --backend-port and --replicaof flags respectively.
	httpPort, backendPort, replicaOf string

	// replicas holds the hosts that registered through /register; /set
	// requests are propagated to each of them.
	replicas []string

	// store is the key-value data served by /get and updated by /set.
	store map[string]interface{}
)
56
// Retry policy applied by registerNode while contacting the primary.
const (
	// timeout is the total time budget for registering to the primary.
	timeout = 15 * time.Second
	// sleep is the pause between two consecutive registration attempts.
	sleep = time.Second
)
61
62func init() {
63	CmdGuestbook.Flags().StringVar(&httpPort, "http-port", "80", "HTTP Listen Port")
64	CmdGuestbook.Flags().StringVar(&backendPort, "backend-port", "6379", "Backend's HTTP Listen Port")
65	CmdGuestbook.Flags().StringVar(&replicaOf, "replicaof", "", "The host's name to register to")
66	store = make(map[string]interface{})
67}
68
// main is the Run function of CmdGuestbook. It starts registerNode on a
// background goroutine (which returns immediately when replicaOf is empty)
// and then serves HTTP on httpPort. startHTTPServer ends in log.Fatal, so
// main does not return normally. cmd and args are unused.
func main(cmd *cobra.Command, args []string) {
	go registerNode(replicaOf, backendPort)
	startHTTPServer(httpPort)
}
73
74func registerNode(registerTo, port string) {
75	if registerTo == "" {
76		return
77	}
78
79	hostPort := net.JoinHostPort(registerTo, backendPort)
80
81	start := time.Now()
82	for time.Since(start) < timeout {
83		host, err := getIP(hostPort)
84		if err != nil {
85			log.Printf("unable to get IP %s: %v. Retrying in %s.", hostPort, err, sleep)
86			time.Sleep(sleep)
87			continue
88		}
89
90		request := fmt.Sprintf("register?host=%s", host.String())
91		log.Printf("Registering to primary: %s/%s", hostPort, request)
92		_, err = net.ResolveTCPAddr("tcp", hostPort)
93		if err != nil {
94			log.Printf("unable to resolve %s, --replicaof param and/or --backend-port param are invalid: %v. Retrying in %s.", hostPort, err, sleep)
95			time.Sleep(sleep)
96			continue
97		}
98
99		response, err := dialHTTP(request, hostPort)
100		if err != nil {
101			log.Printf("encountered error while registering to primary: %v. Retrying in %s.", err, sleep)
102			time.Sleep(sleep)
103			continue
104		}
105
106		responseJSON := make(map[string]interface{})
107		err = json.Unmarshal([]byte(response), &responseJSON)
108		if err != nil {
109			log.Fatalf("Error while unmarshaling primary's response: %v", err)
110		}
111
112		var ok bool
113		store, ok = responseJSON["data"].(map[string]interface{})
114		if !ok {
115			log.Fatalf("Could not cast responseJSON: %s", responseJSON["data"])
116		}
117		log.Printf("Registered to node: %s", registerTo)
118		return
119	}
120
121	log.Fatal("Timed out while registering to primary.")
122}
123
124func startHTTPServer(port string) {
125	http.HandleFunc("/register", registerHandler)
126	http.HandleFunc("/get", getHandler)
127	http.HandleFunc("/set", setHandler)
128	http.HandleFunc("/guestbook", guestbookHandler)
129	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
130}
131
132// registerHandler will register the caller in this server's list of replicas.
133// /set requests will be propagated to replicas, if any.
134func registerHandler(w http.ResponseWriter, r *http.Request) {
135	values, err := url.Parse(r.URL.RequestURI())
136	if err != nil {
137		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
138		return
139	}
140
141	ip := values.Query().Get("host")
142	log.Printf("GET /register?host=%s", ip)
143
144	// send all the store to the replica as well.
145	output := make(map[string]interface{})
146	output["data"] = store
147	bytes, err := json.Marshal(output)
148	if err != nil {
149		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
150		return
151	}
152	fmt.Fprint(w, string(bytes))
153	replicas = append(replicas, ip)
154	log.Printf("Node '%s' registered.", ip)
155}
156
157// getHandler will return '{"data": value}', where value is the stored value for
158// the given key if non-empty, or entire store.
159func getHandler(w http.ResponseWriter, r *http.Request) {
160	values, err := url.Parse(r.URL.RequestURI())
161	if err != nil {
162		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
163		return
164	}
165
166	key := values.Query().Get("key")
167
168	log.Printf("GET /get?key=%s", key)
169
170	output := make(map[string]interface{})
171	if key == "" {
172		output["data"] = store
173	} else {
174		value, found := store[key]
175		if !found {
176			value = ""
177		}
178		output["data"] = value
179	}
180
181	bytes, err := json.Marshal(output)
182	if err == nil {
183		fmt.Fprint(w, string(bytes))
184	} else {
185		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
186	}
187}
188
189// setHandler will set the given key-value pair in its own store and propagate
190// it to its replicas, if any. Will return '{"message": "Updated"}' to the caller on success.
191func setHandler(w http.ResponseWriter, r *http.Request) {
192	values, err := url.Parse(r.URL.RequestURI())
193	if err != nil {
194		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
195		return
196	}
197
198	key := values.Query().Get("key")
199	value := values.Query().Get("value")
200
201	log.Printf("GET /set?key=%s&value=%s", key, value)
202
203	if key == "" {
204		http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
205		return
206	}
207
208	store[key] = value
209	request := fmt.Sprintf("set?key=%s&value=%s", key, value)
210	for _, replica := range replicas {
211		hostPort := net.JoinHostPort(replica, backendPort)
212		_, err = dialHTTP(request, hostPort)
213		if err != nil {
214			http.Error(w, fmt.Sprintf("encountered error while propagating to replica '%s': %v", replica, err), http.StatusExpectationFailed)
215			return
216		}
217	}
218
219	output := map[string]string{}
220	output["message"] = "Updated"
221	bytes, err := json.Marshal(output)
222	if err == nil {
223		fmt.Fprint(w, string(bytes))
224	} else {
225		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
226	}
227}
228
229// guestbookHandler will proxy the request to agnhost-primary if the given cmd is
230// 'set' or agnhost-replica if the given cmd is 'get'.
231func guestbookHandler(w http.ResponseWriter, r *http.Request) {
232	values, err := url.Parse(r.URL.RequestURI())
233	if err != nil {
234		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
235		return
236	}
237
238	cmd := strings.ToLower(values.Query().Get("cmd"))
239	key := values.Query().Get("key")
240	value := values.Query().Get("value")
241
242	log.Printf("GET /guestbook?cmd=%s&key=%s&value=%s", cmd, key, value)
243
244	if cmd != "get" && cmd != "set" {
245		http.Error(w, fmt.Sprintf("unsupported cmd: '%s'", cmd), http.StatusBadRequest)
246		return
247	}
248	if cmd == "set" && key == "" {
249		http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
250		return
251	}
252
253	host := "agnhost-primary"
254	if cmd == "get" {
255		host = "agnhost-replica"
256	}
257
258	hostPort := net.JoinHostPort(host, backendPort)
259	_, err = net.ResolveTCPAddr("tcp", hostPort)
260	if err != nil {
261		http.Error(w, fmt.Sprintf("host and/or port param are invalid. %v", err), http.StatusBadRequest)
262		return
263	}
264
265	request := fmt.Sprintf("%s?key=%s&value=%s", cmd, key, value)
266	response, err := dialHTTP(request, hostPort)
267	if err == nil {
268		fmt.Fprint(w, response)
269	} else {
270		http.Error(w, fmt.Sprintf("encountered error: %v", err), http.StatusExpectationFailed)
271	}
272}
273
274func dialHTTP(request, hostPort string) (string, error) {
275	transport := utilnet.SetTransportDefaults(&http.Transport{})
276	httpClient := createHTTPClient(transport)
277	resp, err := httpClient.Get(fmt.Sprintf("http://%s/%s", hostPort, request))
278	defer transport.CloseIdleConnections()
279	if err == nil {
280		defer resp.Body.Close()
281		body, err := ioutil.ReadAll(resp.Body)
282		if err == nil {
283			return string(body), nil
284		}
285	}
286	return "", err
287}
288
// createHTTPClient returns a new HTTP client that sends requests through the
// given transport and gives up on any request after five seconds.
func createHTTPClient(transport *http.Transport) *http.Client {
	const requestTimeout = 5 * time.Second

	return &http.Client{Transport: transport, Timeout: requestTimeout}
}
296
// getIP returns the local IP address the system selects when opening a UDP
// socket towards hostPort ("host:port").
//
// On failure it returns a nil IP together with the dial error, or an error
// describing an unexpected local address type.
func getIP(hostPort string) (net.IP, error) {
	conn, err := net.Dial("udp", hostPort)
	if err != nil {
		// Return nil rather than an empty, non-nil slice: the value carries
		// no meaning when an error is reported.
		return nil, err
	}
	defer conn.Close()

	// Guard the assertion so an unexpected address type becomes an error
	// instead of a panic.
	localAddr, ok := conn.LocalAddr().(*net.UDPAddr)
	if !ok {
		return nil, fmt.Errorf("unexpected local address type %T for %s", conn.LocalAddr(), hostPort)
	}
	return localAddr.IP, nil
}
307