1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spdy 18 19import ( 20 "io" 21 "net" 22 "net/http" 23 "sync" 24 "testing" 25 "time" 26 27 "k8s.io/apimachinery/pkg/util/httpstream" 28) 29 30func runProxy(t *testing.T, backendUrl string, proxyUrl chan<- string, proxyDone chan<- struct{}, errCh chan<- error) { 31 listener, err := net.Listen("tcp4", "localhost:0") 32 if err != nil { 33 errCh <- err 34 return 35 } 36 defer listener.Close() 37 38 proxyUrl <- listener.Addr().String() 39 40 clientConn, err := listener.Accept() 41 if err != nil { 42 t.Errorf("proxy: error accepting client connection: %v", err) 43 return 44 } 45 46 backendConn, err := net.Dial("tcp4", backendUrl) 47 if err != nil { 48 t.Errorf("proxy: error dialing backend: %v", err) 49 return 50 } 51 defer backendConn.Close() 52 53 var wg sync.WaitGroup 54 wg.Add(2) 55 56 go func() { 57 defer wg.Done() 58 io.Copy(backendConn, clientConn) 59 }() 60 61 go func() { 62 defer wg.Done() 63 io.Copy(clientConn, backendConn) 64 }() 65 66 wg.Wait() 67 68 proxyDone <- struct{}{} 69} 70 71func runServer(t *testing.T, backendUrl chan<- string, serverDone chan<- struct{}, errCh chan<- error) { 72 listener, err := net.Listen("tcp4", "localhost:0") 73 if err != nil { 74 errCh <- err 75 return 76 } 77 defer listener.Close() 78 79 backendUrl <- listener.Addr().String() 80 81 conn, err := listener.Accept() 82 if err != nil { 83 t.Errorf("server: error accepting connection: %v", err) 84 return 85 } 86 87 streamChan := make(chan httpstream.Stream) 88 replySentChan := make(chan (<-chan struct{})) 89 spdyConn, err := NewServerConnection(conn, func(stream httpstream.Stream, replySent <-chan struct{}) error { 90 streamChan <- stream 91 replySentChan <- replySent 92 return nil 93 }) 94 if err != nil { 95 t.Errorf("server: error creating spdy connection: %v", err) 96 return 97 } 98 99 stream := <-streamChan 100 replySent := <-replySentChan 101 <-replySent 102 103 buf := make([]byte, 1) 104 _, err = stream.Read(buf) 105 if err != io.EOF { 106 t.Errorf("server: unexpected read error: %v", err) 107 return 108 } 109 110 <-spdyConn.CloseChan() 111 raw := spdyConn.(*connection).conn 112 if err := raw.Wait(15 * time.Second); err != nil { 113 t.Errorf("server: timed out waiting for connection closure: %v", err) 114 } 115 116 serverDone <- struct{}{} 117} 118 119func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { 120 errCh := make(chan error) 121 122 serverDone := make(chan struct{}, 1) 123 backendUrlChan := make(chan string) 124 go runServer(t, backendUrlChan, serverDone, errCh) 125 126 var backendUrl string 127 select { 128 case err := <-errCh: 129 t.Fatalf("server: error listening: %v", err) 130 case backendUrl = <-backendUrlChan: 131 } 132 133 proxyDone := make(chan struct{}, 1) 134 proxyUrlChan := make(chan string) 135 go runProxy(t, backendUrl, proxyUrlChan, proxyDone, errCh) 136 137 var proxyUrl string 138 select { 139 case err := <-errCh: 140 t.Fatalf("error listening: %v", err) 141 case proxyUrl = <-proxyUrlChan: 142 } 143 144 conn, err := net.Dial("tcp4", proxyUrl) 145 if err != nil { 146 t.Fatalf("client: error connecting to proxy: %v", err) 147 } 148 149 spdyConn, err := NewClientConnection(conn) 150 if err != nil { 151 t.Fatalf("client: error creating spdy connection: %v", err) 152 } 153 154 if _, err := spdyConn.CreateStream(http.Header{}); err != nil { 155 t.Fatalf("client: error creating stream: %v", err) 156 } 157 158 spdyConn.Close() 159 raw := spdyConn.(*connection).conn 160 if err := raw.Wait(15 * time.Second); err != nil { 161 t.Fatalf("client: timed out waiting for connection closure: %v", err) 162 } 163 164 expired := time.NewTimer(15 * time.Second) 165 defer expired.Stop() 166 i := 0 167 for { 168 select { 169 case <-expired.C: 170 t.Fatalf("timed out waiting for proxy and/or server closure") 171 case <-serverDone: 172 i++ 173 case <-proxyDone: 174 i++ 175 } 176 if i == 2 { 177 break 178 } 179 } 180} 181