1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // Slow stream implementations, mainly for testing and benchmarking
19 
20 #pragma once
21 
22 #include <cstdint>
23 #include <memory>
24 #include <utility>
25 
26 #include "arrow/io/interfaces.h"
27 #include "arrow/util/visibility.h"
28 
29 namespace arrow {
30 
31 class Buffer;
32 class Status;
33 
34 namespace io {
35 
36 class ARROW_EXPORT LatencyGenerator {
37  public:
38   virtual ~LatencyGenerator();
39 
40   void Sleep();
41 
42   virtual double NextLatency() = 0;
43 
44   static std::shared_ptr<LatencyGenerator> Make(double average_latency);
45   static std::shared_ptr<LatencyGenerator> Make(double average_latency, int32_t seed);
46 };
47 
48 // XXX use ConcurrencyWrapper?  It could increase chances of finding a race.
49 
50 template <class StreamType>
51 class ARROW_EXPORT SlowInputStreamBase : public StreamType {
52  public:
SlowInputStreamBase(std::shared_ptr<StreamType> stream,std::shared_ptr<LatencyGenerator> latencies)53   SlowInputStreamBase(std::shared_ptr<StreamType> stream,
54                       std::shared_ptr<LatencyGenerator> latencies)
55       : stream_(std::move(stream)), latencies_(std::move(latencies)) {}
56 
SlowInputStreamBase(std::shared_ptr<StreamType> stream,double average_latency)57   SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency)
58       : stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {}
59 
SlowInputStreamBase(std::shared_ptr<StreamType> stream,double average_latency,int32_t seed)60   SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency,
61                       int32_t seed)
62       : stream_(std::move(stream)),
63         latencies_(LatencyGenerator::Make(average_latency, seed)) {}
64 
65  protected:
66   std::shared_ptr<StreamType> stream_;
67   std::shared_ptr<LatencyGenerator> latencies_;
68 };
69 
70 /// \brief An InputStream wrapper that makes reads slower.
71 ///
72 /// Read() calls are made slower by an average latency (in seconds).
73 /// Actual latencies form a normal distribution closely centered
74 /// on the average latency.
75 /// Other calls are forwarded directly.
76 class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase<InputStream> {
77  public:
78   ~SlowInputStream() override;
79 
80   using SlowInputStreamBase<InputStream>::SlowInputStreamBase;
81 
82   Status Close() override;
83   Status Abort() override;
84   bool closed() const override;
85 
86   Result<int64_t> Read(int64_t nbytes, void* out) override;
87   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
88   Result<util::string_view> Peek(int64_t nbytes) override;
89 
90   Result<int64_t> Tell() const override;
91 };
92 
93 /// \brief A RandomAccessFile wrapper that makes reads slower.
94 ///
95 /// Similar to SlowInputStream, but allows random access and seeking.
96 class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase<RandomAccessFile> {
97  public:
98   ~SlowRandomAccessFile() override;
99 
100   using SlowInputStreamBase<RandomAccessFile>::SlowInputStreamBase;
101 
102   Status Close() override;
103   Status Abort() override;
104   bool closed() const override;
105 
106   Result<int64_t> Read(int64_t nbytes, void* out) override;
107   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
108   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
109   Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
110   Result<util::string_view> Peek(int64_t nbytes) override;
111 
112   Result<int64_t> GetSize() override;
113   Status Seek(int64_t position) override;
114   Result<int64_t> Tell() const override;
115 };
116 
117 }  // namespace io
118 }  // namespace arrow
119