1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package flight
18
19import (
20	"net"
21	"os"
22	"os/signal"
23
24	"google.golang.org/grpc"
25)
26
27// Server is an interface for hiding some of the grpc specifics to make
28// it slightly easier to manage a flight service, slightly modeled after
29// the C++ implementation
30type Server interface {
31	// Init takes in the address to bind to and creates the listener
32	Init(addr string) error
33	// Addr will return the address that was bound to for the service to listen on
34	Addr() net.Addr
35	// SetShutdownOnSignals sets notifications on the given signals to call GracefulStop
36	// on the grpc service if any of those signals are received
37	SetShutdownOnSignals(sig ...os.Signal)
38	// Serve blocks until accepting a connection fails with a fatal error. It will return
39	// a non-nil error unless it stopped due to calling Shutdown or receiving one of the
40	// signals set in SetShutdownOnSignals
41	Serve() error
42	// Shutdown will call GracefulStop on the grpc server so that it stops accepting connections
43	// and will wait until current methods complete
44	Shutdown()
45	// RegisterFlightService sets up the handler for the Flight Endpoints as per
46	// normal Grpc setups
47	RegisterFlightService(*FlightServiceService)
48}
49
50type server struct {
51	lis        net.Listener
52	sigChannel <-chan os.Signal
53	done       chan bool
54
55	authHandler ServerAuthHandler
56	server      *grpc.Server
57}
58
59// NewFlightServer takes in an auth handler for managing the handshake authentication
60// and any grpc Server options desired, such as TLS certs and so on which will just
61// be passed through to the underlying grpc server.
62//
63// Alternatively, a grpc server can be created normally without this helper as the
64// grpc server generated code is still being exported. This only exists to allow
65// the utility of the helpers
66func NewFlightServer(auth ServerAuthHandler, opt ...grpc.ServerOption) Server {
67	if auth != nil {
68		opt = append([]grpc.ServerOption{
69			grpc.ChainStreamInterceptor(createServerAuthStreamInterceptor(auth)),
70			grpc.ChainUnaryInterceptor(createServerAuthUnaryInterceptor(auth)),
71		}, opt...)
72	}
73
74	return &server{
75		authHandler: auth,
76		server:      grpc.NewServer(opt...),
77	}
78}
79
80func (s *server) Init(addr string) (err error) {
81	s.lis, err = net.Listen("tcp", addr)
82	return
83}
84
85func (s *server) Addr() net.Addr {
86	return s.lis.Addr()
87}
88
89func (s *server) SetShutdownOnSignals(sig ...os.Signal) {
90	c := make(chan os.Signal, 1)
91	signal.Notify(c, sig...)
92	s.sigChannel = c
93}
94
95func (s *server) Serve() error {
96	s.done = make(chan bool)
97	go func() {
98		select {
99		case <-s.sigChannel:
100			s.server.GracefulStop()
101		case <-s.done:
102		}
103	}()
104	err := s.server.Serve(s.lis)
105	close(s.done)
106	return err
107}
108
109func (s *server) RegisterFlightService(svc *FlightServiceService) {
110	if svc.Handshake == nil {
111		svc.Handshake = s.handshake
112	}
113	RegisterFlightServiceService(s.server, svc)
114}
115
116func (s *server) Shutdown() {
117	s.server.GracefulStop()
118}
119