1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spdy 18 19import ( 20 "net" 21 "net/http" 22 "sync" 23 "time" 24 25 "github.com/docker/spdystream" 26 "k8s.io/apimachinery/pkg/util/httpstream" 27 "k8s.io/klog" 28) 29 30// connection maintains state about a spdystream.Connection and its associated 31// streams. 32type connection struct { 33 conn *spdystream.Connection 34 streams []httpstream.Stream 35 streamLock sync.Mutex 36 newStreamHandler httpstream.NewStreamHandler 37} 38 39// NewClientConnection creates a new SPDY client connection. 40func NewClientConnection(conn net.Conn) (httpstream.Connection, error) { 41 spdyConn, err := spdystream.NewConnection(conn, false) 42 if err != nil { 43 defer conn.Close() 44 return nil, err 45 } 46 47 return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil 48} 49 50// NewServerConnection creates a new SPDY server connection. newStreamHandler 51// will be invoked when the server receives a newly created stream from the 52// client. 53func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { 54 spdyConn, err := spdystream.NewConnection(conn, true) 55 if err != nil { 56 defer conn.Close() 57 return nil, err 58 } 59 60 return newConnection(spdyConn, newStreamHandler), nil 61} 62 63// newConnection returns a new connection wrapping conn. newStreamHandler 64// will be invoked when the server receives a newly created stream from the 65// client. 66func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { 67 c := &connection{conn: conn, newStreamHandler: newStreamHandler} 68 go conn.Serve(c.newSpdyStream) 69 return c 70} 71 72// createStreamResponseTimeout indicates how long to wait for the other side to 73// acknowledge the new stream before timing out. 74const createStreamResponseTimeout = 30 * time.Second 75 76// Close first sends a reset for all of the connection's streams, and then 77// closes the underlying spdystream.Connection. 78func (c *connection) Close() error { 79 c.streamLock.Lock() 80 for _, s := range c.streams { 81 // calling Reset instead of Close ensures that all streams are fully torn down 82 s.Reset() 83 } 84 c.streams = make([]httpstream.Stream, 0) 85 c.streamLock.Unlock() 86 87 // now that all streams are fully torn down, it's safe to call close on the underlying connection, 88 // which should be able to terminate immediately at this point, instead of waiting for any 89 // remaining graceful stream termination. 90 return c.conn.Close() 91} 92 93// CreateStream creates a new stream with the specified headers and registers 94// it with the connection. 95func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) { 96 stream, err := c.conn.CreateStream(headers, nil, false) 97 if err != nil { 98 return nil, err 99 } 100 if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil { 101 return nil, err 102 } 103 104 c.registerStream(stream) 105 return stream, nil 106} 107 108// registerStream adds the stream s to the connection's list of streams that 109// it owns. 110func (c *connection) registerStream(s httpstream.Stream) { 111 c.streamLock.Lock() 112 c.streams = append(c.streams, s) 113 c.streamLock.Unlock() 114} 115 116// CloseChan returns a channel that, when closed, indicates that the underlying 117// spdystream.Connection has been closed. 118func (c *connection) CloseChan() <-chan bool { 119 return c.conn.CloseChan() 120} 121 122// newSpdyStream is the internal new stream handler used by spdystream.Connection.Serve. 123// It calls connection's newStreamHandler, giving it the opportunity to accept or reject 124// the stream. If newStreamHandler returns an error, the stream is rejected. If not, the 125// stream is accepted and registered with the connection. 126func (c *connection) newSpdyStream(stream *spdystream.Stream) { 127 replySent := make(chan struct{}) 128 err := c.newStreamHandler(stream, replySent) 129 rejectStream := (err != nil) 130 if rejectStream { 131 klog.Warningf("Stream rejected: %v", err) 132 stream.Reset() 133 return 134 } 135 136 c.registerStream(stream) 137 stream.SendReply(http.Header{}, rejectStream) 138 close(replySent) 139} 140 141// SetIdleTimeout sets the amount of time the connection may remain idle before 142// it is automatically closed. 143func (c *connection) SetIdleTimeout(timeout time.Duration) { 144 c.conn.SetIdleTimeout(timeout) 145} 146