1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/io/transform.h"
19 
20 #include <algorithm>
21 #include <cstring>
22 #include <mutex>
23 #include <random>
24 #include <thread>
25 #include <utility>
26 
27 #include "arrow/buffer.h"
28 #include "arrow/io/util_internal.h"
29 #include "arrow/result.h"
30 #include "arrow/status.h"
31 #include "arrow/util/logging.h"
32 
33 namespace arrow {
34 namespace io {
35 
36 struct TransformInputStream::Impl {
37   std::shared_ptr<InputStream> wrapped_;
38   TransformInputStream::TransformFunc transform_;
39   std::shared_ptr<Buffer> pending_;
40   int64_t pos_ = 0;
41   bool closed_ = false;
42 
Implarrow::io::TransformInputStream::Impl43   Impl(std::shared_ptr<InputStream> wrapped,
44        TransformInputStream::TransformFunc transform)
45       : wrapped_(std::move(wrapped)), transform_(std::move(transform)) {}
46 
Closearrow::io::TransformInputStream::Impl47   void Close() {
48     closed_ = true;
49     pending_.reset();
50   }
51 
CheckClosedarrow::io::TransformInputStream::Impl52   Status CheckClosed() const {
53     if (closed_) {
54       return Status::Invalid("Operation on closed file");
55     }
56     return Status::OK();
57   }
58 };
59 
TransformInputStream(std::shared_ptr<InputStream> wrapped,TransformInputStream::TransformFunc transform)60 TransformInputStream::TransformInputStream(std::shared_ptr<InputStream> wrapped,
61                                            TransformInputStream::TransformFunc transform)
62     : impl_(new Impl{std::move(wrapped), std::move(transform)}) {}
63 
~TransformInputStream()64 TransformInputStream::~TransformInputStream() {}
65 
Close()66 Status TransformInputStream::Close() {
67   impl_->Close();
68   return impl_->wrapped_->Close();
69 }
70 
Abort()71 Status TransformInputStream::Abort() { return impl_->wrapped_->Abort(); }
72 
closed() const73 bool TransformInputStream::closed() const { return impl_->closed_; }
74 
Read(int64_t nbytes)75 Result<std::shared_ptr<Buffer>> TransformInputStream::Read(int64_t nbytes) {
76   RETURN_NOT_OK(impl_->CheckClosed());
77 
78   ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
79   ARROW_ASSIGN_OR_RAISE(auto bytes_read, this->Read(nbytes, buf->mutable_data()));
80   if (bytes_read < nbytes) {
81     RETURN_NOT_OK(buf->Resize(bytes_read, /*shrink_to_fit=*/true));
82   }
83   return std::shared_ptr<Buffer>(std::move(buf));
84 }
85 
Read(int64_t nbytes,void * out)86 Result<int64_t> TransformInputStream::Read(int64_t nbytes, void* out) {
87   RETURN_NOT_OK(impl_->CheckClosed());
88 
89   if (nbytes == 0) {
90     return 0;
91   }
92 
93   int64_t avail_size = 0;
94   std::vector<std::shared_ptr<Buffer>> avail;
95   if (impl_->pending_) {
96     avail.push_back(impl_->pending_);
97     avail_size += impl_->pending_->size();
98   }
99   // Accumulate enough transformed data to satisfy read
100   while (avail_size < nbytes) {
101     ARROW_ASSIGN_OR_RAISE(auto buf, impl_->wrapped_->Read(nbytes));
102     const bool have_eof = (buf->size() == 0);
103     // Even if EOF is met, let the transform function run a last time
104     // (for example to flush internal buffers)
105     ARROW_ASSIGN_OR_RAISE(buf, impl_->transform_(std::move(buf)));
106     avail_size += buf->size();
107     avail.push_back(std::move(buf));
108     if (have_eof) {
109       break;
110     }
111   }
112   DCHECK(!avail.empty());
113 
114   // Coalesce buffer data
115   uint8_t* out_data = reinterpret_cast<uint8_t*>(out);
116   int64_t copied_bytes = 0;
117   for (size_t i = 0; i < avail.size() - 1; ++i) {
118     // All buffers except the last fit fully into `nbytes`
119     const auto buf = std::move(avail[i]);
120     DCHECK_LE(buf->size(), nbytes);
121     memcpy(out_data, buf->data(), static_cast<size_t>(buf->size()));
122     out_data += buf->size();
123     nbytes -= buf->size();
124     copied_bytes += buf->size();
125   }
126   {
127     // Last buffer: splice into `out` and `pending_`
128     const auto buf = std::move(avail.back());
129     const int64_t to_copy = std::min(buf->size(), nbytes);
130     memcpy(out_data, buf->data(), static_cast<size_t>(to_copy));
131     copied_bytes += to_copy;
132     if (buf->size() > to_copy) {
133       impl_->pending_ = SliceBuffer(buf, to_copy);
134     } else {
135       impl_->pending_.reset();
136     }
137   }
138   impl_->pos_ += copied_bytes;
139   return copied_bytes;
140 }
141 
Tell() const142 Result<int64_t> TransformInputStream::Tell() const {
143   RETURN_NOT_OK(impl_->CheckClosed());
144 
145   return impl_->pos_;
146 }
147 
ReadMetadata()148 Result<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadata() {
149   RETURN_NOT_OK(impl_->CheckClosed());
150 
151   return impl_->wrapped_->ReadMetadata();
152 }
153 
ReadMetadataAsync(const IOContext & io_context)154 Future<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadataAsync(
155     const IOContext& io_context) {
156   RETURN_NOT_OK(impl_->CheckClosed());
157 
158   return impl_->wrapped_->ReadMetadataAsync(io_context);
159 }
160 
161 }  // namespace io
162 }  // namespace arrow
163