/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "ProtoRPC.h"

#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS
#import <Protobuf/GPBProtocolBuffers.h>
#else
#import <GPBProtocolBuffers.h>
#endif
#import <GRPCClient/GRPCCall.h>
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter+Transformations.h>

/**
 * Response handler for unary calls: remembers the most recent proto message and hands it,
 * together with the final error, to a single completion block when the call closes.
 */
@implementation GRPCUnaryResponseHandler {
  void (^_responseHandler)(id, NSError *);
  dispatch_queue_t _responseDispatchQueue;

  // Last message delivered through -didReceiveProtoMessage:; cleared when the call closes.
  GPBMessage *_message;
}

/**
 * @param handler Block invoked once, from -didCloseWithTrailingMetadata:error:, with the stored
 *                message (possibly nil) and the closing error.
 * @param dispatchQueue Queue reported through -dispatchQueue. The main queue is used when nil.
 */
- (nullable instancetype)initWithResponseHandler:(void (^)(id, NSError *))handler
                           responseDispatchQueue:(dispatch_queue_t)dispatchQueue {
  if ((self = [super init])) {
    _responseHandler = handler;
    if (dispatchQueue == nil) {
      _responseDispatchQueue = dispatch_get_main_queue();
    } else {
      _responseDispatchQueue = dispatchQueue;
    }
  }
  return self;
}

// Implements GRPCProtoResponseHandler
- (dispatch_queue_t)dispatchQueue {
  return _responseDispatchQueue;
}

- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  _responseHeaders = [initialMetadata copy];
}

- (void)didReceiveProtoMessage:(GPBMessage *)message {
  _message = message;
}

- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  _responseTrailers = [trailingMetadata copy];
  // Drop our reference to the message before handing it to the completion block.
  GPBMessage *message = _message;
  _message = nil;
  // Invoking a nil block crashes, so tolerate a handler created without a completion block.
  if (_responseHandler != nil) {
    _responseHandler(message, error);
  }
}

// Intentional no-op since flow control is N/A in a unary call
- (void)didWriteMessage {
}

@end

/**
 * A unary proto call, implemented as a GRPCStreamingProtoCall that sends exactly one message.
 */
@implementation GRPCUnaryProtoCall {
  GRPCStreamingProtoCall *_call;
  GPBMessage *_message;
}

/**
 * Returns nil (and asserts in builds with assertions enabled) when message or responseClass is
 * nil. The message is copied at initialization time.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                               message:(GPBMessage *)message
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  NSAssert(message != nil, @"message cannot be empty.");
  NSAssert(responseClass != nil, @"responseClass cannot be empty.");
  if (message == nil || responseClass == nil) {
    return nil;
  }
  if ((self = [super init])) {
    _call = [[GRPCStreamingProtoCall alloc] initWithRequestOptions:requestOptions
                                                   responseHandler:handler
                                                       callOptions:callOptions
                                                     responseClass:responseClass];
    _message = [message copy];
  }
  return self;
}

/** Starts the call, requests one response message, writes the request and finishes. */
- (void)start {
  [_call start];
  [_call receiveNextMessage];
  [_call writeMessage:_message];
  [_call finish];
}

- (void)cancel {
  [_call cancel];
}

@end

@interface GRPCStreamingProtoCall () <GRPCResponseHandler>

@end

/**
 * Wraps a GRPCCall2, serializing outgoing GPBMessage objects and parsing incoming data into
 * instances of the response class before forwarding events to a GRPCProtoResponseHandler.
 *
 * _call and _handler are read and written under @synchronized(self). Handler callbacks are
 * issued asynchronously on _dispatchQueue.
 */
@implementation GRPCStreamingProtoCall {
  GRPCRequestOptions *_requestOptions;
  id<GRPCProtoResponseHandler> _handler;
  GRPCCallOptions *_callOptions;
  Class _responseClass;

  GRPCCall2 *_call;
  dispatch_queue_t _dispatchQueue;
}

/**
 * Returns nil (and asserts in builds with assertions enabled) when the request options have an
 * empty host or path, a safety value above GRPCCallSafetyCacheableRequest, or when handler is
 * nil.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0 &&
               requestOptions.safety <= GRPCCallSafetyCacheableRequest,
           @"Invalid requestOptions.");
  NSAssert(handler != nil, @"handler cannot be empty.");
  if (requestOptions.host.length == 0 || requestOptions.path.length == 0 ||
      requestOptions.safety > GRPCCallSafetyCacheableRequest) {
    return nil;
  }
  if (handler == nil) {
    return nil;
  }

  if ((self = [super init])) {
    _requestOptions = [requestOptions copy];
    _handler = handler;
    _callOptions = [callOptions copy];
    _responseClass = responseClass;

    // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above.
    // The SDK check must be ">=": @available only exists in the newer toolchains, and an
    // undefined *_MAX_ALLOWED macro evaluates to 0, so a "<" comparison would always be true.
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
    if (@available(iOS 8.0, macOS 10.10, *)) {
      _dispatchQueue = dispatch_queue_create(
          NULL,
          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
    } else {
#else
    {
#endif
      _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    }
    // Blocks submitted to our serial queue end up running on the handler's own queue.
    dispatch_set_target_queue(_dispatchQueue, handler.dispatchQueue);

    _call = [[GRPCCall2 alloc] initWithRequestOptions:_requestOptions
                                      responseHandler:self
                                          callOptions:_callOptions];
  }
  return self;
}

- (void)start {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall start];
}

/**
 * Cancels the underlying call (if still held) and, when the handler implements
 * -didCloseWithTrailingMetadata:error:, reports a GRPCErrorCodeCancelled error to it.
 */
