1/// Licensed to the Apache Software Foundation (ASF) under one
2/// or more contributor license agreements. See the NOTICE file
3/// distributed with this work for additional information
4/// regarding copyright ownership. The ASF licenses this file
5/// to you under the Apache License, Version 2.0 (the
6/// "License"); you may not use this file except in compliance
7/// with the License. You may obtain a copy of the License at
8///
9/// http://www.apache.org/licenses/LICENSE-2.0
10///
11/// Unless required by applicable law or agreed to in writing,
12/// software distributed under the License is distributed on an
13/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14/// KIND, either express or implied. See the License for the
15/// specific language governing permissions and limitations
16/// under the License.
17
18library thrift.src.console.t_tcp_socket;
19
20import 'dart:async';
21import 'dart:io';
22import 'dart:typed_data' show Uint8List;
23
24import 'package:thrift/thrift.dart';
25
26/// A [TSocket] backed by a [Socket] from dart:io
27class TTcpSocket implements TSocket {
28  final StreamController<TSocketState> _onStateController;
29  Stream<TSocketState> get onState => _onStateController.stream;
30
31  final StreamController<Object> _onErrorController;
32  Stream<Object> get onError => _onErrorController.stream;
33
34  final StreamController<Uint8List> _onMessageController;
35  Stream<Uint8List> get onMessage => _onMessageController.stream;
36
37  TTcpSocket(Socket socket)
38      : _onStateController = new StreamController.broadcast(),
39        _onErrorController = new StreamController.broadcast(),
40        _onMessageController = new StreamController.broadcast() {
41    if (socket == null) {
42      throw new ArgumentError.notNull('socket');
43    }
44
45    _socket = socket;
46    _socket.listen(_onMessage, onError: _onError, onDone: close);
47  }
48
49  Socket _socket;
50
51  bool get isOpen => _socket != null;
52
53  bool get isClosed => _socket == null;
54
55  Future open() async {
56    _onStateController.add(TSocketState.OPEN);
57  }
58
59  Future close() async {
60    if (_socket != null) {
61      await _socket.close();
62      _socket = null;
63    }
64
65    _onStateController.add(TSocketState.CLOSED);
66  }
67
68  void send(Uint8List data) {
69    _socket.add(data);
70  }
71
72  void _onMessage(List<int> message) {
73    Uint8List data = new Uint8List.fromList(message);
74    _onMessageController.add(data);
75  }
76
77  void _onError(Object error) {
78    close();
79    _onErrorController.add('$error');
80  }
81}
82