1 #include "stream_base-inl.h"
2 #include "stream_wrap.h"
3
4 #include "node.h"
5 #include "node_buffer.h"
6 #include "node_errors.h"
7 #include "node_internals.h"
8 #include "env-inl.h"
9 #include "js_stream.h"
10 #include "string_bytes.h"
11 #include "util.h"
12 #include "util-inl.h"
13 #include "v8.h"
14
15 #include <limits.h> // INT_MAX
16
17 namespace node {
18
19 using v8::Array;
20 using v8::Boolean;
21 using v8::Context;
22 using v8::FunctionCallbackInfo;
23 using v8::HandleScope;
24 using v8::Integer;
25 using v8::Local;
26 using v8::Number;
27 using v8::Object;
28 using v8::String;
29 using v8::Value;
30
31 template int StreamBase::WriteString<ASCII>(
32 const FunctionCallbackInfo<Value>& args);
33 template int StreamBase::WriteString<UTF8>(
34 const FunctionCallbackInfo<Value>& args);
35 template int StreamBase::WriteString<UCS2>(
36 const FunctionCallbackInfo<Value>& args);
37 template int StreamBase::WriteString<LATIN1>(
38 const FunctionCallbackInfo<Value>& args);
39
40
ReadStartJS(const FunctionCallbackInfo<Value> & args)41 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
42 return ReadStart();
43 }
44
45
ReadStopJS(const FunctionCallbackInfo<Value> & args)46 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
47 return ReadStop();
48 }
49
50
Shutdown(const FunctionCallbackInfo<Value> & args)51 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
52 CHECK(args[0]->IsObject());
53 Local<Object> req_wrap_obj = args[0].As<Object>();
54
55 return Shutdown(req_wrap_obj);
56 }
57
SetWriteResultPropertiesOnWrapObject(Environment * env,Local<Object> req_wrap_obj,const StreamWriteResult & res)58 inline void SetWriteResultPropertiesOnWrapObject(
59 Environment* env,
60 Local<Object> req_wrap_obj,
61 const StreamWriteResult& res) {
62 req_wrap_obj->Set(
63 env->context(),
64 env->bytes_string(),
65 Number::New(env->isolate(), res.bytes)).FromJust();
66 req_wrap_obj->Set(
67 env->context(),
68 env->async(),
69 Boolean::New(env->isolate(), res.async)).FromJust();
70 }
71
Writev(const FunctionCallbackInfo<Value> & args)72 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
73 Environment* env = Environment::GetCurrent(args);
74
75 CHECK(args[0]->IsObject());
76 CHECK(args[1]->IsArray());
77
78 Local<Object> req_wrap_obj = args[0].As<Object>();
79 Local<Array> chunks = args[1].As<Array>();
80 bool all_buffers = args[2]->IsTrue();
81
82 size_t count;
83 if (all_buffers)
84 count = chunks->Length();
85 else
86 count = chunks->Length() >> 1;
87
88 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
89
90 size_t storage_size = 0;
91 size_t offset;
92
93 if (!all_buffers) {
94 // Determine storage size first
95 for (size_t i = 0; i < count; i++) {
96 Local<Value> chunk = chunks->Get(i * 2);
97
98 if (Buffer::HasInstance(chunk))
99 continue;
100 // Buffer chunk, no additional storage required
101
102 // String chunk
103 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
104 enum encoding encoding = ParseEncoding(env->isolate(),
105 chunks->Get(i * 2 + 1));
106 size_t chunk_size;
107 if (encoding == UTF8 && string->Length() > 65535 &&
108 !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
109 return 0;
110 else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
111 .To(&chunk_size))
112 return 0;
113 storage_size += chunk_size;
114 }
115
116 if (storage_size > INT_MAX)
117 return UV_ENOBUFS;
118 } else {
119 for (size_t i = 0; i < count; i++) {
120 Local<Value> chunk = chunks->Get(i);
121 bufs[i].base = Buffer::Data(chunk);
122 bufs[i].len = Buffer::Length(chunk);
123 }
124 }
125
126 MallocedBuffer<char> storage;
127 if (storage_size > 0)
128 storage = MallocedBuffer<char>(storage_size);
129
130 offset = 0;
131 if (!all_buffers) {
132 for (size_t i = 0; i < count; i++) {
133 Local<Value> chunk = chunks->Get(i * 2);
134
135 // Write buffer
136 if (Buffer::HasInstance(chunk)) {
137 bufs[i].base = Buffer::Data(chunk);
138 bufs[i].len = Buffer::Length(chunk);
139 continue;
140 }
141
142 // Write string
143 CHECK_LE(offset, storage_size);
144 char* str_storage = storage.data + offset;
145 size_t str_size = storage_size - offset;
146
147 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
148 enum encoding encoding = ParseEncoding(env->isolate(),
149 chunks->Get(i * 2 + 1));
150 str_size = StringBytes::Write(env->isolate(),
151 str_storage,
152 str_size,
153 string,
154 encoding);
155 bufs[i].base = str_storage;
156 bufs[i].len = str_size;
157 offset += str_size;
158 }
159 }
160
161 StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
162 SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
163 if (res.wrap != nullptr && storage_size > 0) {
164 res.wrap->SetAllocatedStorage(storage.release(), storage_size);
165 }
166 return res.err;
167 }
168
169
WriteBuffer(const FunctionCallbackInfo<Value> & args)170 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
171 CHECK(args[0]->IsObject());
172
173 Environment* env = Environment::GetCurrent(args);
174
175 if (!args[1]->IsUint8Array()) {
176 node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
177 return 0;
178 }
179
180 Local<Object> req_wrap_obj = args[0].As<Object>();
181
182 uv_buf_t buf;
183 buf.base = Buffer::Data(args[1]);
184 buf.len = Buffer::Length(args[1]);
185
186 StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
187
188 if (res.async)
189 req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
190 SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
191
192 return res.err;
193 }
194
195
196 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)197 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
198 Environment* env = Environment::GetCurrent(args);
199 CHECK(args[0]->IsObject());
200 CHECK(args[1]->IsString());
201
202 Local<Object> req_wrap_obj = args[0].As<Object>();
203 Local<String> string = args[1].As<String>();
204 Local<Object> send_handle_obj;
205 if (args[2]->IsObject())
206 send_handle_obj = args[2].As<Object>();
207
208 // Compute the size of the storage that the string will be flattened into.
209 // For UTF8 strings that are very long, go ahead and take the hit for
210 // computing their actual size, rather than tripling the storage.
211 size_t storage_size;
212 if (enc == UTF8 && string->Length() > 65535 &&
213 !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
214 return 0;
215 else if (!StringBytes::StorageSize(env->isolate(), string, enc)
216 .To(&storage_size))
217 return 0;
218
219 if (storage_size > INT_MAX)
220 return UV_ENOBUFS;
221
222 // Try writing immediately if write size isn't too big
223 char stack_storage[16384]; // 16kb
224 size_t data_size;
225 size_t synchronously_written = 0;
226 uv_buf_t buf;
227
228 bool try_write = storage_size <= sizeof(stack_storage) &&
229 (!IsIPCPipe() || send_handle_obj.IsEmpty());
230 if (try_write) {
231 data_size = StringBytes::Write(env->isolate(),
232 stack_storage,
233 storage_size,
234 string,
235 enc);
236 buf = uv_buf_init(stack_storage, data_size);
237
238 uv_buf_t* bufs = &buf;
239 size_t count = 1;
240 const int err = DoTryWrite(&bufs, &count);
241 // Keep track of the bytes written here, because we're taking a shortcut
242 // by using `DoTryWrite()` directly instead of using the utilities
243 // provided by `Write()`.
244 synchronously_written = count == 0 ? data_size : data_size - buf.len;
245 bytes_written_ += synchronously_written;
246
247 // Immediate failure or success
248 if (err != 0 || count == 0) {
249 req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
250 .FromJust();
251 req_wrap_obj->Set(env->context(),
252 env->bytes_string(),
253 Integer::NewFromUnsigned(env->isolate(), data_size))
254 .FromJust();
255 return err;
256 }
257
258 // Partial write
259 CHECK_EQ(count, 1);
260 }
261
262 MallocedBuffer<char> data;
263
264 if (try_write) {
265 // Copy partial data
266 data = MallocedBuffer<char>(buf.len);
267 memcpy(data.data, buf.base, buf.len);
268 data_size = buf.len;
269 } else {
270 // Write it
271 data = MallocedBuffer<char>(storage_size);
272 data_size = StringBytes::Write(env->isolate(),
273 data.data,
274 storage_size,
275 string,
276 enc);
277 }
278
279 CHECK_LE(data_size, storage_size);
280
281 buf = uv_buf_init(data.data, data_size);
282
283 uv_stream_t* send_handle = nullptr;
284
285 if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
286 HandleWrap* wrap;
287 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
288 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
289 // Reference LibuvStreamWrap instance to prevent it from being garbage
290 // collected before `AfterWrite` is called.
291 req_wrap_obj->Set(env->handle_string(), send_handle_obj);
292 }
293
294 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
295 res.bytes += synchronously_written;
296
297 SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
298 if (res.wrap != nullptr) {
299 res.wrap->SetAllocatedStorage(data.release(), data_size);
300 }
301
302 return res.err;
303 }
304
305
CallJSOnreadMethod(ssize_t nread,Local<Object> buf)306 void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
307 Environment* env = env_;
308
309 Local<Value> argv[] = {
310 Integer::New(env->isolate(), nread),
311 buf
312 };
313
314 if (argv[1].IsEmpty())
315 argv[1] = Undefined(env->isolate());
316
317 AsyncWrap* wrap = GetAsyncWrap();
318 CHECK_NOT_NULL(wrap);
319 wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
320 }
321
322
IsIPCPipe()323 bool StreamBase::IsIPCPipe() {
324 return false;
325 }
326
327
GetFD()328 int StreamBase::GetFD() {
329 return -1;
330 }
331
332
GetObject()333 Local<Object> StreamBase::GetObject() {
334 return GetAsyncWrap()->object();
335 }
336
337
DoTryWrite(uv_buf_t ** bufs,size_t * count)338 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
339 // No TryWrite by default
340 return 0;
341 }
342
343
Error() const344 const char* StreamResource::Error() const {
345 return nullptr;
346 }
347
348
ClearError()349 void StreamResource::ClearError() {
350 // No-op
351 }
352
353
OnStreamAlloc(size_t suggested_size)354 uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
355 return uv_buf_init(Malloc(suggested_size), suggested_size);
356 }
357
358
OnStreamRead(ssize_t nread,const uv_buf_t & buf)359 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
360 CHECK_NOT_NULL(stream_);
361 StreamBase* stream = static_cast<StreamBase*>(stream_);
362 Environment* env = stream->stream_env();
363 HandleScope handle_scope(env->isolate());
364 Context::Scope context_scope(env->context());
365
366 if (nread <= 0) {
367 free(buf.base);
368 if (nread < 0)
369 stream->CallJSOnreadMethod(nread, Local<Object>());
370 return;
371 }
372
373 CHECK_LE(static_cast<size_t>(nread), buf.len);
374 char* base = Realloc(buf.base, nread);
375
376 Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
377 stream->CallJSOnreadMethod(nread, obj);
378 }
379
380
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)381 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
382 StreamReq* req_wrap, int status) {
383 StreamBase* stream = static_cast<StreamBase*>(stream_);
384 Environment* env = stream->stream_env();
385 AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
386 HandleScope handle_scope(env->isolate());
387 Context::Scope context_scope(env->context());
388 CHECK(!async_wrap->persistent().IsEmpty());
389 Local<Object> req_wrap_obj = async_wrap->object();
390
391 Local<Value> argv[] = {
392 Integer::New(env->isolate(), status),
393 stream->GetObject(),
394 Undefined(env->isolate())
395 };
396
397 const char* msg = stream->Error();
398 if (msg != nullptr) {
399 argv[2] = OneByteString(env->isolate(), msg);
400 stream->ClearError();
401 }
402
403 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
404 async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
405 }
406
OnStreamAfterWrite(WriteWrap * req_wrap,int status)407 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
408 WriteWrap* req_wrap, int status) {
409 OnStreamAfterReqFinished(req_wrap, status);
410 }
411
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)412 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
413 ShutdownWrap* req_wrap, int status) {
414 OnStreamAfterReqFinished(req_wrap, status);
415 }
416
417
418 } // namespace node
419