1/// Licensed to the Apache Software Foundation (ASF) under one 2/// or more contributor license agreements. See the NOTICE file 3/// distributed with this work for additional information 4/// regarding copyright ownership. The ASF licenses this file 5/// to you under the Apache License, Version 2.0 (the 6/// "License"); you may not use this file except in compliance 7/// with the License. You may obtain a copy of the License at 8/// 9/// http://www.apache.org/licenses/LICENSE-2.0 10/// 11/// Unless required by applicable law or agreed to in writing, 12/// software distributed under the License is distributed on an 13/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14/// KIND, either express or implied. See the License for the 15/// specific language governing permissions and limitations 16/// under the License. 17 18library thrift.src.console.t_tcp_socket; 19 20import 'dart:async'; 21import 'dart:io'; 22import 'dart:typed_data' show Uint8List; 23 24import 'package:thrift/thrift.dart'; 25 26/// A [TSocket] backed by a [Socket] from dart:io 27class TTcpSocket implements TSocket { 28 final StreamController<TSocketState> _onStateController; 29 Stream<TSocketState> get onState => _onStateController.stream; 30 31 final StreamController<Object> _onErrorController; 32 Stream<Object> get onError => _onErrorController.stream; 33 34 final StreamController<Uint8List> _onMessageController; 35 Stream<Uint8List> get onMessage => _onMessageController.stream; 36 37 TTcpSocket(Socket socket) 38 : _onStateController = new StreamController.broadcast(), 39 _onErrorController = new StreamController.broadcast(), 40 _onMessageController = new StreamController.broadcast() { 41 if (socket == null) { 42 throw new ArgumentError.notNull('socket'); 43 } 44 45 _socket = socket; 46 _socket.listen(_onMessage, onError: _onError, onDone: close); 47 } 48 49 Socket _socket; 50 51 bool get isOpen => _socket != null; 52 53 bool get isClosed => _socket == null; 54 55 Future open() async { 56 _onStateController.add(TSocketState.OPEN); 57 } 58 59 Future close() async { 60 if (_socket != null) { 61 await _socket.close(); 62 _socket = null; 63 } 64 65 _onStateController.add(TSocketState.CLOSED); 66 } 67 68 void send(Uint8List data) { 69 _socket.add(data); 70 } 71 72 void _onMessage(List<int> message) { 73 Uint8List data = new Uint8List.fromList(message); 74 _onMessageController.add(data); 75 } 76 77 void _onError(Object error) { 78 close(); 79 _onErrorController.add('$error'); 80 } 81} 82