1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spdy
18
19import (
20	"net"
21	"net/http"
22	"sync"
23	"time"
24
25	"github.com/docker/spdystream"
26	"k8s.io/apimachinery/pkg/util/httpstream"
27	"k8s.io/klog"
28)
29
30// connection maintains state about a spdystream.Connection and its associated
31// streams.
32type connection struct {
33	conn             *spdystream.Connection
34	streams          []httpstream.Stream
35	streamLock       sync.Mutex
36	newStreamHandler httpstream.NewStreamHandler
37}
38
39// NewClientConnection creates a new SPDY client connection.
40func NewClientConnection(conn net.Conn) (httpstream.Connection, error) {
41	spdyConn, err := spdystream.NewConnection(conn, false)
42	if err != nil {
43		defer conn.Close()
44		return nil, err
45	}
46
47	return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil
48}
49
50// NewServerConnection creates a new SPDY server connection. newStreamHandler
51// will be invoked when the server receives a newly created stream from the
52// client.
53func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
54	spdyConn, err := spdystream.NewConnection(conn, true)
55	if err != nil {
56		defer conn.Close()
57		return nil, err
58	}
59
60	return newConnection(spdyConn, newStreamHandler), nil
61}
62
63// newConnection returns a new connection wrapping conn. newStreamHandler
64// will be invoked when the server receives a newly created stream from the
65// client.
66func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
67	c := &connection{conn: conn, newStreamHandler: newStreamHandler}
68	go conn.Serve(c.newSpdyStream)
69	return c
70}
71
// createStreamResponseTimeout bounds how long CreateStream waits for the
// remote side to acknowledge a newly created stream before giving up.
const createStreamResponseTimeout time.Duration = 30 * time.Second
75
76// Close first sends a reset for all of the connection's streams, and then
77// closes the underlying spdystream.Connection.
78func (c *connection) Close() error {
79	c.streamLock.Lock()
80	for _, s := range c.streams {
81		// calling Reset instead of Close ensures that all streams are fully torn down
82		s.Reset()
83	}
84	c.streams = make([]httpstream.Stream, 0)
85	c.streamLock.Unlock()
86
87	// now that all streams are fully torn down, it's safe to call close on the underlying connection,
88	// which should be able to terminate immediately at this point, instead of waiting for any
89	// remaining graceful stream termination.
90	return c.conn.Close()
91}
92
93// CreateStream creates a new stream with the specified headers and registers
94// it with the connection.
95func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) {
96	stream, err := c.conn.CreateStream(headers, nil, false)
97	if err != nil {
98		return nil, err
99	}
100	if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil {
101		return nil, err
102	}
103
104	c.registerStream(stream)
105	return stream, nil
106}
107
108// registerStream adds the stream s to the connection's list of streams that
109// it owns.
110func (c *connection) registerStream(s httpstream.Stream) {
111	c.streamLock.Lock()
112	c.streams = append(c.streams, s)
113	c.streamLock.Unlock()
114}
115
116// CloseChan returns a channel that, when closed, indicates that the underlying
117// spdystream.Connection has been closed.
118func (c *connection) CloseChan() <-chan bool {
119	return c.conn.CloseChan()
120}
121
122// newSpdyStream is the internal new stream handler used by spdystream.Connection.Serve.
123// It calls connection's newStreamHandler, giving it the opportunity to accept or reject
124// the stream. If newStreamHandler returns an error, the stream is rejected. If not, the
125// stream is accepted and registered with the connection.
126func (c *connection) newSpdyStream(stream *spdystream.Stream) {
127	replySent := make(chan struct{})
128	err := c.newStreamHandler(stream, replySent)
129	rejectStream := (err != nil)
130	if rejectStream {
131		klog.Warningf("Stream rejected: %v", err)
132		stream.Reset()
133		return
134	}
135
136	c.registerStream(stream)
137	stream.SendReply(http.Header{}, rejectStream)
138	close(replySent)
139}
140
141// SetIdleTimeout sets the amount of time the connection may remain idle before
142// it is automatically closed.
143func (c *connection) SetIdleTimeout(timeout time.Duration) {
144	c.conn.SetIdleTimeout(timeout)
145}
146