1
2 /*
3 * Redistribution and use in source and binary forms, with or
4 * without modification, are permitted provided that the following
5 * conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above
8 * copyright notice, this list of conditions and the
9 * following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <string.h>
34
35 #include <tarantool/tnt_mem.h>
36 #include <tarantool/tnt_proto.h>
37 #include <tarantool/tnt_reply.h>
38 #include <tarantool/tnt_stream.h>
39 #include <tarantool/tnt_buf.h>
40
tnt_buf_free(struct tnt_stream * s)41 static void tnt_buf_free(struct tnt_stream *s) {
42 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
43 if (!sb->as && sb->data)
44 tnt_mem_free(sb->data);
45 if (sb->free)
46 sb->free(s);
47 tnt_mem_free(s->data);
48 s->data = NULL;
49 }
50
51 static ssize_t
tnt_buf_read(struct tnt_stream * s,char * buf,size_t size)52 tnt_buf_read(struct tnt_stream *s, char *buf, size_t size) {
53 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
54 if (sb->data == NULL)
55 return 0;
56 if (sb->size == sb->rdoff)
57 return 0;
58 size_t avail = sb->size - sb->rdoff;
59 if (size > avail)
60 size = avail;
61 memcpy(sb->data + sb->rdoff, buf, size);
62 sb->rdoff += size;
63 return size;
64 }
65
tnt_buf_resize(struct tnt_stream * s,size_t size)66 static char* tnt_buf_resize(struct tnt_stream *s, size_t size) {
67 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
68 size_t off = sb->size;
69 size_t nsize = off + size;
70 char *nd = realloc(sb->data, nsize);
71 if (nd == NULL) {
72 tnt_mem_free(sb->data);
73 return NULL;
74 }
75 sb->data = nd;
76 sb->alloc = nsize;
77 return sb->data + off;
78 }
79
80 static ssize_t
tnt_buf_write(struct tnt_stream * s,const char * buf,size_t size)81 tnt_buf_write(struct tnt_stream *s, const char *buf, size_t size) {
82 if (TNT_SBUF_CAST(s)->as == 1) return -1;
83 char *p = TNT_SBUF_CAST(s)->resize(s, size);
84 if (p == NULL)
85 return -1;
86 memcpy(p, buf, size);
87 TNT_SBUF_CAST(s)->size += size;
88 s->wrcnt++;
89 return size;
90 }
91
92 static ssize_t
tnt_buf_writev(struct tnt_stream * s,struct iovec * iov,int count)93 tnt_buf_writev(struct tnt_stream *s, struct iovec *iov, int count) {
94 if (TNT_SBUF_CAST(s)->as == 1) return -1;
95 size_t size = 0;
96 int i;
97 for (i = 0 ; i < count ; i++)
98 size += iov[i].iov_len;
99 char *p = TNT_SBUF_CAST(s)->resize(s, size);
100 if (p == NULL)
101 return -1;
102 for (i = 0 ; i < count ; i++) {
103 memcpy(p, iov[i].iov_base, iov[i].iov_len);
104 p += iov[i].iov_len;
105 }
106 TNT_SBUF_CAST(s)->size += size;
107 s->wrcnt++;
108 return size;
109 }
110
111 static int
tnt_buf_reply(struct tnt_stream * s,struct tnt_reply * r)112 tnt_buf_reply(struct tnt_stream *s, struct tnt_reply *r) {
113 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
114 if (sb->data == NULL)
115 return -1;
116 if (sb->size == sb->rdoff)
117 return 1;
118 size_t off = 0;
119 int rc = tnt_reply(r, sb->data + sb->rdoff, sb->size - sb->rdoff, &off);
120 if (rc == 0)
121 sb->rdoff += off;
122 return rc;
123 }
124
tnt_buf(struct tnt_stream * s)125 struct tnt_stream *tnt_buf(struct tnt_stream *s) {
126 int allocated = s == NULL;
127 s = tnt_stream_init(s);
128 if (s == NULL)
129 return NULL;
130 /* allocating stream data */
131 s->data = tnt_mem_alloc(sizeof(struct tnt_stream_buf));
132 if (s->data == NULL) {
133 if (allocated)
134 tnt_stream_free(s);
135 return NULL;
136 }
137 /* initializing interfaces */
138 s->read = tnt_buf_read;
139 s->read_reply = tnt_buf_reply;
140 s->write = tnt_buf_write;
141 s->writev = tnt_buf_writev;
142 s->free = tnt_buf_free;
143 /* initializing internal data */
144 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
145 sb->rdoff = 0;
146 sb->size = 0;
147 sb->alloc = 0;
148 sb->data = NULL;
149 sb->resize = tnt_buf_resize;
150 sb->free = NULL;
151 sb->subdata = NULL;
152 sb->as = 0;
153 return s;
154 }
155
tnt_buf_as(struct tnt_stream * s,char * buf,size_t buf_len)156 struct tnt_stream *tnt_buf_as(struct tnt_stream *s, char *buf, size_t buf_len)
157 {
158 if (s == NULL) {
159 s = tnt_buf(s);
160 if (s == NULL)
161 return NULL;
162 }
163 struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
164
165 sb->data = buf;
166 sb->size = buf_len;
167 sb->alloc = buf_len;
168 sb->as = 1;
169
170 return s;
171 }
172