1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package flight 18 19import ( 20 "net" 21 "os" 22 "os/signal" 23 24 "google.golang.org/grpc" 25) 26 27// Server is an interface for hiding some of the grpc specifics to make 28// it slightly easier to manage a flight service, slightly modeled after 29// the C++ implementation 30type Server interface { 31 // Init takes in the address to bind to and creates the listener 32 Init(addr string) error 33 // Addr will return the address that was bound to for the service to listen on 34 Addr() net.Addr 35 // SetShutdownOnSignals sets notifications on the given signals to call GracefulStop 36 // on the grpc service if any of those signals are received 37 SetShutdownOnSignals(sig ...os.Signal) 38 // Serve blocks until accepting a connection fails with a fatal error. It will return 39 // a non-nil error unless it stopped due to calling Shutdown or receiving one of the 40 // signals set in SetShutdownOnSignals 41 Serve() error 42 // Shutdown will call GracefulStop on the grpc server so that it stops accepting connections 43 // and will wait until current methods complete 44 Shutdown() 45 // RegisterFlightService sets up the handler for the Flight Endpoints as per 46 // normal Grpc setups 47 RegisterFlightService(*FlightServiceService) 48} 49 50type server struct { 51 lis net.Listener 52 sigChannel <-chan os.Signal 53 done chan bool 54 55 authHandler ServerAuthHandler 56 server *grpc.Server 57} 58 59// NewFlightServer takes in an auth handler for managing the handshake authentication 60// and any grpc Server options desired, such as TLS certs and so on which will just 61// be passed through to the underlying grpc server. 62// 63// Alternatively, a grpc server can be created normally without this helper as the 64// grpc server generated code is still being exported. This only exists to allow 65// the utility of the helpers 66func NewFlightServer(auth ServerAuthHandler, opt ...grpc.ServerOption) Server { 67 if auth != nil { 68 opt = append([]grpc.ServerOption{ 69 grpc.ChainStreamInterceptor(createServerAuthStreamInterceptor(auth)), 70 grpc.ChainUnaryInterceptor(createServerAuthUnaryInterceptor(auth)), 71 }, opt...) 72 } 73 74 return &server{ 75 authHandler: auth, 76 server: grpc.NewServer(opt...), 77 } 78} 79 80func (s *server) Init(addr string) (err error) { 81 s.lis, err = net.Listen("tcp", addr) 82 return 83} 84 85func (s *server) Addr() net.Addr { 86 return s.lis.Addr() 87} 88 89func (s *server) SetShutdownOnSignals(sig ...os.Signal) { 90 c := make(chan os.Signal, 1) 91 signal.Notify(c, sig...) 92 s.sigChannel = c 93} 94 95func (s *server) Serve() error { 96 s.done = make(chan bool) 97 go func() { 98 select { 99 case <-s.sigChannel: 100 s.server.GracefulStop() 101 case <-s.done: 102 } 103 }() 104 err := s.server.Serve(s.lis) 105 close(s.done) 106 return err 107} 108 109func (s *server) RegisterFlightService(svc *FlightServiceService) { 110 if svc.Handshake == nil { 111 svc.Handshake = s.handshake 112 } 113 RegisterFlightServiceService(s.server, svc) 114} 115 116func (s *server) Shutdown() { 117 s.server.GracefulStop() 118} 119