- (void)cancel {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
    _call = nil;
    if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        // Take the handler and clear it in one step so it is closed at most once.
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler didCloseWithTrailingMetadata:nil
                                              error:[NSError errorWithDomain:kGRPCErrorDomain
                                                                        code:GRPCErrorCodeCancelled
                                                                    userInfo:@{
                                                                      NSLocalizedDescriptionKey :
                                                                          @"Canceled by app"
                                                                    }]];
      });
    } else {
      _handler = nil;
    }
  }
  // Message the call outside the lock.
  [copiedCall cancel];
}

/** Serializes message and writes it to the underlying call; non-GPBMessage input is dropped. */
- (void)writeMessage:(GPBMessage *)message {
  NSAssert([message isKindOfClass:[GPBMessage class]], @"Parameter message must be a GPBMessage");
  if (![message isKindOfClass:[GPBMessage class]]) {
    NSLog(@"Failed to send a message that is non-proto.");
    return;
  }

  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall writeData:[message data]];
}

/** Finishes the underlying call and releases this object's reference to it. */
- (void)finish {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
    _call = nil;
  }
  [copiedCall finish];
}

- (void)receiveNextMessage {
  [self receiveNextMessages:1];
}

- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall receiveNextMessages:numberOfMessages];
}

#pragma mark - GRPCResponseHandler

- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  @synchronized(self) {
    if (initialMetadata != nil &&
        [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
      dispatch_async(_dispatchQueue, ^{
        // Re-read the handler at delivery time; it may have been cleared in the meantime.
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didReceiveInitialMetadata:initialMetadata];
      });
    }
  }
}

/**
 * Parses data with the response class. A parsed message is forwarded to the handler; a parse
 * failure closes the handler with an ErrorForBadProto error and cancels the underlying call.
 */
- (void)didReceiveData:(id)data {
  if (data == nil) return;

  NSError *error = nil;
  GPBMessage *parsed = [_responseClass parseFromData:data error:&error];
  @synchronized(self) {
    if (parsed && [_handler respondsToSelector:@selector(didReceiveProtoMessage:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didReceiveProtoMessage:parsed];
      });
    } else if (!parsed &&
               [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        // Take the handler and clear it in one step so it is closed at most once.
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler
            didCloseWithTrailingMetadata:nil
                                   error:ErrorForBadProto(data, self->_responseClass, error)];
      });
      [_call cancel];
      _call = nil;
    }
  }
}

- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  @synchronized(self) {
    if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        // Take the handler and clear it in one step so it is closed at most once.
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
      });
    }
    _call = nil;
  }
}

- (void)didWriteData {
  @synchronized(self) {
    if ([_handler respondsToSelector:@selector(didWriteMessage)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didWriteMessage];
      });
    }
  }
}

- (dispatch_queue_t)dispatchQueue {
  return _dispatchQueue;
}

@end

/**
 * Generate an NSError object that represents a failure in parsing a proto class.
 *
 * @param proto The received value that could not be parsed; stored under @"Received value".
 * @param expectedClass The class parsing was attempted with; stored under @"Expected class".
 * @param parsingError The parser's error; stored under NSUnderlyingErrorKey.
 * @return An error in domain @"io.grpc" with code 13. Any argument that is nil is left out of
 *         userInfo, because inserting nil into a dictionary raises an exception.
 */
NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsingError) {
  NSMutableDictionary *info = [NSMutableDictionary dictionaryWithDictionary:@{
    NSLocalizedDescriptionKey : @"Unable to parse response from the server",
    NSLocalizedRecoverySuggestionErrorKey :
        @"If this RPC is idempotent, retry "
        @"with exponential backoff. Otherwise, query the server status before "
        @"retrying.",
  }];
  if (parsingError != nil) {
    [info setObject:parsingError forKey:NSUnderlyingErrorKey];
  }
  if (expectedClass != nil) {
    [info setObject:expectedClass forKey:@"Expected class"];
  }
  if (proto != nil) {
    [info setObject:proto forKey:@"Received value"];
  }
  // TODO(jcanizales): Use kGRPCErrorDomain and GRPCErrorCodeInternal when they're public.
  return [NSError errorWithDomain:@"io.grpc" code:13 userInfo:[info copy]];
}