1 /** @file
2 
  Internal declarations for the asynchronous disk I/O (AIO) module.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26   Async Disk IO operations.
27 
28 
29 
30  ****************************************************************************/
31 #pragma once
32 
33 #include "P_EventSystem.h"
34 #include "I_AIO.h"
35 
36 // for debugging
37 // #define AIO_STATS 1
38 
// Module version for internal users: built from AIO_MODULE_PUBLIC_VERSION and
// tagged with ts::ModuleVersion::PRIVATE.
static constexpr ts::ModuleVersion AIO_MODULE_INTERNAL_VERSION{AIO_MODULE_PUBLIC_VERSION, ts::ModuleVersion::PRIVATE};
40 
41 TS_INLINE int
ok()42 AIOCallback::ok()
43 {
44   return (off_t)aiocb.aio_nbytes == (off_t)aio_result;
45 }
46 
47 extern Continuation *aio_err_callbck;
48 
49 #if AIO_MODE == AIO_MODE_NATIVE
50 
51 struct AIOCallbackInternal : public AIOCallback {
52   int io_complete(int event, void *data);
AIOCallbackInternalAIOCallbackInternal53   AIOCallbackInternal()
54   {
55     memset((void *)&(this->aiocb), 0, sizeof(this->aiocb));
56     SET_HANDLER(&AIOCallbackInternal::io_complete);
57   }
58 };
59 
60 TS_INLINE int
mainEvent(int,Event *)61 AIOVec::mainEvent(int /* event */, Event *)
62 {
63   ++completed;
64   if (completed < size)
65     return EVENT_CONT;
66   else if (completed == size) {
67     SCOPED_MUTEX_LOCK(lock, action.mutex, this_ethread());
68     if (!action.cancelled)
69       action.continuation->handleEvent(AIO_EVENT_DONE, first);
70     delete this;
71     return EVENT_DONE;
72   }
73   ink_assert(!"AIOVec mainEvent err");
74   return EVENT_ERROR;
75 }
76 
77 #else /* AIO_MODE != AIO_MODE_NATIVE */
78 
79 struct AIO_Reqs;
80 
81 struct AIOCallbackInternal : public AIOCallback {
82   AIO_Reqs *aio_req     = nullptr;
83   ink_hrtime sleep_time = 0;
84   SLINK(AIOCallbackInternal, alink); /* for AIO_Reqs::aio_temp_list */
85 
86   int io_complete(int event, void *data);
87 
AIOCallbackInternalAIOCallbackInternal88   AIOCallbackInternal() { SET_HANDLER(&AIOCallbackInternal::io_complete); }
89 };
90 
91 struct AIO_Reqs {
92   Que(AIOCallback, link) aio_todo; /* queue for AIO operations */
93                                    /* Atomic list to temporarily hold the request if the
94                                       lock for a particular queue cannot be acquired */
95   ASLL(AIOCallbackInternal, alink) aio_temp_list;
96   ink_mutex aio_mutex;
97   ink_cond aio_cond;
98   int index           = 0; /* position of this struct in the aio_reqs array */
99   int pending         = 0; /* number of outstanding requests on the disk */
100   int queued          = 0; /* total number of aio_todo requests */
101   int filedes         = 0; /* the file descriptor for the requests */
102   int requests_queued = 0;
103 };
104 
105 #endif // AIO_MODE == AIO_MODE_NATIVE
106 
107 TS_INLINE int
io_complete(int event,void * data)108 AIOCallbackInternal::io_complete(int event, void *data)
109 {
110   (void)event;
111   (void)data;
112   if (aio_err_callbck && !ok()) {
113     AIOCallback *err_op          = new AIOCallbackInternal();
114     err_op->aiocb.aio_fildes     = this->aiocb.aio_fildes;
115     err_op->aiocb.aio_lio_opcode = this->aiocb.aio_lio_opcode;
116     err_op->mutex                = aio_err_callbck->mutex;
117     err_op->action               = aio_err_callbck;
118     eventProcessor.schedule_imm(err_op);
119   }
120   if (!action.cancelled) {
121     action.continuation->handleEvent(AIO_EVENT_DONE, this);
122   }
123   return EVENT_DONE;
124 }
125 
126 #ifdef AIO_STATS
127 class AIOTestData : public Continuation
128 {
129 public:
130   int num_req;
131   int num_temp;
132   int num_queue;
133   ink_hrtime start;
134 
135   int ink_aio_stats(int event, void *data);
136 
AIOTestData()137   AIOTestData() : Continuation(new_ProxyMutex()), num_req(0), num_temp(0), num_queue(0)
138   {
139     start = ink_get_hrtime();
140     SET_HANDLER(&AIOTestData::ink_aio_stats);
141   }
142 };
143 #endif
144 
/// Identifiers for the AIO statistics.
/// NOTE(review): presumably used as indexes into the aio_rsb stat block - confirm.
enum aio_stat_enum {
  AIO_STAT_READ_PER_SEC = 0,
  AIO_STAT_KB_READ_PER_SEC,
  AIO_STAT_WRITE_PER_SEC,
  AIO_STAT_KB_WRITE_PER_SEC,
  AIO_STAT_COUNT ///< Must stay last: its value is the number of identifiers above.
};
152 extern RecRawStatBlock *aio_rsb;
153