1package notifications 2 3import ( 4 "crypto/tls" 5 "encoding/json" 6 "fmt" 7 "mime" 8 "net" 9 "net/http" 10 "net/http/httptest" 11 "reflect" 12 "strconv" 13 "strings" 14 "testing" 15 16 "github.com/docker/distribution/manifest/schema1" 17) 18 19// TestHTTPSink mocks out an http endpoint and notifies it under a couple of 20// conditions, ensuring correct behavior. 21func TestHTTPSink(t *testing.T) { 22 serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 23 defer r.Body.Close() 24 if r.Method != "POST" { 25 w.WriteHeader(http.StatusMethodNotAllowed) 26 t.Fatalf("unexpected request method: %v", r.Method) 27 return 28 } 29 30 // Extract the content type and make sure it matches 31 contentType := r.Header.Get("Content-Type") 32 mediaType, _, err := mime.ParseMediaType(contentType) 33 if err != nil { 34 w.WriteHeader(http.StatusBadRequest) 35 t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType) 36 return 37 } 38 39 if mediaType != EventsMediaType { 40 w.WriteHeader(http.StatusUnsupportedMediaType) 41 t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType) 42 return 43 } 44 45 var envelope Envelope 46 dec := json.NewDecoder(r.Body) 47 if err := dec.Decode(&envelope); err != nil { 48 w.WriteHeader(http.StatusBadRequest) 49 t.Fatalf("error decoding request body: %v", err) 50 return 51 } 52 53 // Let caller choose the status 54 status, err := strconv.Atoi(r.FormValue("status")) 55 if err != nil { 56 t.Logf("error parsing status: %v", err) 57 58 // May just be empty, set status to 200 59 status = http.StatusOK 60 } 61 62 w.WriteHeader(status) 63 }) 64 server := httptest.NewTLSServer(serverHandler) 65 66 metrics := newSafeMetrics() 67 sink := newHTTPSink(server.URL, 0, nil, nil, 68 &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) 69 70 // first make sure that the default transport gives x509 untrusted cert error 71 events := []Event{} 72 err := sink.Write(events...) 73 if !strings.Contains(err.Error(), "x509") { 74 t.Fatal("TLS server with default transport should give unknown CA error") 75 } 76 if err := sink.Close(); err != nil { 77 t.Fatalf("unexpected error closing http sink: %v", err) 78 } 79 80 // make sure that passing in the transport no longer gives this error 81 tr := &http.Transport{ 82 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 83 } 84 sink = newHTTPSink(server.URL, 0, nil, tr, 85 &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) 86 err = sink.Write(events...) 87 if err != nil { 88 t.Fatalf("unexpected error writing events: %v", err) 89 } 90 91 // reset server to standard http server and sink to a basic sink 92 server = httptest.NewServer(serverHandler) 93 sink = newHTTPSink(server.URL, 0, nil, nil, 94 &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) 95 var expectedMetrics EndpointMetrics 96 expectedMetrics.Statuses = make(map[string]int) 97 98 closeL, err := net.Listen("tcp", "localhost:0") 99 if err != nil { 100 t.Fatalf("unexpected error creating listener: %v", err) 101 } 102 defer closeL.Close() 103 go func() { 104 for { 105 c, err := closeL.Accept() 106 if err != nil { 107 return 108 } 109 c.Close() 110 } 111 }() 112 113 for _, tc := range []struct { 114 events []Event // events to send 115 url string 116 failure bool // true if there should be a failure. 117 statusCode int // if not set, no status code should be incremented. 118 }{ 119 { 120 statusCode: http.StatusOK, 121 events: []Event{ 122 createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest)}, 123 }, 124 { 125 statusCode: http.StatusOK, 126 events: []Event{ 127 createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest), 128 createTestEvent("push", "library/test", layerMediaType), 129 createTestEvent("push", "library/test", layerMediaType), 130 }, 131 }, 132 { 133 statusCode: http.StatusTemporaryRedirect, 134 }, 135 { 136 statusCode: http.StatusBadRequest, 137 failure: true, 138 }, 139 { 140 // Case where connection is immediately closed 141 url: closeL.Addr().String(), 142 failure: true, 143 }, 144 } { 145 146 if tc.failure { 147 expectedMetrics.Failures += len(tc.events) 148 } else { 149 expectedMetrics.Successes += len(tc.events) 150 } 151 152 if tc.statusCode > 0 { 153 expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) 154 } 155 156 url := tc.url 157 if url == "" { 158 url = server.URL + "/" 159 } 160 // setup endpoint to respond with expected status code. 161 url += fmt.Sprintf("?status=%v", tc.statusCode) 162 sink.url = url 163 164 t.Logf("testcase: %v, fail=%v", url, tc.failure) 165 // Try a simple event emission. 166 err := sink.Write(tc.events...) 167 168 if !tc.failure { 169 if err != nil { 170 t.Fatalf("unexpected error send event: %v", err) 171 } 172 } else { 173 if err == nil { 174 t.Fatalf("the endpoint should have rejected the request") 175 } 176 } 177 178 if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { 179 t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics) 180 } 181 } 182 183 if err := sink.Close(); err != nil { 184 t.Fatalf("unexpected error closing http sink: %v", err) 185 } 186 187 // double close returns error 188 if err := sink.Close(); err == nil { 189 t.Fatalf("second close should have returned error: %v", err) 190 } 191 192} 193 194func createTestEvent(action, repo, typ string) Event { 195 event := createEvent(action) 196 197 event.Target.MediaType = typ 198 event.Target.Repository = repo 199 200 return *event 201} 202