// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Threading.Tasks;
19 using Apache.Arrow.Flight.Internal;
20 using Apache.Arrow.Flight.Protocol;
21 using Grpc.Core;
22 using Grpc.Net.Client;
23 
namespace Apache.Arrow.Flight.Client
{
    /// <summary>
    /// Client-side entry point for a Flight service. Every method forwards to the
    /// generated <see cref="FlightService.FlightServiceClient"/> and re-wraps the
    /// resulting gRPC call so that its responses are exposed as Flight types
    /// rather than protocol messages.
    /// </summary>
    public class FlightClient
    {
        internal static readonly Empty EmptyInstance = new Empty();

        private readonly FlightService.FlightServiceClient _client;

        /// <summary>
        /// Creates a client that issues its calls over <paramref name="grpcChannel"/>.
        /// </summary>
        /// <param name="grpcChannel">Channel handed to the generated service client.</param>
        public FlightClient(GrpcChannel grpcChannel)
        {
            _client = new FlightService.FlightServiceClient(grpcChannel);
        }

        /// <summary>
        /// Starts a ListFlights call and exposes each response as a <see cref="FlightInfo"/>.
        /// </summary>
        /// <param name="criteria">
        /// Criteria sent with the request; <see cref="FlightCriteria.Empty"/> is used when null.
        /// </param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>A streaming call whose status, trailers and disposal delegate to the underlying call.</returns>
        public AsyncServerStreamingCall<FlightInfo> ListFlights(FlightCriteria criteria = null, Metadata headers = null)
        {
            var effectiveCriteria = criteria ?? FlightCriteria.Empty;

            var call = _client.ListFlights(effectiveCriteria.ToProtocol(), headers);
            var convertStream = new StreamReader<Protocol.FlightInfo, FlightInfo>(call.ResponseStream, inFlight => new FlightInfo(inFlight));

            return new AsyncServerStreamingCall<FlightInfo>(
                convertStream,
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a ListActions call and exposes each response as a <see cref="FlightActionType"/>.
        /// </summary>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>A streaming call whose status, trailers and disposal delegate to the underlying call.</returns>
        public AsyncServerStreamingCall<FlightActionType> ListActions(Metadata headers = null)
        {
            var call = _client.ListActions(EmptyInstance, headers);
            var convertStream = new StreamReader<Protocol.ActionType, FlightActionType>(call.ResponseStream, actionType => new FlightActionType(actionType));

            return new AsyncServerStreamingCall<FlightActionType>(
                convertStream,
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a DoGet call for <paramref name="ticket"/> and wraps its response stream
        /// in a <see cref="FlightClientRecordBatchStreamReader"/>.
        /// </summary>
        /// <param name="ticket">Ticket converted to its protocol form and sent with the request.</param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>A streaming call whose status, trailers and disposal delegate to the underlying call.</returns>
        public FlightRecordBatchStreamingCall GetStream(FlightTicket ticket, Metadata headers = null)
        {
            var call = _client.DoGet(ticket.ToProtocol(), headers);
            var responseStream = new FlightClientRecordBatchStreamReader(call.ResponseStream);

            return new FlightRecordBatchStreamingCall(
                responseStream,
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a GetFlightInfo call and exposes its response as a <see cref="FlightInfo"/>.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor converted to its protocol form and sent with the request.</param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>
        /// A unary call whose response task completes once the protocol response has been
        /// converted; a failure or cancellation of the underlying response surfaces through that task.
        /// </returns>
        public AsyncUnaryCall<FlightInfo> GetInfo(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var call = _client.GetFlightInfoAsync(flightDescriptor.ToProtocol(), headers);

            // An async local function replaces ContinueWith(...).Unwrap(): it does not bind the
            // continuation to TaskScheduler.Current and keeps the same fault/cancel propagation.
            async Task<FlightInfo> ConvertResponseAsync()
            {
                var response = await call.ResponseAsync.ConfigureAwait(false);
                return new FlightInfo(response);
            }

            return new AsyncUnaryCall<FlightInfo>(
                ConvertResponseAsync(),
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a DoPut call. The request stream is wrapped in a
        /// <see cref="FlightClientRecordBatchStreamWriter"/> created with
        /// <paramref name="flightDescriptor"/>, and each response is exposed as a
        /// <see cref="FlightPutResult"/>.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor handed to the request stream writer.</param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>A duplex call whose status, trailers and disposal delegate to the underlying call.</returns>
        public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var call = _client.DoPut(headers);
            var requestStream = new FlightClientRecordBatchStreamWriter(call.RequestStream, flightDescriptor);
            var readStream = new StreamReader<Protocol.PutResult, FlightPutResult>(call.ResponseStream, putResult => new FlightPutResult(putResult));

            return new FlightRecordBatchDuplexStreamingCall(
                requestStream,
                readStream,
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a DoAction call and exposes each response as a <see cref="FlightResult"/>.
        /// </summary>
        /// <param name="action">Action converted to its protocol form and sent with the request.</param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>A streaming call whose status, trailers and disposal delegate to the underlying call.</returns>
        public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers = null)
        {
            var call = _client.DoAction(action.ToProtocol(), headers);
            var streamReader = new StreamReader<Protocol.Result, FlightResult>(call.ResponseStream, result => new FlightResult(result));

            return new AsyncServerStreamingCall<FlightResult>(
                streamReader,
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }

        /// <summary>
        /// Starts a GetSchema call and decodes the schema bytes of its response
        /// with <see cref="FlightMessageSerializer.DecodeSchema"/>.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor converted to its protocol form and sent with the request.</param>
        /// <param name="headers">Optional request metadata passed to the underlying call.</param>
        /// <returns>
        /// A unary call whose response task completes once the schema has been decoded;
        /// a failure or cancellation of the underlying response surfaces through that task.
        /// </returns>
        public AsyncUnaryCall<Schema> GetSchema(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var call = _client.GetSchemaAsync(flightDescriptor.ToProtocol(), headers);

            // Same pattern as GetInfo: await the response once instead of chaining
            // ContinueWith(...).Unwrap() and re-reading ResponseAsync inside the continuation.
            async Task<Schema> DecodeSchemaAsync()
            {
                var response = await call.ResponseAsync.ConfigureAwait(false);
                return FlightMessageSerializer.DecodeSchema(response.Schema.Memory);
            }

            return new AsyncUnaryCall<Schema>(
                DecodeSchemaAsync(),
                call.ResponseHeadersAsync,
                call.GetStatus,
                call.GetTrailers,
                call.Dispose);
        }
    }
}
121