1 /*  $Id: nc_automation.cpp 572261 2018-10-10 19:24:55Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Dmitry Kazimirov
27  *
28  * File Description: NetCache automation implementation.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "nc_automation.hpp"
35 
36 USING_NCBI_SCOPE;
37 
38 using namespace NAutomation;
39 
40 const string SNetCacheBlob::kName = "ncblob";
41 const string SNetCacheService::kName = "ncsvc";
42 const string SNetCacheServer::kName = "ncsrv";
43 
SNetCacheBlob(SNetCacheService * nc_object,const string & blob_key)44 SNetCacheBlob::SNetCacheBlob(
45         SNetCacheService* nc_object,
46         const string& blob_key) :
47     CAutomationObject(nc_object->m_AutomationProc),
48     m_NetCacheObject(nc_object),
49     m_BlobKey(blob_key)
50 {
51 }
52 
CallCommand()53 CCommand SNetCacheBlob::CallCommand()
54 {
55     return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
56 }
57 
CallCommands()58 TCommands SNetCacheBlob::CallCommands()
59 {
60     TCommands cmds =
61     {
62         { "write",   ExecMethod<TSelf, &TSelf::ExecWrite>, {
63                 { "value", CJsonNode::eString, },
64             }},
65         { "read",    ExecMethod<TSelf, &TSelf::ExecRead>, {
66                 { "buf_size", -1, },
67             }},
68         { "close",   ExecMethod<TSelf, &TSelf::ExecClose>, },
69         { "get_key", ExecMethod<TSelf, &TSelf::ExecGetKey>, },
70     };
71 
72     return cmds;
73 }
74 
ExecWrite(const TArguments & args,SInputOutput & io)75 void SNetCacheBlob::ExecWrite(const TArguments& args, SInputOutput& io)
76 {
77     _ASSERT(args.size() == 1);
78 
79     auto& reply = io.reply;
80     const auto value = args["value"].AsString();
81 
82     if (m_Reader.get() != NULL) {
83         NCBI_THROW(CAutomationException, eCommandProcessingError,
84                 "Cannot write blob while it's being read");
85     } else if (m_Writer.get() == NULL) {
86         // SetWriter() puts blob ID into m_BlobKey, so we have to check it before that
87         const bool return_blob_key = m_BlobKey.empty();
88         SetWriter();
89         if (return_blob_key) reply.AppendString(m_BlobKey);
90     }
91     m_Writer->Write(value.data(), value.length());
92 }
93 
ExecRead(const TArguments & args,SInputOutput & io)94 void SNetCacheBlob::ExecRead(const TArguments& args, SInputOutput& io)
95 {
96     _ASSERT(args.size() == 1);
97 
98     auto& reply = io.reply;
99     auto buf_size = args["buf_size"].AsInteger<size_t>();
100 
101     if (m_Writer.get() != NULL) {
102         NCBI_THROW(CAutomationException, eCommandProcessingError,
103                 "Cannot read blob while it's being written");
104     } else if (m_Reader.get() == NULL)
105         SetReader();
106     if (buf_size > m_BlobSize)
107         buf_size = m_BlobSize;
108     string buffer(buf_size, 0);
109     char* buf_ptr = &buffer[0];
110     size_t bytes_read;
111     size_t total_bytes_read = 0;
112 
113     while (buf_size > 0) {
114         ERW_Result rw_res = m_Reader->Read(buf_ptr, buf_size, &bytes_read);
115         if (rw_res == eRW_Success) {
116             total_bytes_read += bytes_read;
117             buf_ptr += bytes_read;
118             buf_size -= bytes_read;
119         } else if (rw_res == eRW_Eof) {
120             break;
121         } else {
122             NCBI_THROW_FMT(CAutomationException, eCommandProcessingError,
123                     "I/O error while reading " << m_BlobKey);
124         }
125     }
126     buffer.resize(total_bytes_read);
127     m_BlobSize -= total_bytes_read;
128 
129     reply.AppendString(buffer);
130 }
131 
ExecClose(const TArguments &,SInputOutput &)132 void SNetCacheBlob::ExecClose(const TArguments&, SInputOutput&)
133 {
134     if (m_Reader.get() != NULL)
135         m_Reader.reset();
136     else if (m_Writer.get() != NULL) {
137         m_Writer->Close();
138         m_Writer.reset();
139     }
140 }
141 
SetWriter()142 void SNetCacheBlob::SetWriter()
143 {
144     m_Writer.reset(m_NetCacheObject->GetWriter(m_BlobKey));
145 }
146 
SetReader()147 void SNetCacheBlob::SetReader()
148 {
149     m_Reader.reset(m_NetCacheObject->GetReader(m_BlobKey, m_BlobSize));
150 }
151 
ExecGetKey(const TArguments &,SInputOutput & io)152 void SNetCacheBlob::ExecGetKey(const TArguments&, SInputOutput& io)
153 {
154     auto& reply = io.reply;
155     reply.AppendString(m_BlobKey);
156 }
157 
SNetICacheBlob(SNetCacheService * nc_object,const string & blob_key,int blob_version,const string & blob_subkey)158 SNetICacheBlob::SNetICacheBlob(SNetCacheService* nc_object,
159         const string& blob_key, int blob_version, const string& blob_subkey) :
160     SNetCacheBlob(nc_object, blob_key),
161     m_BlobVersion(blob_version),
162     m_BlobSubKey(blob_subkey)
163 {
164 }
165 
SetWriter()166 void SNetICacheBlob::SetWriter()
167 {
168     m_Writer.reset(m_NetCacheObject->GetWriter(m_BlobKey, m_BlobVersion, m_BlobSubKey));
169 }
170 
SetReader()171 void SNetICacheBlob::SetReader()
172 {
173     m_Reader.reset(m_NetCacheObject->GetReader(m_BlobKey, m_BlobVersion, m_BlobSubKey, m_BlobSize));
174 }
175 
ExecGetKey(const TArguments &,SInputOutput & io)176 void SNetICacheBlob::ExecGetKey(const TArguments&, SInputOutput& io)
177 {
178     auto& reply = io.reply;
179     reply.AppendString(m_BlobKey);
180     reply.AppendInteger(m_BlobVersion);
181     reply.AppendString(m_BlobSubKey);
182 }
183 
SNetCacheService(CAutomationProc * automation_proc,CNetICacheClientExt ic_api)184 SNetCacheService::SNetCacheService(CAutomationProc* automation_proc,
185         CNetICacheClientExt ic_api) :
186     SNetService(automation_proc),
187     m_NetICacheClient(ic_api),
188     m_NetCacheAPI(ic_api.GetNetCacheAPI())
189 {
190     auto warning_handler = [&](const string& m, CNetServer s) {
191         auto o = m_AutomationProc->ReturnNetCacheServerObject(m_NetICacheClient, s);
192         m_AutomationProc->SendWarning(m, o);
193         return true;
194     };
195 
196     GetService().SetWarningHandler(warning_handler);
197 }
198 
SNetCacheServer(CAutomationProc * automation_proc,CNetICacheClientExt ic_api,CNetServer::TInstance server)199 SNetCacheServer::SNetCacheServer(CAutomationProc* automation_proc,
200         CNetICacheClientExt ic_api, CNetServer::TInstance server) :
201     SNetCacheService(automation_proc, ic_api.GetServer(server))
202 {
203     if (GetService().IsLoadBalanced()) {
204         NCBI_THROW(CAutomationException, eCommandProcessingError,
205                 "NetCacheServer constructor: "
206                 "'server_address' must be a host:port combination");
207     }
208 }
209 
NewCommand()210 CCommand SNetCacheService::NewCommand()
211 {
212     return CCommand(kName, ExecNew<TSelf>, {
213             { "service_name", "", },
214             { "client_name", "", },
215             { "cache_name", "", },
216         });
217 }
218 
Create(const TArguments & args,CAutomationProc * automation_proc)219 CAutomationObject* SNetCacheService::Create(const TArguments& args, CAutomationProc* automation_proc)
220 {
221     _ASSERT(args.size() == 3);
222 
223     const auto service_name = args["service_name"].AsString();
224     const auto client_name  = args["client_name"].AsString();
225     const auto cache_name   = args["cache_name"].AsString();
226 
227     CNetICacheClientExt ic_api(CNetICacheClient(service_name, cache_name, client_name));
228     return new SNetCacheService(automation_proc, ic_api);
229 }
230 
NewCommand()231 CCommand SNetCacheServer::NewCommand()
232 {
233     return CCommand(kName, ExecNew<TSelf>, {
234             { "service_name", "", },
235             { "client_name", "", },
236             { "cache_name", "", },
237         });
238 }
239 
Create(const TArguments & args,CAutomationProc * automation_proc)240 CAutomationObject* SNetCacheServer::Create(const TArguments& args, CAutomationProc* automation_proc)
241 {
242     _ASSERT(args.size() == 3);
243 
244     const auto service_name = args["service_name"].AsString();
245     const auto client_name  = args["client_name"].AsString();
246     const auto cache_name   = args["cache_name"].AsString();
247 
248     CNetICacheClientExt ic_api(CNetICacheClient(service_name, cache_name, client_name));
249     CNetServer server = ic_api.GetService().Iterate().GetServer();
250     return new SNetCacheServer(automation_proc, ic_api, server);
251 }
252 
ReturnNetCacheServerObject(CNetICacheClient::TInstance ic_api,CNetServer::TInstance server)253 TAutomationObjectRef CAutomationProc::ReturnNetCacheServerObject(
254         CNetICacheClient::TInstance ic_api,
255         CNetServer::TInstance server)
256 {
257     TAutomationObjectRef object(new SNetCacheServer(this, ic_api, server));
258     AddObject(object);
259     return object;
260 }
261 
CallCommand()262 CCommand SNetCacheService::CallCommand()
263 {
264     return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
265 }
266 
CallCommand()267 CCommand SNetCacheServer::CallCommand()
268 {
269     return CCommand(kName, TCommandGroup(CallCommands(), CheckCall<TSelf>));
270 }
271 
CallCommands()272 TCommands SNetCacheService::CallCommands()
273 {
274     TCommands cmds =
275     {
276         { "get_blob",    ExecMethod<TSelf, &TSelf::ExecGetBlob>, {
277                 { "blob_key", "", },
278                 { "blob_version", 0, },
279                 { "blob_subkey", "", },
280             }},
281         { "get_servers", ExecMethod<TSelf, &TSelf::ExecGetServers>, },
282     };
283 
284     TCommands base_cmds = SNetService::CallCommands();
285     cmds.insert(cmds.end(), base_cmds.begin(), base_cmds.end());
286 
287     return cmds;
288 }
289 
ExecGetBlob(const TArguments & args,SInputOutput & io)290 void SNetCacheService::ExecGetBlob(const TArguments& args, SInputOutput& io)
291 {
292     _ASSERT(args.size() == 3);
293 
294     auto& reply = io.reply;
295     const auto blob_key     = args["blob_key"].AsString();
296     const auto blob_version = args["blob_version"].AsInteger<int>();
297     const auto blob_subkey  = args["blob_subkey"].AsString();
298 
299     TAutomationObjectRef blob;
300 
301     if (blob_key.empty() || CNetCacheKey::IsValidKey(blob_key)) {
302         blob.Reset(new SNetCacheBlob(this, blob_key));
303     } else {
304         blob.Reset(new SNetICacheBlob(this, blob_key, blob_version, blob_subkey));
305     }
306 
307     reply.AppendInteger(m_AutomationProc->AddObject(blob));
308 }
309 
ExecGetServers(const TArguments &,SInputOutput & io)310 void SNetCacheService::ExecGetServers(const TArguments&, SInputOutput& io)
311 {
312     auto& reply = io.reply;
313 
314     CJsonNode object_ids(CJsonNode::NewArrayNode());
315     for (CNetServiceIterator it = m_NetICacheClient.GetService().Iterate(
316             CNetService::eIncludePenalized); it; ++it)
317         object_ids.AppendInteger(m_AutomationProc->
318                 ReturnNetCacheServerObject(m_NetICacheClient, *it)->GetID());
319     reply.Append(object_ids);
320 }
321 
GetReader(const string & blob_key,size_t & blob_size)322 IReader* SNetCacheService::GetReader(const string& blob_key, size_t& blob_size)
323 {
324     return m_NetCacheAPI.GetReader(blob_key, &blob_size);
325 }
326 
GetReader(const string & blob_key,int blob_version,const string & blob_subkey,size_t & blob_size)327 IReader* SNetCacheService::GetReader(const string& blob_key, int blob_version, const string& blob_subkey, size_t& blob_size)
328 {
329     auto rv = m_NetICacheClient.GetReadStream(blob_key, blob_version, blob_subkey, &blob_size);
330     if (!rv) NCBI_THROW(CNetCacheException, eBlobNotFound, "BLOB not found");
331     return rv;
332 }
333 
GetWriter(string & blob_key)334 IEmbeddedStreamWriter* SNetCacheService::GetWriter(string& blob_key)
335 {
336     return m_NetCacheAPI.PutData(&blob_key);
337 }
338 
GetWriter(const string & blob_key,int blob_version,const string & blob_subkey)339 IEmbeddedStreamWriter* SNetCacheService::GetWriter(const string& blob_key, int blob_version, const string& blob_subkey)
340 {
341     return m_NetICacheClient.GetNetCacheWriter(blob_key, blob_version, blob_subkey);
342 }
343