1 #include "stream_base-inl.h"
2 #include "stream_wrap.h"
3 
4 #include "node.h"
5 #include "node_buffer.h"
6 #include "node_errors.h"
7 #include "node_internals.h"
8 #include "env-inl.h"
9 #include "js_stream.h"
10 #include "string_bytes.h"
11 #include "util.h"
12 #include "util-inl.h"
13 #include "v8.h"
14 
15 #include <limits.h>  // INT_MAX
16 
17 namespace node {
18 
19 using v8::Array;
20 using v8::Boolean;
21 using v8::Context;
22 using v8::FunctionCallbackInfo;
23 using v8::HandleScope;
24 using v8::Integer;
25 using v8::Local;
26 using v8::Number;
27 using v8::Object;
28 using v8::String;
29 using v8::Value;
30 
31 template int StreamBase::WriteString<ASCII>(
32     const FunctionCallbackInfo<Value>& args);
33 template int StreamBase::WriteString<UTF8>(
34     const FunctionCallbackInfo<Value>& args);
35 template int StreamBase::WriteString<UCS2>(
36     const FunctionCallbackInfo<Value>& args);
37 template int StreamBase::WriteString<LATIN1>(
38     const FunctionCallbackInfo<Value>& args);
39 
40 
ReadStartJS(const FunctionCallbackInfo<Value> & args)41 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
42   return ReadStart();
43 }
44 
45 
ReadStopJS(const FunctionCallbackInfo<Value> & args)46 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
47   return ReadStop();
48 }
49 
50 
Shutdown(const FunctionCallbackInfo<Value> & args)51 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
52   CHECK(args[0]->IsObject());
53   Local<Object> req_wrap_obj = args[0].As<Object>();
54 
55   return Shutdown(req_wrap_obj);
56 }
57 
SetWriteResultPropertiesOnWrapObject(Environment * env,Local<Object> req_wrap_obj,const StreamWriteResult & res)58 inline void SetWriteResultPropertiesOnWrapObject(
59     Environment* env,
60     Local<Object> req_wrap_obj,
61     const StreamWriteResult& res) {
62   req_wrap_obj->Set(
63       env->context(),
64       env->bytes_string(),
65       Number::New(env->isolate(), res.bytes)).FromJust();
66   req_wrap_obj->Set(
67       env->context(),
68       env->async(),
69       Boolean::New(env->isolate(), res.async)).FromJust();
70 }
71 
Writev(const FunctionCallbackInfo<Value> & args)72 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
73   Environment* env = Environment::GetCurrent(args);
74 
75   CHECK(args[0]->IsObject());
76   CHECK(args[1]->IsArray());
77 
78   Local<Object> req_wrap_obj = args[0].As<Object>();
79   Local<Array> chunks = args[1].As<Array>();
80   bool all_buffers = args[2]->IsTrue();
81 
82   size_t count;
83   if (all_buffers)
84     count = chunks->Length();
85   else
86     count = chunks->Length() >> 1;
87 
88   MaybeStackBuffer<uv_buf_t, 16> bufs(count);
89 
90   size_t storage_size = 0;
91   size_t offset;
92 
93   if (!all_buffers) {
94     // Determine storage size first
95     for (size_t i = 0; i < count; i++) {
96       Local<Value> chunk = chunks->Get(i * 2);
97 
98       if (Buffer::HasInstance(chunk))
99         continue;
100         // Buffer chunk, no additional storage required
101 
102       // String chunk
103       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
104       enum encoding encoding = ParseEncoding(env->isolate(),
105                                              chunks->Get(i * 2 + 1));
106       size_t chunk_size;
107       if (encoding == UTF8 && string->Length() > 65535 &&
108           !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
109         return 0;
110       else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
111                     .To(&chunk_size))
112         return 0;
113       storage_size += chunk_size;
114     }
115 
116     if (storage_size > INT_MAX)
117       return UV_ENOBUFS;
118   } else {
119     for (size_t i = 0; i < count; i++) {
120       Local<Value> chunk = chunks->Get(i);
121       bufs[i].base = Buffer::Data(chunk);
122       bufs[i].len = Buffer::Length(chunk);
123     }
124   }
125 
126   MallocedBuffer<char> storage;
127   if (storage_size > 0)
128     storage = MallocedBuffer<char>(storage_size);
129 
130   offset = 0;
131   if (!all_buffers) {
132     for (size_t i = 0; i < count; i++) {
133       Local<Value> chunk = chunks->Get(i * 2);
134 
135       // Write buffer
136       if (Buffer::HasInstance(chunk)) {
137         bufs[i].base = Buffer::Data(chunk);
138         bufs[i].len = Buffer::Length(chunk);
139         continue;
140       }
141 
142       // Write string
143       CHECK_LE(offset, storage_size);
144       char* str_storage = storage.data + offset;
145       size_t str_size = storage_size - offset;
146 
147       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
148       enum encoding encoding = ParseEncoding(env->isolate(),
149                                              chunks->Get(i * 2 + 1));
150       str_size = StringBytes::Write(env->isolate(),
151                                     str_storage,
152                                     str_size,
153                                     string,
154                                     encoding);
155       bufs[i].base = str_storage;
156       bufs[i].len = str_size;
157       offset += str_size;
158     }
159   }
160 
161   StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
162   SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
163   if (res.wrap != nullptr && storage_size > 0) {
164     res.wrap->SetAllocatedStorage(storage.release(), storage_size);
165   }
166   return res.err;
167 }
168 
169 
WriteBuffer(const FunctionCallbackInfo<Value> & args)170 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
171   CHECK(args[0]->IsObject());
172 
173   Environment* env = Environment::GetCurrent(args);
174 
175   if (!args[1]->IsUint8Array()) {
176     node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
177     return 0;
178   }
179 
180   Local<Object> req_wrap_obj = args[0].As<Object>();
181 
182   uv_buf_t buf;
183   buf.base = Buffer::Data(args[1]);
184   buf.len = Buffer::Length(args[1]);
185 
186   StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
187 
188   if (res.async)
189     req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
190   SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
191 
192   return res.err;
193 }
194 
195 
196 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)197 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
198   Environment* env = Environment::GetCurrent(args);
199   CHECK(args[0]->IsObject());
200   CHECK(args[1]->IsString());
201 
202   Local<Object> req_wrap_obj = args[0].As<Object>();
203   Local<String> string = args[1].As<String>();
204   Local<Object> send_handle_obj;
205   if (args[2]->IsObject())
206     send_handle_obj = args[2].As<Object>();
207 
208   // Compute the size of the storage that the string will be flattened into.
209   // For UTF8 strings that are very long, go ahead and take the hit for
210   // computing their actual size, rather than tripling the storage.
211   size_t storage_size;
212   if (enc == UTF8 && string->Length() > 65535 &&
213       !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
214     return 0;
215   else if (!StringBytes::StorageSize(env->isolate(), string, enc)
216                 .To(&storage_size))
217     return 0;
218 
219   if (storage_size > INT_MAX)
220     return UV_ENOBUFS;
221 
222   // Try writing immediately if write size isn't too big
223   char stack_storage[16384];  // 16kb
224   size_t data_size;
225   size_t synchronously_written = 0;
226   uv_buf_t buf;
227 
228   bool try_write = storage_size <= sizeof(stack_storage) &&
229                    (!IsIPCPipe() || send_handle_obj.IsEmpty());
230   if (try_write) {
231     data_size = StringBytes::Write(env->isolate(),
232                                    stack_storage,
233                                    storage_size,
234                                    string,
235                                    enc);
236     buf = uv_buf_init(stack_storage, data_size);
237 
238     uv_buf_t* bufs = &buf;
239     size_t count = 1;
240     const int err = DoTryWrite(&bufs, &count);
241     // Keep track of the bytes written here, because we're taking a shortcut
242     // by using `DoTryWrite()` directly instead of using the utilities
243     // provided by `Write()`.
244     synchronously_written = count == 0 ? data_size : data_size - buf.len;
245     bytes_written_ += synchronously_written;
246 
247     // Immediate failure or success
248     if (err != 0 || count == 0) {
249       req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
250           .FromJust();
251       req_wrap_obj->Set(env->context(),
252                         env->bytes_string(),
253                         Integer::NewFromUnsigned(env->isolate(), data_size))
254           .FromJust();
255       return err;
256     }
257 
258     // Partial write
259     CHECK_EQ(count, 1);
260   }
261 
262   MallocedBuffer<char> data;
263 
264   if (try_write) {
265     // Copy partial data
266     data = MallocedBuffer<char>(buf.len);
267     memcpy(data.data, buf.base, buf.len);
268     data_size = buf.len;
269   } else {
270     // Write it
271     data = MallocedBuffer<char>(storage_size);
272     data_size = StringBytes::Write(env->isolate(),
273                                    data.data,
274                                    storage_size,
275                                    string,
276                                    enc);
277   }
278 
279   CHECK_LE(data_size, storage_size);
280 
281   buf = uv_buf_init(data.data, data_size);
282 
283   uv_stream_t* send_handle = nullptr;
284 
285   if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
286     HandleWrap* wrap;
287     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
288     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
289     // Reference LibuvStreamWrap instance to prevent it from being garbage
290     // collected before `AfterWrite` is called.
291     req_wrap_obj->Set(env->handle_string(), send_handle_obj);
292   }
293 
294   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
295   res.bytes += synchronously_written;
296 
297   SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
298   if (res.wrap != nullptr) {
299     res.wrap->SetAllocatedStorage(data.release(), data_size);
300   }
301 
302   return res.err;
303 }
304 
305 
CallJSOnreadMethod(ssize_t nread,Local<Object> buf)306 void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
307   Environment* env = env_;
308 
309   Local<Value> argv[] = {
310     Integer::New(env->isolate(), nread),
311     buf
312   };
313 
314   if (argv[1].IsEmpty())
315     argv[1] = Undefined(env->isolate());
316 
317   AsyncWrap* wrap = GetAsyncWrap();
318   CHECK_NOT_NULL(wrap);
319   wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
320 }
321 
322 
IsIPCPipe()323 bool StreamBase::IsIPCPipe() {
324   return false;
325 }
326 
327 
GetFD()328 int StreamBase::GetFD() {
329   return -1;
330 }
331 
332 
GetObject()333 Local<Object> StreamBase::GetObject() {
334   return GetAsyncWrap()->object();
335 }
336 
337 
DoTryWrite(uv_buf_t ** bufs,size_t * count)338 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
339   // No TryWrite by default
340   return 0;
341 }
342 
343 
Error() const344 const char* StreamResource::Error() const {
345   return nullptr;
346 }
347 
348 
ClearError()349 void StreamResource::ClearError() {
350   // No-op
351 }
352 
353 
OnStreamAlloc(size_t suggested_size)354 uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
355   return uv_buf_init(Malloc(suggested_size), suggested_size);
356 }
357 
358 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)359 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
360   CHECK_NOT_NULL(stream_);
361   StreamBase* stream = static_cast<StreamBase*>(stream_);
362   Environment* env = stream->stream_env();
363   HandleScope handle_scope(env->isolate());
364   Context::Scope context_scope(env->context());
365 
366   if (nread <= 0)  {
367     free(buf.base);
368     if (nread < 0)
369       stream->CallJSOnreadMethod(nread, Local<Object>());
370     return;
371   }
372 
373   CHECK_LE(static_cast<size_t>(nread), buf.len);
374   char* base = Realloc(buf.base, nread);
375 
376   Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
377   stream->CallJSOnreadMethod(nread, obj);
378 }
379 
380 
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)381 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
382     StreamReq* req_wrap, int status) {
383   StreamBase* stream = static_cast<StreamBase*>(stream_);
384   Environment* env = stream->stream_env();
385   AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
386   HandleScope handle_scope(env->isolate());
387   Context::Scope context_scope(env->context());
388   CHECK(!async_wrap->persistent().IsEmpty());
389   Local<Object> req_wrap_obj = async_wrap->object();
390 
391   Local<Value> argv[] = {
392     Integer::New(env->isolate(), status),
393     stream->GetObject(),
394     Undefined(env->isolate())
395   };
396 
397   const char* msg = stream->Error();
398   if (msg != nullptr) {
399     argv[2] = OneByteString(env->isolate(), msg);
400     stream->ClearError();
401   }
402 
403   if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
404     async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
405 }
406 
OnStreamAfterWrite(WriteWrap * req_wrap,int status)407 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
408     WriteWrap* req_wrap, int status) {
409   OnStreamAfterReqFinished(req_wrap, status);
410 }
411 
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)412 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
413     ShutdownWrap* req_wrap, int status) {
414   OnStreamAfterReqFinished(req_wrap, status);
415 }
416 
417 
418 }  // namespace node
419