1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package guestbook 18 19import ( 20 "encoding/json" 21 "fmt" 22 "io/ioutil" 23 "log" 24 "net" 25 "net/http" 26 "net/url" 27 "strings" 28 "time" 29 30 "github.com/spf13/cobra" 31 32 utilnet "k8s.io/apimachinery/pkg/util/net" 33) 34 35// CmdGuestbook is used by agnhost Cobra. 36var CmdGuestbook = &cobra.Command{ 37 Use: "guestbook", 38 Short: "Creates a HTTP server with various endpoints representing a guestbook app", 39 Long: `Starts a HTTP server on the given --http-port (default: 80), serving various endpoints representing a guestbook app. The endpoints and their purpose are: 40 41- /register: A guestbook replica will subscribe to a primary, to its given --replicaof endpoint. The primary will then push any updates it receives to its registered replicas through the --backend-port. 42- /get: Returns '{"data": value}', where the value is the stored value for the given key if non-empty, or the entire store. 43- /set: Will set the given key-value pair in its own store and propagate it to its replicas, if any. Will return '{"data": "Updated"}' to the caller on success. 44- /guestbook: Will proxy the request to agnhost-primary if the given cmd is 'set', or agnhost-replica if the given cmd is 'get'.`, 45 Args: cobra.MaximumNArgs(0), 46 Run: main, 47} 48 49var ( 50 httpPort string 51 backendPort string 52 replicaOf string 53 replicas []string 54 store map[string]interface{} 55) 56 57const ( 58 timeout = time.Duration(15) * time.Second 59 sleep = time.Duration(1) * time.Second 60) 61 62func init() { 63 CmdGuestbook.Flags().StringVar(&httpPort, "http-port", "80", "HTTP Listen Port") 64 CmdGuestbook.Flags().StringVar(&backendPort, "backend-port", "6379", "Backend's HTTP Listen Port") 65 CmdGuestbook.Flags().StringVar(&replicaOf, "replicaof", "", "The host's name to register to") 66 store = make(map[string]interface{}) 67} 68 69func main(cmd *cobra.Command, args []string) { 70 go registerNode(replicaOf, backendPort) 71 startHTTPServer(httpPort) 72} 73 74func registerNode(registerTo, port string) { 75 if registerTo == "" { 76 return 77 } 78 79 hostPort := net.JoinHostPort(registerTo, backendPort) 80 81 start := time.Now() 82 for time.Since(start) < timeout { 83 host, err := getIP(hostPort) 84 if err != nil { 85 log.Printf("unable to get IP %s: %v. Retrying in %s.", hostPort, err, sleep) 86 time.Sleep(sleep) 87 continue 88 } 89 90 request := fmt.Sprintf("register?host=%s", host.String()) 91 log.Printf("Registering to primary: %s/%s", hostPort, request) 92 _, err = net.ResolveTCPAddr("tcp", hostPort) 93 if err != nil { 94 log.Printf("unable to resolve %s, --replicaof param and/or --backend-port param are invalid: %v. Retrying in %s.", hostPort, err, sleep) 95 time.Sleep(sleep) 96 continue 97 } 98 99 response, err := dialHTTP(request, hostPort) 100 if err != nil { 101 log.Printf("encountered error while registering to primary: %v. Retrying in %s.", err, sleep) 102 time.Sleep(sleep) 103 continue 104 } 105 106 responseJSON := make(map[string]interface{}) 107 err = json.Unmarshal([]byte(response), &responseJSON) 108 if err != nil { 109 log.Fatalf("Error while unmarshaling primary's response: %v", err) 110 } 111 112 var ok bool 113 store, ok = responseJSON["data"].(map[string]interface{}) 114 if !ok { 115 log.Fatalf("Could not cast responseJSON: %s", responseJSON["data"]) 116 } 117 log.Printf("Registered to node: %s", registerTo) 118 return 119 } 120 121 log.Fatal("Timed out while registering to primary.") 122} 123 124func startHTTPServer(port string) { 125 http.HandleFunc("/register", registerHandler) 126 http.HandleFunc("/get", getHandler) 127 http.HandleFunc("/set", setHandler) 128 http.HandleFunc("/guestbook", guestbookHandler) 129 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil)) 130} 131 132// registerHandler will register the caller in this server's list of replicas. 133// /set requests will be propagated to replicas, if any. 134func registerHandler(w http.ResponseWriter, r *http.Request) { 135 values, err := url.Parse(r.URL.RequestURI()) 136 if err != nil { 137 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) 138 return 139 } 140 141 ip := values.Query().Get("host") 142 log.Printf("GET /register?host=%s", ip) 143 144 // send all the store to the replica as well. 145 output := make(map[string]interface{}) 146 output["data"] = store 147 bytes, err := json.Marshal(output) 148 if err != nil { 149 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed) 150 return 151 } 152 fmt.Fprint(w, string(bytes)) 153 replicas = append(replicas, ip) 154 log.Printf("Node '%s' registered.", ip) 155} 156 157// getHandler will return '{"data": value}', where value is the stored value for 158// the given key if non-empty, or entire store. 159func getHandler(w http.ResponseWriter, r *http.Request) { 160 values, err := url.Parse(r.URL.RequestURI()) 161 if err != nil { 162 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) 163 return 164 } 165 166 key := values.Query().Get("key") 167 168 log.Printf("GET /get?key=%s", key) 169 170 output := make(map[string]interface{}) 171 if key == "" { 172 output["data"] = store 173 } else { 174 value, found := store[key] 175 if !found { 176 value = "" 177 } 178 output["data"] = value 179 } 180 181 bytes, err := json.Marshal(output) 182 if err == nil { 183 fmt.Fprint(w, string(bytes)) 184 } else { 185 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed) 186 } 187} 188 189// setHandler will set the given key-value pair in its own store and propagate 190// it to its replicas, if any. Will return '{"message": "Updated"}' to the caller on success. 191func setHandler(w http.ResponseWriter, r *http.Request) { 192 values, err := url.Parse(r.URL.RequestURI()) 193 if err != nil { 194 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) 195 return 196 } 197 198 key := values.Query().Get("key") 199 value := values.Query().Get("value") 200 201 log.Printf("GET /set?key=%s&value=%s", key, value) 202 203 if key == "" { 204 http.Error(w, "cannot set with empty key.", http.StatusBadRequest) 205 return 206 } 207 208 store[key] = value 209 request := fmt.Sprintf("set?key=%s&value=%s", key, value) 210 for _, replica := range replicas { 211 hostPort := net.JoinHostPort(replica, backendPort) 212 _, err = dialHTTP(request, hostPort) 213 if err != nil { 214 http.Error(w, fmt.Sprintf("encountered error while propagating to replica '%s': %v", replica, err), http.StatusExpectationFailed) 215 return 216 } 217 } 218 219 output := map[string]string{} 220 output["message"] = "Updated" 221 bytes, err := json.Marshal(output) 222 if err == nil { 223 fmt.Fprint(w, string(bytes)) 224 } else { 225 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed) 226 } 227} 228 229// guestbookHandler will proxy the request to agnhost-primary if the given cmd is 230// 'set' or agnhost-replica if the given cmd is 'get'. 231func guestbookHandler(w http.ResponseWriter, r *http.Request) { 232 values, err := url.Parse(r.URL.RequestURI()) 233 if err != nil { 234 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) 235 return 236 } 237 238 cmd := strings.ToLower(values.Query().Get("cmd")) 239 key := values.Query().Get("key") 240 value := values.Query().Get("value") 241 242 log.Printf("GET /guestbook?cmd=%s&key=%s&value=%s", cmd, key, value) 243 244 if cmd != "get" && cmd != "set" { 245 http.Error(w, fmt.Sprintf("unsupported cmd: '%s'", cmd), http.StatusBadRequest) 246 return 247 } 248 if cmd == "set" && key == "" { 249 http.Error(w, "cannot set with empty key.", http.StatusBadRequest) 250 return 251 } 252 253 host := "agnhost-primary" 254 if cmd == "get" { 255 host = "agnhost-replica" 256 } 257 258 hostPort := net.JoinHostPort(host, backendPort) 259 _, err = net.ResolveTCPAddr("tcp", hostPort) 260 if err != nil { 261 http.Error(w, fmt.Sprintf("host and/or port param are invalid. %v", err), http.StatusBadRequest) 262 return 263 } 264 265 request := fmt.Sprintf("%s?key=%s&value=%s", cmd, key, value) 266 response, err := dialHTTP(request, hostPort) 267 if err == nil { 268 fmt.Fprint(w, response) 269 } else { 270 http.Error(w, fmt.Sprintf("encountered error: %v", err), http.StatusExpectationFailed) 271 } 272} 273 274func dialHTTP(request, hostPort string) (string, error) { 275 transport := utilnet.SetTransportDefaults(&http.Transport{}) 276 httpClient := createHTTPClient(transport) 277 resp, err := httpClient.Get(fmt.Sprintf("http://%s/%s", hostPort, request)) 278 defer transport.CloseIdleConnections() 279 if err == nil { 280 defer resp.Body.Close() 281 body, err := ioutil.ReadAll(resp.Body) 282 if err == nil { 283 return string(body), nil 284 } 285 } 286 return "", err 287} 288 289func createHTTPClient(transport *http.Transport) *http.Client { 290 client := &http.Client{ 291 Transport: transport, 292 Timeout: 5 * time.Second, 293 } 294 return client 295} 296 297func getIP(hostPort string) (net.IP, error) { 298 conn, err := net.Dial("udp", hostPort) 299 if err != nil { 300 return []byte{}, err 301 } 302 defer conn.Close() 303 304 localAddr := conn.LocalAddr().(*net.UDPAddr) 305 return localAddr.IP, nil 306} 307