1 /* $Id: nc_automation.cpp 572261 2018-10-10 19:24:55Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitry Kazimirov
27 *
28 * File Description: NetCache automation implementation.
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "nc_automation.hpp"
35
36 USING_NCBI_SCOPE;
37
38 using namespace NAutomation;
39
40 const string SNetCacheBlob::kName = "ncblob";
41 const string SNetCacheService::kName = "ncsvc";
42 const string SNetCacheServer::kName = "ncsrv";
43
SNetCacheBlob(SNetCacheService * nc_object,const string & blob_key)44 SNetCacheBlob::SNetCacheBlob(
45 SNetCacheService* nc_object,
46 const string& blob_key) :
47 CAutomationObject(nc_object->m_AutomationProc),
48 m_NetCacheObject(nc_object),
49 m_BlobKey(blob_key)
50 {
51 }
52
CallCommand()53 CCommand SNetCacheBlob::CallCommand()
54 {
55 return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
56 }
57
CallCommands()58 TCommands SNetCacheBlob::CallCommands()
59 {
60 TCommands cmds =
61 {
62 { "write", ExecMethod<TSelf, &TSelf::ExecWrite>, {
63 { "value", CJsonNode::eString, },
64 }},
65 { "read", ExecMethod<TSelf, &TSelf::ExecRead>, {
66 { "buf_size", -1, },
67 }},
68 { "close", ExecMethod<TSelf, &TSelf::ExecClose>, },
69 { "get_key", ExecMethod<TSelf, &TSelf::ExecGetKey>, },
70 };
71
72 return cmds;
73 }
74
ExecWrite(const TArguments & args,SInputOutput & io)75 void SNetCacheBlob::ExecWrite(const TArguments& args, SInputOutput& io)
76 {
77 _ASSERT(args.size() == 1);
78
79 auto& reply = io.reply;
80 const auto value = args["value"].AsString();
81
82 if (m_Reader.get() != NULL) {
83 NCBI_THROW(CAutomationException, eCommandProcessingError,
84 "Cannot write blob while it's being read");
85 } else if (m_Writer.get() == NULL) {
86 // SetWriter() puts blob ID into m_BlobKey, so we have to check it before that
87 const bool return_blob_key = m_BlobKey.empty();
88 SetWriter();
89 if (return_blob_key) reply.AppendString(m_BlobKey);
90 }
91 m_Writer->Write(value.data(), value.length());
92 }
93
ExecRead(const TArguments & args,SInputOutput & io)94 void SNetCacheBlob::ExecRead(const TArguments& args, SInputOutput& io)
95 {
96 _ASSERT(args.size() == 1);
97
98 auto& reply = io.reply;
99 auto buf_size = args["buf_size"].AsInteger<size_t>();
100
101 if (m_Writer.get() != NULL) {
102 NCBI_THROW(CAutomationException, eCommandProcessingError,
103 "Cannot read blob while it's being written");
104 } else if (m_Reader.get() == NULL)
105 SetReader();
106 if (buf_size > m_BlobSize)
107 buf_size = m_BlobSize;
108 string buffer(buf_size, 0);
109 char* buf_ptr = &buffer[0];
110 size_t bytes_read;
111 size_t total_bytes_read = 0;
112
113 while (buf_size > 0) {
114 ERW_Result rw_res = m_Reader->Read(buf_ptr, buf_size, &bytes_read);
115 if (rw_res == eRW_Success) {
116 total_bytes_read += bytes_read;
117 buf_ptr += bytes_read;
118 buf_size -= bytes_read;
119 } else if (rw_res == eRW_Eof) {
120 break;
121 } else {
122 NCBI_THROW_FMT(CAutomationException, eCommandProcessingError,
123 "I/O error while reading " << m_BlobKey);
124 }
125 }
126 buffer.resize(total_bytes_read);
127 m_BlobSize -= total_bytes_read;
128
129 reply.AppendString(buffer);
130 }
131
ExecClose(const TArguments &,SInputOutput &)132 void SNetCacheBlob::ExecClose(const TArguments&, SInputOutput&)
133 {
134 if (m_Reader.get() != NULL)
135 m_Reader.reset();
136 else if (m_Writer.get() != NULL) {
137 m_Writer->Close();
138 m_Writer.reset();
139 }
140 }
141
SetWriter()142 void SNetCacheBlob::SetWriter()
143 {
144 m_Writer.reset(m_NetCacheObject->GetWriter(m_BlobKey));
145 }
146
SetReader()147 void SNetCacheBlob::SetReader()
148 {
149 m_Reader.reset(m_NetCacheObject->GetReader(m_BlobKey, m_BlobSize));
150 }
151
ExecGetKey(const TArguments &,SInputOutput & io)152 void SNetCacheBlob::ExecGetKey(const TArguments&, SInputOutput& io)
153 {
154 auto& reply = io.reply;
155 reply.AppendString(m_BlobKey);
156 }
157
SNetICacheBlob(SNetCacheService * nc_object,const string & blob_key,int blob_version,const string & blob_subkey)158 SNetICacheBlob::SNetICacheBlob(SNetCacheService* nc_object,
159 const string& blob_key, int blob_version, const string& blob_subkey) :
160 SNetCacheBlob(nc_object, blob_key),
161 m_BlobVersion(blob_version),
162 m_BlobSubKey(blob_subkey)
163 {
164 }
165
SetWriter()166 void SNetICacheBlob::SetWriter()
167 {
168 m_Writer.reset(m_NetCacheObject->GetWriter(m_BlobKey, m_BlobVersion, m_BlobSubKey));
169 }
170
SetReader()171 void SNetICacheBlob::SetReader()
172 {
173 m_Reader.reset(m_NetCacheObject->GetReader(m_BlobKey, m_BlobVersion, m_BlobSubKey, m_BlobSize));
174 }
175
ExecGetKey(const TArguments &,SInputOutput & io)176 void SNetICacheBlob::ExecGetKey(const TArguments&, SInputOutput& io)
177 {
178 auto& reply = io.reply;
179 reply.AppendString(m_BlobKey);
180 reply.AppendInteger(m_BlobVersion);
181 reply.AppendString(m_BlobSubKey);
182 }
183
SNetCacheService(CAutomationProc * automation_proc,CNetICacheClientExt ic_api)184 SNetCacheService::SNetCacheService(CAutomationProc* automation_proc,
185 CNetICacheClientExt ic_api) :
186 SNetService(automation_proc),
187 m_NetICacheClient(ic_api),
188 m_NetCacheAPI(ic_api.GetNetCacheAPI())
189 {
190 auto warning_handler = [&](const string& m, CNetServer s) {
191 auto o = m_AutomationProc->ReturnNetCacheServerObject(m_NetICacheClient, s);
192 m_AutomationProc->SendWarning(m, o);
193 return true;
194 };
195
196 GetService().SetWarningHandler(warning_handler);
197 }
198
SNetCacheServer(CAutomationProc * automation_proc,CNetICacheClientExt ic_api,CNetServer::TInstance server)199 SNetCacheServer::SNetCacheServer(CAutomationProc* automation_proc,
200 CNetICacheClientExt ic_api, CNetServer::TInstance server) :
201 SNetCacheService(automation_proc, ic_api.GetServer(server))
202 {
203 if (GetService().IsLoadBalanced()) {
204 NCBI_THROW(CAutomationException, eCommandProcessingError,
205 "NetCacheServer constructor: "
206 "'server_address' must be a host:port combination");
207 }
208 }
209
NewCommand()210 CCommand SNetCacheService::NewCommand()
211 {
212 return CCommand(kName, ExecNew<TSelf>, {
213 { "service_name", "", },
214 { "client_name", "", },
215 { "cache_name", "", },
216 });
217 }
218
Create(const TArguments & args,CAutomationProc * automation_proc)219 CAutomationObject* SNetCacheService::Create(const TArguments& args, CAutomationProc* automation_proc)
220 {
221 _ASSERT(args.size() == 3);
222
223 const auto service_name = args["service_name"].AsString();
224 const auto client_name = args["client_name"].AsString();
225 const auto cache_name = args["cache_name"].AsString();
226
227 CNetICacheClientExt ic_api(CNetICacheClient(service_name, cache_name, client_name));
228 return new SNetCacheService(automation_proc, ic_api);
229 }
230
NewCommand()231 CCommand SNetCacheServer::NewCommand()
232 {
233 return CCommand(kName, ExecNew<TSelf>, {
234 { "service_name", "", },
235 { "client_name", "", },
236 { "cache_name", "", },
237 });
238 }
239
Create(const TArguments & args,CAutomationProc * automation_proc)240 CAutomationObject* SNetCacheServer::Create(const TArguments& args, CAutomationProc* automation_proc)
241 {
242 _ASSERT(args.size() == 3);
243
244 const auto service_name = args["service_name"].AsString();
245 const auto client_name = args["client_name"].AsString();
246 const auto cache_name = args["cache_name"].AsString();
247
248 CNetICacheClientExt ic_api(CNetICacheClient(service_name, cache_name, client_name));
249 CNetServer server = ic_api.GetService().Iterate().GetServer();
250 return new SNetCacheServer(automation_proc, ic_api, server);
251 }
252
ReturnNetCacheServerObject(CNetICacheClient::TInstance ic_api,CNetServer::TInstance server)253 TAutomationObjectRef CAutomationProc::ReturnNetCacheServerObject(
254 CNetICacheClient::TInstance ic_api,
255 CNetServer::TInstance server)
256 {
257 TAutomationObjectRef object(new SNetCacheServer(this, ic_api, server));
258 AddObject(object);
259 return object;
260 }
261
CallCommand()262 CCommand SNetCacheService::CallCommand()
263 {
264 return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
265 }
266
CallCommand()267 CCommand SNetCacheServer::CallCommand()
268 {
269 return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
270 }
271
CallCommands()272 TCommands SNetCacheService::CallCommands()
273 {
274 TCommands cmds =
275 {
276 { "get_blob", ExecMethod<TSelf, &TSelf::ExecGetBlob>, {
277 { "blob_key", "", },
278 { "blob_version", 0, },
279 { "blob_subkey", "", },
280 }},
281 { "get_servers", ExecMethod<TSelf, &TSelf::ExecGetServers>, },
282 };
283
284 TCommands base_cmds = SNetService::CallCommands();
285 cmds.insert(cmds.end(), base_cmds.begin(), base_cmds.end());
286
287 return cmds;
288 }
289
ExecGetBlob(const TArguments & args,SInputOutput & io)290 void SNetCacheService::ExecGetBlob(const TArguments& args, SInputOutput& io)
291 {
292 _ASSERT(args.size() == 3);
293
294 auto& reply = io.reply;
295 const auto blob_key = args["blob_key"].AsString();
296 const auto blob_version = args["blob_version"].AsInteger<int>();
297 const auto blob_subkey = args["blob_subkey"].AsString();
298
299 TAutomationObjectRef blob;
300
301 if (blob_key.empty() || CNetCacheKey::IsValidKey(blob_key)) {
302 blob.Reset(new SNetCacheBlob(this, blob_key));
303 } else {
304 blob.Reset(new SNetICacheBlob(this, blob_key, blob_version, blob_subkey));
305 }
306
307 reply.AppendInteger(m_AutomationProc->AddObject(blob));
308 }
309
ExecGetServers(const TArguments &,SInputOutput & io)310 void SNetCacheService::ExecGetServers(const TArguments&, SInputOutput& io)
311 {
312 auto& reply = io.reply;
313
314 CJsonNode object_ids(CJsonNode::NewArrayNode());
315 for (CNetServiceIterator it = m_NetICacheClient.GetService().Iterate(
316 CNetService::eIncludePenalized); it; ++it)
317 object_ids.AppendInteger(m_AutomationProc->
318 ReturnNetCacheServerObject(m_NetICacheClient, *it)->GetID());
319 reply.Append(object_ids);
320 }
321
GetReader(const string & blob_key,size_t & blob_size)322 IReader* SNetCacheService::GetReader(const string& blob_key, size_t& blob_size)
323 {
324 return m_NetCacheAPI.GetReader(blob_key, &blob_size);
325 }
326
GetReader(const string & blob_key,int blob_version,const string & blob_subkey,size_t & blob_size)327 IReader* SNetCacheService::GetReader(const string& blob_key, int blob_version, const string& blob_subkey, size_t& blob_size)
328 {
329 auto rv = m_NetICacheClient.GetReadStream(blob_key, blob_version, blob_subkey, &blob_size);
330 if (!rv) NCBI_THROW(CNetCacheException, eBlobNotFound, "BLOB not found");
331 return rv;
332 }
333
GetWriter(string & blob_key)334 IEmbeddedStreamWriter* SNetCacheService::GetWriter(string& blob_key)
335 {
336 return m_NetCacheAPI.PutData(&blob_key);
337 }
338
GetWriter(const string & blob_key,int blob_version,const string & blob_subkey)339 IEmbeddedStreamWriter* SNetCacheService::GetWriter(const string& blob_key, int blob_version, const string& blob_subkey)
340 {
341 return m_NetICacheClient.GetNetCacheWriter(blob_key, blob_version, blob_subkey);
342 }
343