1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "platform.hpp"
21 
22 #ifdef ZMQ_HAVE_WINDOWS
23 #include "windows.hpp"
24 #include <io.h>
25 #else
26 #include <unistd.h>
27 #endif
28 
29 #include "../include/zmq.h"
30 
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <fcntl.h>
34 #include <string.h>
35 #include <sstream>
36 #include <algorithm>
37 
38 #include "swap.hpp"
39 #include "config.hpp"
40 #include "atomic_counter.hpp"
41 #include "err.hpp"
42 
swap_t(int64_t filesize_)43 zmq::swap_t::swap_t (int64_t filesize_) :
44     fd (-1),
45     filesize (filesize_),
46     file_pos (0),
47     write_pos (0),
48     read_pos (0),
49     block_size (swap_block_size),
50     write_buf_start_addr (0)
51 {
52     zmq_assert (filesize > 0);
53     zmq_assert (block_size > 0);
54 
55     buf1 = new (std::nothrow) char [block_size];
56     alloc_assert (buf1);
57 
58     buf2 = new (std::nothrow) char [block_size];
59     alloc_assert (buf2);
60 
61     read_buf = write_buf = buf1;
62 }
63 
~swap_t()64 zmq::swap_t::~swap_t ()
65 {
66     delete [] buf1;
67     delete [] buf2;
68 
69     if (fd == -1)
70         return;
71 
72 #ifdef ZMQ_HAVE_WINDOWS
73     int rc = _close (fd);
74 #else
75     int rc = close (fd);
76 #endif
77     errno_assert (rc == 0);
78 
79 #ifdef ZMQ_HAVE_WINDOWS
80     rc = _unlink (filename.c_str ());
81 #else
82     rc = unlink (filename.c_str ());
83 #endif
84     errno_assert (rc == 0);
85 }
86 
init()87 int zmq::swap_t::init ()
88 {
89     static zmq::atomic_counter_t seqnum (0);
90 
91     //  Get process ID.
92 #ifdef ZMQ_HAVE_WINDOWS
93     int pid = GetCurrentThreadId ();
94 #else
95     pid_t pid = getpid ();
96 #endif
97 
98     std::ostringstream outs;
99     outs << "zmq_" << pid << '_' << seqnum.get () << ".swap";
100     filename = outs.str ();
101 
102     seqnum.add (1);
103 
104     //  Open the backing file.
105 #ifdef ZMQ_HAVE_WINDOWS
106     fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600);
107 #else
108     fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600);
109 #endif
110     if (fd == -1)
111         return -1;
112 
113 #ifdef ZMQ_HAVE_LINUX
114     //  Enable more aggresive read-ahead optimization.
115     posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
116 #endif
117     return 0;
118 }
119 
store(zmq_msg_t * msg_)120 bool zmq::swap_t::store (zmq_msg_t *msg_)
121 {
122     size_t msg_size = zmq_msg_size (msg_);
123 
124     //  Check buffer space availability.
125     //  NOTE: We always keep one byte open.
126     if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
127         return false;
128 
129     //  Don't store the ZMQ_MSG_SHARED flag.
130     uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED;
131 
132     //  Write message length, flags, and message body.
133     copy_to_file (&msg_size, sizeof msg_size);
134     copy_to_file (&msg_flags, sizeof msg_flags);
135     copy_to_file (zmq_msg_data (msg_), msg_size);
136 
137     zmq_msg_close (msg_);
138 
139     return true;
140 }
141 
fetch(zmq_msg_t * msg_)142 void zmq::swap_t::fetch (zmq_msg_t *msg_)
143 {
144     //  There must be at least one message available.
145     zmq_assert (read_pos != write_pos);
146 
147     //  Retrieve the message size.
148     size_t msg_size;
149     copy_from_file (&msg_size, sizeof msg_size);
150 
151     //  Initialize the message.
152     zmq_msg_init_size (msg_, msg_size);
153 
154     //  Retrieve the message flags.
155     copy_from_file (&msg_->flags, sizeof msg_->flags);
156 
157     //  Retrieve the message payload.
158     copy_from_file (zmq_msg_data (msg_), msg_size);
159 }
160 
commit()161 void zmq::swap_t::commit ()
162 {
163     commit_pos = write_pos;
164 }
165 
rollback()166 void zmq::swap_t::rollback ()
167 {
168     if (commit_pos == write_pos || read_pos == write_pos)
169         return;
170 
171     if (write_pos > read_pos)
172         zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos);
173     else
174         zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos);
175 
176     if (commit_pos / block_size == read_pos / block_size) {
177         write_buf_start_addr = commit_pos % block_size;
178         write_buf = read_buf;
179     }
180     else if (commit_pos / block_size != write_pos / block_size) {
181         write_buf_start_addr = commit_pos % block_size;
182         fill_buf (write_buf, write_buf_start_addr);
183     }
184     write_pos = commit_pos;
185 }
186 
empty()187 bool zmq::swap_t::empty ()
188 {
189     return read_pos == write_pos;
190 }
191 
192 /*
193 bool zmq::swap_t::full ()
194 {
195     //  Check that at least the message size can be written to the swap.
196     return buffer_space () < (int64_t) (sizeof (size_t) + 1);
197 }
198 */
199 
fits(zmq_msg_t * msg_)200 bool zmq::swap_t::fits (zmq_msg_t *msg_)
201 {
202     //  Check whether whole binary representation of the message
203     //  fits into the swap.
204     size_t msg_size = zmq_msg_size (msg_);
205     if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
206         return false;
207     return true;
208  }
209 
copy_from_file(void * buffer_,size_t count_)210 void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
211 {
212     char *dest_ptr = (char *) buffer_;
213     size_t chunk_size, remainder = count_;
214 
215     while (remainder > 0) {
216         chunk_size = std::min (remainder,
217             std::min ((size_t) (filesize - read_pos),
218             (size_t) (block_size - read_pos % block_size)));
219 
220         memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size);
221         dest_ptr += chunk_size;
222 
223         read_pos = (read_pos + chunk_size) % filesize;
224         if (read_pos % block_size == 0) {
225             if (read_pos / block_size == write_pos / block_size)
226                 read_buf = write_buf;
227             else
228                 fill_buf (read_buf, read_pos);
229         }
230         remainder -= chunk_size;
231     }
232 }
233 
copy_to_file(const void * buffer_,size_t count_)234 void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_)
235 {
236     char *source_ptr = (char *) buffer_;
237     size_t chunk_size, remainder = count_;
238 
239     while (remainder > 0) {
240         chunk_size = std::min (remainder,
241             std::min ((size_t) (filesize - write_pos),
242             (size_t) (block_size - write_pos % block_size)));
243 
244         memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size);
245         source_ptr += chunk_size;
246 
247         write_pos = (write_pos + chunk_size) % filesize;
248         if (write_pos % block_size == 0) {
249             save_write_buf ();
250             write_buf_start_addr = write_pos;
251 
252             if (write_buf == read_buf) {
253                 if (read_buf == buf2)
254                     write_buf = buf1;
255                 else
256                     write_buf = buf2;
257             }
258         }
259         remainder -= chunk_size;
260     }
261 }
262 
fill_buf(char * buf,int64_t pos)263 void zmq::swap_t::fill_buf (char *buf, int64_t pos)
264 {
265     if (file_pos != pos) {
266 #ifdef ZMQ_HAVE_WINDOWS
267         __int64 offset = _lseeki64 (fd, pos, SEEK_SET);
268 #else
269         off_t offset = lseek (fd, (off_t) pos, SEEK_SET);
270 #endif
271         errno_assert (offset == pos);
272         file_pos = pos;
273     }
274     size_t octets_stored = 0;
275     size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
276 
277     while (octets_stored < octets_total) {
278 #ifdef ZMQ_HAVE_WINDOWS
279         int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored);
280 #else
281         ssize_t rc = read (fd, &buf [octets_stored],
282             octets_total - octets_stored);
283 #endif
284         errno_assert (rc > 0);
285         octets_stored += rc;
286     }
287     file_pos += octets_total;
288 }
289 
save_write_buf()290 void zmq::swap_t::save_write_buf ()
291 {
292     if (file_pos != write_buf_start_addr) {
293 #ifdef ZMQ_HAVE_WINDOWS
294         __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET);
295 #else
296         off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET);
297 #endif
298         errno_assert (offset == write_buf_start_addr);
299         file_pos = write_buf_start_addr;
300     }
301     size_t octets_stored = 0;
302     size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
303 
304     while (octets_stored < octets_total) {
305 #ifdef ZMQ_HAVE_WINDOWS
306         int rc = _write (fd, &write_buf [octets_stored],
307             octets_total - octets_stored);
308 #else
309         ssize_t rc = write (fd, &write_buf [octets_stored],
310             octets_total - octets_stored);
311 #endif
312         errno_assert (rc > 0);
313         octets_stored += rc;
314     }
315     file_pos += octets_total;
316 }
317 
buffer_space()318 int64_t zmq::swap_t::buffer_space ()
319 {
320     if (write_pos < read_pos)
321         return read_pos - write_pos;
322 
323     return filesize - (write_pos - read_pos);
324 }
325