1 
2 /*
3  * Redistribution and use in source and binary forms, with or
4  * without modification, are permitted provided that the following
5  * conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above
8  *    copyright notice, this list of conditions and the
9  *    following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above
12  *    copyright notice, this list of conditions and the following
13  *    disclaimer in the documentation and/or other materials
14  *    provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20  * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30 
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <string.h>
34 
35 #include <tarantool/tnt_mem.h>
36 #include <tarantool/tnt_proto.h>
37 #include <tarantool/tnt_reply.h>
38 #include <tarantool/tnt_stream.h>
39 #include <tarantool/tnt_buf.h>
40 
tnt_buf_free(struct tnt_stream * s)41 static void tnt_buf_free(struct tnt_stream *s) {
42 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
43 	if (!sb->as && sb->data)
44 		tnt_mem_free(sb->data);
45 	if (sb->free)
46 		sb->free(s);
47 	tnt_mem_free(s->data);
48 	s->data = NULL;
49 }
50 
51 static ssize_t
tnt_buf_read(struct tnt_stream * s,char * buf,size_t size)52 tnt_buf_read(struct tnt_stream *s, char *buf, size_t size) {
53 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
54 	if (sb->data == NULL)
55 		return 0;
56 	if (sb->size == sb->rdoff)
57 		return 0;
58 	size_t avail = sb->size - sb->rdoff;
59 	if (size > avail)
60 		size = avail;
61 	memcpy(sb->data + sb->rdoff, buf, size);
62 	sb->rdoff += size;
63 	return size;
64 }
65 
tnt_buf_resize(struct tnt_stream * s,size_t size)66 static char* tnt_buf_resize(struct tnt_stream *s, size_t size) {
67 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
68 	size_t off = sb->size;
69 	size_t nsize = off + size;
70 	char *nd = realloc(sb->data, nsize);
71 	if (nd == NULL) {
72 		tnt_mem_free(sb->data);
73 		return NULL;
74 	}
75 	sb->data = nd;
76 	sb->alloc = nsize;
77 	return sb->data + off;
78 }
79 
80 static ssize_t
tnt_buf_write(struct tnt_stream * s,const char * buf,size_t size)81 tnt_buf_write(struct tnt_stream *s, const char *buf, size_t size) {
82 	if (TNT_SBUF_CAST(s)->as == 1) return -1;
83 	char *p = TNT_SBUF_CAST(s)->resize(s, size);
84 	if (p == NULL)
85 		return -1;
86 	memcpy(p, buf, size);
87 	TNT_SBUF_CAST(s)->size += size;
88 	s->wrcnt++;
89 	return size;
90 }
91 
92 static ssize_t
tnt_buf_writev(struct tnt_stream * s,struct iovec * iov,int count)93 tnt_buf_writev(struct tnt_stream *s, struct iovec *iov, int count) {
94 	if (TNT_SBUF_CAST(s)->as == 1) return -1;
95 	size_t size = 0;
96 	int i;
97 	for (i = 0 ; i < count ; i++)
98 		size += iov[i].iov_len;
99 	char *p = TNT_SBUF_CAST(s)->resize(s, size);
100 	if (p == NULL)
101 		return -1;
102 	for (i = 0 ; i < count ; i++) {
103 		memcpy(p, iov[i].iov_base, iov[i].iov_len);
104 		p += iov[i].iov_len;
105 	}
106 	TNT_SBUF_CAST(s)->size += size;
107 	s->wrcnt++;
108 	return size;
109 }
110 
111 static int
tnt_buf_reply(struct tnt_stream * s,struct tnt_reply * r)112 tnt_buf_reply(struct tnt_stream *s, struct tnt_reply *r) {
113 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
114 	if (sb->data == NULL)
115 		return -1;
116 	if (sb->size == sb->rdoff)
117 		return 1;
118 	size_t off = 0;
119 	int rc = tnt_reply(r, sb->data + sb->rdoff, sb->size - sb->rdoff, &off);
120 	if (rc == 0)
121 		sb->rdoff += off;
122 	return rc;
123 }
124 
tnt_buf(struct tnt_stream * s)125 struct tnt_stream *tnt_buf(struct tnt_stream *s) {
126 	int allocated = s == NULL;
127 	s = tnt_stream_init(s);
128 	if (s == NULL)
129 		return NULL;
130 	/* allocating stream data */
131 	s->data = tnt_mem_alloc(sizeof(struct tnt_stream_buf));
132 	if (s->data == NULL) {
133 		if (allocated)
134 			tnt_stream_free(s);
135 		return NULL;
136 	}
137 	/* initializing interfaces */
138 	s->read       = tnt_buf_read;
139 	s->read_reply = tnt_buf_reply;
140 	s->write      = tnt_buf_write;
141 	s->writev     = tnt_buf_writev;
142 	s->free       = tnt_buf_free;
143 	/* initializing internal data */
144 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
145 	sb->rdoff   = 0;
146 	sb->size    = 0;
147 	sb->alloc   = 0;
148 	sb->data    = NULL;
149 	sb->resize  = tnt_buf_resize;
150 	sb->free    = NULL;
151 	sb->subdata = NULL;
152 	sb->as      = 0;
153 	return s;
154 }
155 
tnt_buf_as(struct tnt_stream * s,char * buf,size_t buf_len)156 struct tnt_stream *tnt_buf_as(struct tnt_stream *s, char *buf, size_t buf_len)
157 {
158 	if (s == NULL) {
159 		s = tnt_buf(s);
160 		if (s == NULL)
161 			return NULL;
162 	}
163 	struct tnt_stream_buf *sb = TNT_SBUF_CAST(s);
164 
165 	sb->data = buf;
166 	sb->size = buf_len;
167 	sb->alloc = buf_len;
168 	sb->as = 1;
169 
170 	return s;
171 }
172