1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <memory> 21 22 #include "arrow/io/interfaces.h" 23 #include "arrow/result.h" 24 #include "arrow/status.h" 25 #include "arrow/util/checked_cast.h" 26 #include "arrow/util/macros.h" 27 #include "arrow/util/visibility.h" 28 29 namespace arrow { 30 namespace io { 31 namespace internal { 32 33 template <class LockType> 34 class SharedLockGuard { 35 public: SharedLockGuard(LockType * lock)36 explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); } 37 ~SharedLockGuard()38 ~SharedLockGuard() { lock_->UnlockShared(); } 39 40 protected: 41 LockType* lock_; 42 }; 43 44 template <class LockType> 45 class ExclusiveLockGuard { 46 public: ExclusiveLockGuard(LockType * lock)47 explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); } 48 ~ExclusiveLockGuard()49 ~ExclusiveLockGuard() { lock_->UnlockExclusive(); } 50 51 protected: 52 LockType* lock_; 53 }; 54 55 // Debug concurrency checker that marks "shared" and "exclusive" code sections, 56 // aborting if the concurrency rules get violated. Does nothing in release mode. 57 // Note that we intentionally use the same class declaration in debug and 58 // release builds in order to avoid runtime failures when e.g. loading a 59 // release-built DLL with a debug-built application, or the reverse. 60 61 class ARROW_EXPORT SharedExclusiveChecker { 62 public: 63 SharedExclusiveChecker(); 64 void LockShared(); 65 void UnlockShared(); 66 void LockExclusive(); 67 void UnlockExclusive(); 68 shared_guard()69 SharedLockGuard<SharedExclusiveChecker> shared_guard() { 70 return SharedLockGuard<SharedExclusiveChecker>(this); 71 } 72 exclusive_guard()73 ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() { 74 return ExclusiveLockGuard<SharedExclusiveChecker>(this); 75 } 76 77 protected: 78 struct Impl; 79 std::shared_ptr<Impl> impl_; 80 }; 81 82 // Concurrency wrappers for IO classes that check the correctness of 83 // concurrent calls to various methods. It is not necessary to wrap all 84 // IO classes with these, only a few core classes that get used in tests. 85 // 86 // We're not using virtual inheritance here as virtual bases have poorly 87 // understood semantic overhead which we'd be passing on to implementers 88 // and users of these interfaces. Instead, we just duplicate the method 89 // wrappers between those two classes. 90 91 template <class Derived> 92 class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream { 93 public: Close()94 Status Close() final { 95 auto guard = lock_.exclusive_guard(); 96 return derived()->DoClose(); 97 } 98 Abort()99 Status Abort() final { 100 auto guard = lock_.exclusive_guard(); 101 return derived()->DoAbort(); 102 } 103 Tell()104 Result<int64_t> Tell() const final { 105 auto guard = lock_.exclusive_guard(); 106 return derived()->DoTell(); 107 } 108 Read(int64_t nbytes,void * out)109 Result<int64_t> Read(int64_t nbytes, void* out) final { 110 auto guard = lock_.exclusive_guard(); 111 return derived()->DoRead(nbytes, out); 112 } 113 Read(int64_t nbytes)114 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { 115 auto guard = lock_.exclusive_guard(); 116 return derived()->DoRead(nbytes); 117 } 118 Peek(int64_t nbytes)119 Result<util::string_view> Peek(int64_t nbytes) final { 120 auto guard = lock_.exclusive_guard(); 121 return derived()->DoPeek(nbytes); 122 } 123 124 /* 125 Methods to implement in derived class: 126 127 Status DoClose(); 128 Result<int64_t> DoTell() const; 129 Result<int64_t> DoRead(int64_t nbytes, void* out); 130 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); 131 132 And optionally: 133 134 Status DoAbort() override; 135 Result<util::string_view> DoPeek(int64_t nbytes) override; 136 137 These methods should be protected in the derived class and 138 InputStreamConcurrencyWrapper declared as a friend with 139 140 friend InputStreamConcurrencyWrapper<derived>; 141 */ 142 143 protected: 144 // Default implementations. They are virtual because the derived class may 145 // have derived classes itself. DoAbort()146 virtual Status DoAbort() { return derived()->DoClose(); } 147 DoPeek(int64_t ARROW_ARG_UNUSED (nbytes))148 virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { 149 return Status::NotImplemented("Peek not implemented"); 150 } 151 derived()152 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } 153 derived()154 const Derived* derived() const { 155 return ::arrow::internal::checked_cast<const Derived*>(this); 156 } 157 158 mutable SharedExclusiveChecker lock_; 159 }; 160 161 template <class Derived> 162 class ARROW_EXPORT RandomAccessFileConcurrencyWrapper : public RandomAccessFile { 163 public: Close()164 Status Close() final { 165 auto guard = lock_.exclusive_guard(); 166 return derived()->DoClose(); 167 } 168 Abort()169 Status Abort() final { 170 auto guard = lock_.exclusive_guard(); 171 return derived()->DoAbort(); 172 } 173 Tell()174 Result<int64_t> Tell() const final { 175 auto guard = lock_.exclusive_guard(); 176 return derived()->DoTell(); 177 } 178 Read(int64_t nbytes,void * out)179 Result<int64_t> Read(int64_t nbytes, void* out) final { 180 auto guard = lock_.exclusive_guard(); 181 return derived()->DoRead(nbytes, out); 182 } 183 Read(int64_t nbytes)184 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { 185 auto guard = lock_.exclusive_guard(); 186 return derived()->DoRead(nbytes); 187 } 188 Peek(int64_t nbytes)189 Result<util::string_view> Peek(int64_t nbytes) final { 190 auto guard = lock_.exclusive_guard(); 191 return derived()->DoPeek(nbytes); 192 } 193 Seek(int64_t position)194 Status Seek(int64_t position) final { 195 auto guard = lock_.exclusive_guard(); 196 return derived()->DoSeek(position); 197 } 198 GetSize()199 Result<int64_t> GetSize() final { 200 auto guard = lock_.shared_guard(); 201 return derived()->DoGetSize(); 202 } 203 204 // NOTE: ReadAt doesn't use stream pointer, but it is allowed to update it 205 // (it's the case on Windows when using ReadFileEx). 206 // So any method that relies on the current position (even if it doesn't 207 // update it, such as Peek) cannot run in parallel with ReadAt and has 208 // to use the exclusive_guard. 209 ReadAt(int64_t position,int64_t nbytes,void * out)210 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final { 211 auto guard = lock_.shared_guard(); 212 return derived()->DoReadAt(position, nbytes, out); 213 } 214 ReadAt(int64_t position,int64_t nbytes)215 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final { 216 auto guard = lock_.shared_guard(); 217 return derived()->DoReadAt(position, nbytes); 218 } 219 220 /* 221 Methods to implement in derived class: 222 223 Status DoClose(); 224 Result<int64_t> DoTell() const; 225 Result<int64_t> DoRead(int64_t nbytes, void* out); 226 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); 227 Status DoSeek(int64_t position); 228 Result<int64_t> DoGetSize() 229 Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out); 230 Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes); 231 232 And optionally: 233 234 Status DoAbort() override; 235 Result<util::string_view> DoPeek(int64_t nbytes) override; 236 237 These methods should be protected in the derived class and 238 RandomAccessFileConcurrencyWrapper declared as a friend with 239 240 friend RandomAccessFileConcurrencyWrapper<derived>; 241 */ 242 243 protected: 244 // Default implementations. They are virtual because the derived class may 245 // have derived classes itself. DoAbort()246 virtual Status DoAbort() { return derived()->DoClose(); } 247 DoPeek(int64_t ARROW_ARG_UNUSED (nbytes))248 virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { 249 return Status::NotImplemented("Peek not implemented"); 250 } 251 derived()252 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } 253 derived()254 const Derived* derived() const { 255 return ::arrow::internal::checked_cast<const Derived*>(this); 256 } 257 258 mutable SharedExclusiveChecker lock_; 259 }; 260 261 } // namespace internal 262 } // namespace io 263 } // namespace arrow 264