1package agent 2 3import ( 4 "encoding/base64" 5 "fmt" 6 "net/http" 7 "strings" 8 9 "github.com/hashicorp/consul/agent/structs" 10 "github.com/hashicorp/consul/api" 11) 12 13const ( 14 // maxTxnOps is used to set an upper limit on the number of operations 15 // inside a transaction. If there are more operations than this, then the 16 // client is likely abusing transactions. 17 maxTxnOps = 64 18) 19 20// decodeValue decodes the value member of the given operation. 21func decodeValue(rawKV interface{}) error { 22 rawMap, ok := rawKV.(map[string]interface{}) 23 if !ok { 24 return fmt.Errorf("unexpected raw KV type: %T", rawKV) 25 } 26 for k, v := range rawMap { 27 switch strings.ToLower(k) { 28 case "value": 29 // Leave the byte slice nil if we have a nil 30 // value. 31 if v == nil { 32 return nil 33 } 34 35 // Otherwise, base64 decode it. 36 s, ok := v.(string) 37 if !ok { 38 return fmt.Errorf("unexpected value type: %T", v) 39 } 40 decoded, err := base64.StdEncoding.DecodeString(s) 41 if err != nil { 42 return fmt.Errorf("failed to decode value: %v", err) 43 } 44 rawMap[k] = decoded 45 return nil 46 } 47 } 48 return nil 49} 50 51// fixupKVOp looks for non-nil KV operations and passes them on for 52// value conversion. 53func fixupKVOp(rawOp interface{}) error { 54 rawMap, ok := rawOp.(map[string]interface{}) 55 if !ok { 56 return fmt.Errorf("unexpected raw op type: %T", rawOp) 57 } 58 for k, v := range rawMap { 59 switch strings.ToLower(k) { 60 case "kv": 61 if v == nil { 62 return nil 63 } 64 return decodeValue(v) 65 } 66 } 67 return nil 68} 69 70// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops, 71// replacing them with byte arrays. 72func fixupKVOps(raw interface{}) error { 73 rawSlice, ok := raw.([]interface{}) 74 if !ok { 75 return fmt.Errorf("unexpected raw type: %t", raw) 76 } 77 for _, rawOp := range rawSlice { 78 if err := fixupKVOp(rawOp); err != nil { 79 return err 80 } 81 } 82 return nil 83} 84 85// isWrite returns true if the given operation alters the state store. 86func isWrite(op api.KVOp) bool { 87 switch op { 88 case api.KVSet, api.KVDelete, api.KVDeleteCAS, api.KVDeleteTree, api.KVCAS, api.KVLock, api.KVUnlock: 89 return true 90 } 91 return false 92} 93 94// convertOps takes the incoming body in API format and converts it to the 95// internal RPC format. This returns a count of the number of write ops, and 96// a boolean, that if false means an error response has been generated and 97// processing should stop. 98func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) { 99 // Note the body is in API format, and not the RPC format. If we can't 100 // decode it, we will return a 400 since we don't have enough context to 101 // associate the error with a given operation. 102 var ops api.TxnOps 103 if err := decodeBody(req, &ops, fixupKVOps); err != nil { 104 resp.WriteHeader(http.StatusBadRequest) 105 fmt.Fprintf(resp, "Failed to parse body: %v", err) 106 return nil, 0, false 107 } 108 109 // Enforce a reasonable upper limit on the number of operations in a 110 // transaction in order to curb abuse. 111 if size := len(ops); size > maxTxnOps { 112 resp.WriteHeader(http.StatusRequestEntityTooLarge) 113 fmt.Fprintf(resp, "Transaction contains too many operations (%d > %d)", 114 size, maxTxnOps) 115 116 return nil, 0, false 117 } 118 119 // Convert the KV API format into the RPC format. Note that fixupKVOps 120 // above will have already converted the base64 encoded strings into 121 // byte arrays so we can assign right over. 122 var opsRPC structs.TxnOps 123 var writes int 124 var netKVSize int 125 for _, in := range ops { 126 if in.KV != nil { 127 size := len(in.KV.Value) 128 if size > maxKVSize { 129 resp.WriteHeader(http.StatusRequestEntityTooLarge) 130 fmt.Fprintf(resp, "Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, maxKVSize) 131 return nil, 0, false 132 } 133 netKVSize += size 134 135 verb := api.KVOp(in.KV.Verb) 136 if isWrite(verb) { 137 writes++ 138 } 139 140 out := &structs.TxnOp{ 141 KV: &structs.TxnKVOp{ 142 Verb: verb, 143 DirEnt: structs.DirEntry{ 144 Key: in.KV.Key, 145 Value: in.KV.Value, 146 Flags: in.KV.Flags, 147 Session: in.KV.Session, 148 RaftIndex: structs.RaftIndex{ 149 ModifyIndex: in.KV.Index, 150 }, 151 }, 152 }, 153 } 154 opsRPC = append(opsRPC, out) 155 } 156 } 157 158 // Enforce an overall size limit to help prevent abuse. 159 if netKVSize > maxKVSize { 160 resp.WriteHeader(http.StatusRequestEntityTooLarge) 161 fmt.Fprintf(resp, "Cumulative size of key data is too large (%d > %d bytes)", 162 netKVSize, maxKVSize) 163 164 return nil, 0, false 165 } 166 167 return opsRPC, writes, true 168} 169 170// Txn handles requests to apply multiple operations in a single, atomic 171// transaction. A transaction consisting of only read operations will be fast- 172// pathed to an endpoint that supports consistency modes (but not blocking), 173// and everything else will be routed through Raft like a normal write. 174func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { 175 if req.Method != "PUT" { 176 return nil, MethodNotAllowedError{req.Method, []string{"PUT"}} 177 } 178 179 // Convert the ops from the API format to the internal format. 180 ops, writes, ok := s.convertOps(resp, req) 181 if !ok { 182 return nil, nil 183 } 184 185 // Fast-path a transaction with only writes to the read-only endpoint, 186 // which bypasses Raft, and allows for staleness. 187 conflict := false 188 var ret interface{} 189 if writes == 0 { 190 args := structs.TxnReadRequest{Ops: ops} 191 if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { 192 return nil, nil 193 } 194 195 var reply structs.TxnReadResponse 196 if err := s.agent.RPC("Txn.Read", &args, &reply); err != nil { 197 return nil, err 198 } 199 200 // Since we don't do blocking, we only add the relevant headers 201 // for metadata. 202 setLastContact(resp, reply.LastContact) 203 setKnownLeader(resp, reply.KnownLeader) 204 205 ret, conflict = reply, len(reply.Errors) > 0 206 } else { 207 args := structs.TxnRequest{Ops: ops} 208 s.parseDC(req, &args.Datacenter) 209 s.parseToken(req, &args.Token) 210 211 var reply structs.TxnResponse 212 if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { 213 return nil, err 214 } 215 ret, conflict = reply, len(reply.Errors) > 0 216 } 217 218 // If there was a conflict return the response object but set a special 219 // status code. 220 if conflict { 221 var buf []byte 222 var err error 223 buf, err = s.marshalJSON(req, ret) 224 if err != nil { 225 return nil, err 226 } 227 228 resp.Header().Set("Content-Type", "application/json") 229 resp.WriteHeader(http.StatusConflict) 230 resp.Write(buf) 231 return nil, nil 232 } 233 234 // Otherwise, return the results of the successful transaction. 235 return ret, nil 236} 237