1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 // Slow stream implementations, mainly for testing and benchmarking 19 20 #pragma once 21 22 #include <cstdint> 23 #include <memory> 24 #include <utility> 25 26 #include "arrow/io/interfaces.h" 27 #include "arrow/util/visibility.h" 28 29 namespace arrow { 30 31 class Buffer; 32 class Status; 33 34 namespace io { 35 36 class ARROW_EXPORT LatencyGenerator { 37 public: 38 virtual ~LatencyGenerator(); 39 40 void Sleep(); 41 42 virtual double NextLatency() = 0; 43 44 static std::shared_ptr<LatencyGenerator> Make(double average_latency); 45 static std::shared_ptr<LatencyGenerator> Make(double average_latency, int32_t seed); 46 }; 47 48 // XXX use ConcurrencyWrapper? It could increase chances of finding a race. 49 50 template <class StreamType> 51 class ARROW_EXPORT SlowInputStreamBase : public StreamType { 52 public: SlowInputStreamBase(std::shared_ptr<StreamType> stream,std::shared_ptr<LatencyGenerator> latencies)53 SlowInputStreamBase(std::shared_ptr<StreamType> stream, 54 std::shared_ptr<LatencyGenerator> latencies) 55 : stream_(std::move(stream)), latencies_(std::move(latencies)) {} 56 SlowInputStreamBase(std::shared_ptr<StreamType> stream,double average_latency)57 SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency) 58 : stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {} 59 SlowInputStreamBase(std::shared_ptr<StreamType> stream,double average_latency,int32_t seed)60 SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency, 61 int32_t seed) 62 : stream_(std::move(stream)), 63 latencies_(LatencyGenerator::Make(average_latency, seed)) {} 64 65 protected: 66 std::shared_ptr<StreamType> stream_; 67 std::shared_ptr<LatencyGenerator> latencies_; 68 }; 69 70 /// \brief An InputStream wrapper that makes reads slower. 71 /// 72 /// Read() calls are made slower by an average latency (in seconds). 73 /// Actual latencies form a normal distribution closely centered 74 /// on the average latency. 75 /// Other calls are forwarded directly. 76 class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase<InputStream> { 77 public: 78 ~SlowInputStream() override; 79 80 using SlowInputStreamBase<InputStream>::SlowInputStreamBase; 81 82 Status Close() override; 83 Status Abort() override; 84 bool closed() const override; 85 86 Result<int64_t> Read(int64_t nbytes, void* out) override; 87 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override; 88 Result<util::string_view> Peek(int64_t nbytes) override; 89 90 Result<int64_t> Tell() const override; 91 }; 92 93 /// \brief A RandomAccessFile wrapper that makes reads slower. 94 /// 95 /// Similar to SlowInputStream, but allows random access and seeking. 96 class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase<RandomAccessFile> { 97 public: 98 ~SlowRandomAccessFile() override; 99 100 using SlowInputStreamBase<RandomAccessFile>::SlowInputStreamBase; 101 102 Status Close() override; 103 Status Abort() override; 104 bool closed() const override; 105 106 Result<int64_t> Read(int64_t nbytes, void* out) override; 107 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override; 108 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; 109 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override; 110 Result<util::string_view> Peek(int64_t nbytes) override; 111 112 Result<int64_t> GetSize() override; 113 Status Seek(int64_t position) override; 114 Result<int64_t> Tell() const override; 115 }; 116 117 } // namespace io 118 } // namespace arrow 119