1 /***************************************************************************
2  *  include/stxxl/bits/mng/block_prefetcher.h
3  *
4  *  Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  *  Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *  Copyright (C) 2009, 2010 Johannes Singler <singler@kit.edu>
8  *  Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
9  *
10  *  Distributed under the Boost Software License, Version 1.0.
11  *  (See accompanying file LICENSE_1_0.txt or copy at
12  *  http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #ifndef STXXL_MNG_BLOCK_PREFETCHER_HEADER
16 #define STXXL_MNG_BLOCK_PREFETCHER_HEADER
17 
18 #include <vector>
19 #include <queue>
20 
21 #include <stxxl/bits/common/onoff_switch.h>
22 #include <stxxl/bits/io/request.h>
23 #include <stxxl/bits/io/iostats.h>
24 #include <stxxl/bits/noncopyable.h>
25 
26 STXXL_BEGIN_NAMESPACE
27 
28 //! \addtogroup schedlayer
29 //! \{
30 
31 class set_switch_handler
32 {
33     onoff_switch& switch_;
34     completion_handler on_compl;
35 
36 public:
set_switch_handler(onoff_switch & _switch,const completion_handler & on_compl)37     set_switch_handler(onoff_switch& _switch, const completion_handler& on_compl)
38         : switch_(_switch), on_compl(on_compl)
39     { }
40 
operator()41     void operator () (request* req)
42     {
43         // call before setting switch to on, otherwise, user has no way to wait
44         // for the completion handler to be executed
45         on_compl(req);
46         switch_.on();
47     }
48 };
49 
50 //! Encapsulates asynchronous prefetching engine.
51 //!
52 //! \c block_prefetcher overlaps I/Os with consumption of read data.
53 //! Utilizes optimal asynchronous prefetch scheduling (by Peter Sanders et.al.)
54 template <typename BlockType, typename BidIteratorType>
55 class block_prefetcher : private noncopyable
56 {
57 public:
58     typedef BlockType block_type;
59     typedef BidIteratorType bid_iterator_type;
60 
61     typedef typename block_type::bid_type bid_type;
62 
63 protected:
64     bid_iterator_type consume_seq_begin;
65     bid_iterator_type consume_seq_end;
66     unsigned_type seq_length;
67 
68     int_type* prefetch_seq;
69 
70     unsigned_type nextread;
71     unsigned_type nextconsume;
72 
73     const int_type nreadblocks;
74 
75     block_type* read_buffers;
76     request_ptr* read_reqs;
77     bid_type* read_bids;
78 
79     onoff_switch* completed;
80     int_type* pref_buffer;
81 
82     completion_handler do_after_fetch;
83 
wait(int_type iblock)84     block_type * wait(int_type iblock)
85     {
86         STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
87         {
88             stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ);
89 
90             completed[iblock].wait_for_on();
91         }
92         STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
93         int_type ibuffer = pref_buffer[iblock];
94         STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
95         assert(ibuffer >= 0 && ibuffer < nreadblocks);
96         return (read_buffers + ibuffer);
97     }
98 
99 public:
100     //! Constructs an object and immediately starts prefetching.
101     //! \param _cons_begin \c bid_iterator pointing to the \c bid of the first block to be consumed
102     //! \param _cons_end \c bid_iterator pointing to the \c bid of the ( \b last + 1 ) block of consumption sequence
103     //! \param _pref_seq gives the prefetch order, is a pointer to the integer array that contains
104     //!        the indices of the blocks in the consumption sequence
105     //! \param _prefetch_buf_size amount of prefetch buffers to use
106     //! \param do_after_fetch unknown
107     block_prefetcher(
108         bid_iterator_type _cons_begin,
109         bid_iterator_type _cons_end,
110         int_type* _pref_seq,
111         int_type _prefetch_buf_size,
112         completion_handler do_after_fetch = completion_handler())
consume_seq_begin(_cons_begin)113         : consume_seq_begin(_cons_begin),
114           consume_seq_end(_cons_end),
115           seq_length(_cons_end - _cons_begin),
116           prefetch_seq(_pref_seq),
117           nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
118           nextconsume(0),
119           nreadblocks(nextread),
120           do_after_fetch(do_after_fetch)
121     {
122         STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
123         STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
124         assert(seq_length > 0);
125         assert(_prefetch_buf_size > 0);
126         int_type i;
127         read_buffers = new block_type[nreadblocks];
128         read_reqs = new request_ptr[nreadblocks];
129         read_bids = new bid_type[nreadblocks];
130         pref_buffer = new int_type[seq_length];
131 
132         std::fill(pref_buffer, pref_buffer + seq_length, -1);
133 
134         completed = new onoff_switch[seq_length];
135 
136         for (i = 0; i < nreadblocks; ++i)
137         {
138             assert(prefetch_seq[i] < int_type(seq_length));
139             assert(prefetch_seq[i] >= 0);
140             read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
141             STXXL_VERBOSE1("block_prefetcher: reading block " << i <<
142                            " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
143                            " @ " << &read_buffers[i] <<
144                            " @ " << read_bids[i]);
145             read_reqs[i] = read_buffers[i].read(
146                 read_bids[i],
147                 set_switch_handler(*(completed + prefetch_seq[i]), do_after_fetch));
148             pref_buffer[prefetch_seq[i]] = i;
149         }
150     }
151     //! Pulls next unconsumed block from the consumption sequence.
152     //! \return Pointer to the already prefetched block from the internal buffer pool
pull_block()153     block_type * pull_block()
154     {
155         STXXL_VERBOSE1("block_prefetcher: pulling a block");
156         return wait(nextconsume++);
157     }
158     //! Exchanges buffers between prefetcher and application.
159     //! \param buffer pointer to the consumed buffer. After call if return value is true \c buffer
160     //!        contains valid pointer to the next unconsumed prefetched buffer.
161     //! \remark parameter \c buffer must be value returned by \c pull_block() or \c block_consumed() methods
162     //! \return \c false if there are no blocks to prefetch left, \c true if consumption sequence is not emptied
block_consumed(block_type * & buffer)163     bool block_consumed(block_type*& buffer)
164     {
165         int_type ibuffer = buffer - read_buffers;
166         STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
167         if (read_reqs[ibuffer].valid())
168             read_reqs[ibuffer]->wait();
169 
170         read_reqs[ibuffer] = NULL;
171 
172         if (nextread < seq_length)
173         {
174             assert(ibuffer >= 0 && ibuffer < nreadblocks);
175             int_type next_2_prefetch = prefetch_seq[nextread++];
176             STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
177 
178             assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0));
179             assert(!completed[next_2_prefetch].is_on());
180 
181             pref_buffer[next_2_prefetch] = ibuffer;
182             read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch);
183             read_reqs[ibuffer] = read_buffers[ibuffer].read(
184                 read_bids[ibuffer],
185                 set_switch_handler(*(completed + next_2_prefetch), do_after_fetch)
186                 );
187         }
188 
189         if (nextconsume >= seq_length)
190             return false;
191 
192         buffer = wait(nextconsume++);
193 
194         return true;
195     }
196 
197     //! No more consumable blocks available, but can't delete the prefetcher,
198     //! because not all blocks may have been returned, yet.
empty()199     bool empty() const
200     {
201         return nextconsume >= seq_length;
202     }
203 
204     //! Index of the next element in the consume sequence.
pos()205     unsigned_type pos() const
206     {
207         return nextconsume;
208     }
209 
210     //! Frees used memory.
~block_prefetcher()211     ~block_prefetcher()
212     {
213         for (int_type i = 0; i < nreadblocks; ++i)
214             if (read_reqs[i].valid())
215                 read_reqs[i]->wait();
216 
217         delete[] read_reqs;
218         delete[] read_bids;
219         delete[] completed;
220         delete[] pref_buffer;
221         delete[] read_buffers;
222     }
223 };
224 
225 //! \}
226 
227 STXXL_END_NAMESPACE
228 
229 #endif // !STXXL_MNG_BLOCK_PREFETCHER_HEADER
230