// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Internal;
using Apache.Arrow.Flight.Protocol;
using Grpc.Core;
using Grpc.Net.Client;

namespace Apache.Arrow.Flight.Client
{
    /// <summary>
    /// Client-side wrapper around the generated <c>FlightService.FlightServiceClient</c>.
    /// Each method issues the corresponding gRPC call and returns a call object whose
    /// payloads are converted from the protocol types to the public Flight types, while
    /// response headers, status, trailers and disposal are forwarded from the underlying call.
    /// </summary>
    public class FlightClient
    {
        internal static readonly Empty EmptyInstance = new Empty();

        private readonly FlightService.FlightServiceClient _client;

        /// <summary>
        /// Creates a client that issues its calls over the given gRPC channel.
        /// </summary>
        /// <param name="grpcChannel">Channel passed to the generated service client.</param>
        public FlightClient(GrpcChannel grpcChannel)
        {
            _client = new FlightService.FlightServiceClient(grpcChannel);
        }

        /// <summary>
        /// Issues the ListFlights call and exposes the response stream as <see cref="FlightInfo"/> items.
        /// </summary>
        /// <param name="criteria">Criteria sent to the server; <c>FlightCriteria.Empty</c> is used when null.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A server-streaming call wrapping the underlying gRPC call.</returns>
        public AsyncServerStreamingCall<FlightInfo> ListFlights(FlightCriteria criteria = null, Metadata headers = null)
        {
            if (criteria == null)
            {
                criteria = FlightCriteria.Empty;
            }

            var response = _client.ListFlights(criteria.ToProtocol(), headers);
            var convertStream = new StreamReader<Protocol.FlightInfo, FlightInfo>(
                response.ResponseStream,
                inFlight => new FlightInfo(inFlight));

            return new AsyncServerStreamingCall<FlightInfo>(
                convertStream,
                response.ResponseHeadersAsync,
                response.GetStatus,
                response.GetTrailers,
                response.Dispose);
        }

        /// <summary>
        /// Issues the ListActions call and exposes the response stream as <see cref="FlightActionType"/> items.
        /// </summary>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A server-streaming call wrapping the underlying gRPC call.</returns>
        public AsyncServerStreamingCall<FlightActionType> ListActions(Metadata headers = null)
        {
            var response = _client.ListActions(EmptyInstance, headers);
            var convertStream = new StreamReader<Protocol.ActionType, FlightActionType>(
                response.ResponseStream,
                actionType => new FlightActionType(actionType));

            return new AsyncServerStreamingCall<FlightActionType>(
                convertStream,
                response.ResponseHeadersAsync,
                response.GetStatus,
                response.GetTrailers,
                response.Dispose);
        }

        /// <summary>
        /// Issues the DoGet call for a ticket and wraps the response stream in a record batch stream reader.
        /// </summary>
        /// <param name="ticket">Ticket converted to its protocol form and sent to the server.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A streaming call wrapping the underlying gRPC call.</returns>
        public FlightRecordBatchStreamingCall GetStream(FlightTicket ticket, Metadata headers = null)
        {
            var stream = _client.DoGet(ticket.ToProtocol(), headers);
            var responseStream = new FlightClientRecordBatchStreamReader(stream.ResponseStream);

            return new FlightRecordBatchStreamingCall(
                responseStream,
                stream.ResponseHeadersAsync,
                stream.GetStatus,
                stream.GetTrailers,
                stream.Dispose);
        }

        /// <summary>
        /// Issues the GetFlightInfo call and exposes its response as a <see cref="FlightInfo"/>.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor converted to its protocol form and sent to the server.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A unary call whose response task yields the converted flight info.</returns>
        public AsyncUnaryCall<FlightInfo> GetInfo(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var flightInfoResult = _client.GetFlightInfoAsync(flightDescriptor.ToProtocol(), headers);

            var flightInfo = ConvertResponseAsync(
                flightInfoResult.ResponseAsync,
                protocolInfo => new FlightInfo(protocolInfo));

            return new AsyncUnaryCall<FlightInfo>(
                flightInfo,
                flightInfoResult.ResponseHeadersAsync,
                flightInfoResult.GetStatus,
                flightInfoResult.GetTrailers,
                flightInfoResult.Dispose);
        }

        /// <summary>
        /// Opens a DoPut call. The request stream is wrapped in a record batch stream writer
        /// constructed with <paramref name="flightDescriptor"/>, and the response stream is
        /// exposed as <see cref="FlightPutResult"/> items.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor handed to the request stream writer.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A duplex streaming call wrapping the underlying gRPC call.</returns>
        public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var channels = _client.DoPut(headers);
            var requestStream = new FlightClientRecordBatchStreamWriter(channels.RequestStream, flightDescriptor);
            var readStream = new StreamReader<Protocol.PutResult, FlightPutResult>(
                channels.ResponseStream,
                putResult => new FlightPutResult(putResult));

            return new FlightRecordBatchDuplexStreamingCall(
                requestStream,
                readStream,
                channels.ResponseHeadersAsync,
                channels.GetStatus,
                channels.GetTrailers,
                channels.Dispose);
        }

        /// <summary>
        /// Issues the DoAction call and exposes the response stream as <see cref="FlightResult"/> items.
        /// </summary>
        /// <param name="action">Action converted to its protocol form and sent to the server.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A server-streaming call wrapping the underlying gRPC call.</returns>
        public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers = null)
        {
            var stream = _client.DoAction(action.ToProtocol(), headers);
            var streamReader = new StreamReader<Protocol.Result, FlightResult>(
                stream.ResponseStream,
                result => new FlightResult(result));

            return new AsyncServerStreamingCall<FlightResult>(
                streamReader,
                stream.ResponseHeadersAsync,
                stream.GetStatus,
                stream.GetTrailers,
                stream.Dispose);
        }

        /// <summary>
        /// Issues the GetSchema call and exposes its response as a decoded <see cref="Schema"/>.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor converted to its protocol form and sent to the server.</param>
        /// <param name="headers">Optional request headers passed to the underlying call.</param>
        /// <returns>A unary call whose response task yields the decoded schema.</returns>
        public AsyncUnaryCall<Schema> GetSchema(FlightDescriptor flightDescriptor, Metadata headers = null)
        {
            var schemaResult = _client.GetSchemaAsync(flightDescriptor.ToProtocol(), headers);

            var schema = ConvertResponseAsync(
                schemaResult.ResponseAsync,
                response => FlightMessageSerializer.DecodeSchema(response.Schema.Memory));

            return new AsyncUnaryCall<Schema>(
                schema,
                schemaResult.ResponseHeadersAsync,
                schemaResult.GetStatus,
                schemaResult.GetTrailers,
                schemaResult.Dispose);
        }

        /// <summary>
        /// Awaits a unary response and converts it to the public type.
        /// </summary>
        /// <remarks>
        /// Used instead of <c>ContinueWith(...).Unwrap()</c> so the conversion does not get
        /// scheduled on whatever <see cref="TaskScheduler.Current"/> happens to be at the call
        /// site. A faulted or canceled response task surfaces through the returned task via the await.
        /// </remarks>
        private static async Task<TResult> ConvertResponseAsync<TResponse, TResult>(
            Task<TResponse> responseTask,
            Func<TResponse, TResult> convert)
        {
            var response = await responseTask.ConfigureAwait(false);
            return convert(response);
        }
    }
}