1 /*  $Id: test_grid_worker.cpp 595895 2019-10-29 17:07:52Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <util/distribution.hpp>
35 #include <util/random_gen.hpp>
36 
37 #include <connect/services/grid_worker_app.hpp>
38 
39 #include <corelib/ncbi_system.hpp>
40 
41 USING_NCBI_SCOPE;
42 
43 NCBI_PARAM_DECL(string, test, sleep_time_distr);
44 typedef NCBI_PARAM_TYPE(test, sleep_time_distr) TParam_SleepTimeDistr;
45 NCBI_PARAM_DEF_EX(string, test, sleep_time_distr, "0",
46     eParam_NoThread, GRID_WORKER_TEST_SLEEP_TIME_DISTR);
47 
48 NCBI_PARAM_DECL(double, test, failure_rate);
49 typedef NCBI_PARAM_TYPE(test, failure_rate) TParam_FailureRate;
50 NCBI_PARAM_DEF_EX(double, test, failure_rate, 0.0,
51     eParam_NoThread, GRID_WORKER_TEST_FAILURE_RATE);
52 
53 
54 /// NetSchedule sample job
55 ///
56 /// Job reads an array of doubles, sorts it and returns data back
57 /// to the client as a BLOB.
58 ///
59 class CGridWorkerTest : public IWorkerNodeJob
60 {
61 public:
CGridWorkerTest(const IWorkerNodeInitContext &)62     CGridWorkerTest(const IWorkerNodeInitContext&)
63     {
64         GetDiagContext().SetOldPostFormat(false);
65 
66         m_SleepTimeDistr.InitFromParameter("sleep_time_distr",
67             TParam_SleepTimeDistr::GetDefault().c_str(), &m_Random);
68     }
69 
~CGridWorkerTest()70     virtual ~CGridWorkerTest() {}
71 
Do(CWorkerNodeJobContext & context)72     int Do(CWorkerNodeJobContext& context)
73     {
74         CNcbiIstream& is = context.GetIStream();
75         string input_type;
76         is >> input_type;
77         if (input_type != "doubles") {
78             context.CommitJobWithFailure(
79                 "This worker node can only process the 'doubles' input type.");
80             return 1;
81         }
82         int vsize;
83         is >> vsize;
84         vector<double> v(vsize);
85         for (int i = 0; i < vsize; ++i)
86             is >> v[i];
87 
88         unsigned delay = m_SleepTimeDistr.GetNextValue();
89 
90         if (delay > 0)
91             SleepMilliSec(delay);
92 
93         if (m_Random.GetRand() <
94                 TParam_FailureRate::GetDefault() * m_Random.GetMax())
95             context.CommitJobWithFailure("FAILED");
96         else {
97             sort(v.begin(), v.end());
98 
99             CNcbiOstream& os = context.GetOStream();
100             os << vsize << ' ';
101             for (int i = 0; i < vsize; ++i)
102                 os << v[i] << ' ';
103             context.CommitJob();
104         }
105 
106         return 0;
107     }
108 
109 private:
110     CRandom m_Random;
111     CDiscreteDistributionImpl m_SleepTimeDistr;
112 };
113 
114 class CGridWorkerIdleTask : public IWorkerNodeIdleTask
115 {
116 public:
CGridWorkerIdleTask(const IWorkerNodeInitContext &)117     CGridWorkerIdleTask(const IWorkerNodeInitContext&) : m_Count(0)
118     {
119     }
120 
~CGridWorkerIdleTask()121     virtual ~CGridWorkerIdleTask() {}
122 
Run(CWorkerNodeIdleTaskContext & context)123     virtual void Run(CWorkerNodeIdleTaskContext& context)
124     {
125         LOG_POST("Staring idle task...");
126 
127         for (int i = 0; i < 3 && !context.IsShutdownRequested(); ++i) {
128             LOG_POST("Idle task: iteration: " << i);
129 
130             SleepSec(2);
131         }
132 
133         if (++m_Count % 3 == 0)
134             context.SetRunAgain();
135 
136         LOG_POST("Stopping idle task...");
137     }
138 
139 private:
140     int m_Count;
141 };
142 
143 // Use this macro to implement the main function for the CGridWorkerTest
144 // with idle task CGridWorkerIdleTask.
145 NCBI_WORKERNODE_MAIN_EX(CGridWorkerTest, CGridWorkerIdleTask, 1.0.0)
146