1 /* $Id: test_grid_worker.cpp 595895 2019-10-29 17:07:52Z sadyrovr $ 2 * =========================================================================== 3 * 4 * PUBLIC DOMAIN NOTICE 5 * National Center for Biotechnology Information 6 * 7 * This software/database is a "United States Government Work" under the 8 * terms of the United States Copyright Act. It was written as part of 9 * the author's official duties as a United States Government employee and 10 * thus cannot be copyrighted. This software/database is freely available 11 * to the public for use. The National Library of Medicine and the U.S. 12 * Government have not placed any restriction on its use or reproduction. 13 * 14 * Although all reasonable efforts have been taken to ensure the accuracy 15 * and reliability of the software and data, the NLM and the U.S. 16 * Government do not and cannot warrant the performance or results that 17 * may be obtained by using this software or data. The NLM and the U.S. 18 * Government disclaim all warranties, express or implied, including 19 * warranties of performance, merchantability or fitness for any particular 20 * purpose. 21 * 22 * Please cite the author in any work or product based on this material. 23 * 24 * =========================================================================== 25 * 26 * Authors: Dmitry Kazimirov 27 * 28 * File Description: 29 * 30 */ 31 32 #include <ncbi_pch.hpp> 33 34 #include <util/distribution.hpp> 35 #include <util/random_gen.hpp> 36 37 #include <connect/services/grid_worker_app.hpp> 38 39 #include <corelib/ncbi_system.hpp> 40 41 USING_NCBI_SCOPE; 42 43 NCBI_PARAM_DECL(string, test, sleep_time_distr); 44 typedef NCBI_PARAM_TYPE(test, sleep_time_distr) TParam_SleepTimeDistr; 45 NCBI_PARAM_DEF_EX(string, test, sleep_time_distr, "0", 46 eParam_NoThread, GRID_WORKER_TEST_SLEEP_TIME_DISTR); 47 48 NCBI_PARAM_DECL(double, test, failure_rate); 49 typedef NCBI_PARAM_TYPE(test, failure_rate) TParam_FailureRate; 50 NCBI_PARAM_DEF_EX(double, test, failure_rate, 0.0, 51 eParam_NoThread, GRID_WORKER_TEST_FAILURE_RATE); 52 53 54 /// NetSchedule sample job 55 /// 56 /// Job reads an array of doubles, sorts it and returns data back 57 /// to the client as a BLOB. 58 /// 59 class CGridWorkerTest : public IWorkerNodeJob 60 { 61 public: CGridWorkerTest(const IWorkerNodeInitContext &)62 CGridWorkerTest(const IWorkerNodeInitContext&) 63 { 64 GetDiagContext().SetOldPostFormat(false); 65 66 m_SleepTimeDistr.InitFromParameter("sleep_time_distr", 67 TParam_SleepTimeDistr::GetDefault().c_str(), &m_Random); 68 } 69 ~CGridWorkerTest()70 virtual ~CGridWorkerTest() {} 71 Do(CWorkerNodeJobContext & context)72 int Do(CWorkerNodeJobContext& context) 73 { 74 CNcbiIstream& is = context.GetIStream(); 75 string input_type; 76 is >> input_type; 77 if (input_type != "doubles") { 78 context.CommitJobWithFailure( 79 "This worker node can only process the 'doubles' input type."); 80 return 1; 81 } 82 int vsize; 83 is >> vsize; 84 vector<double> v(vsize); 85 for (int i = 0; i < vsize; ++i) 86 is >> v[i]; 87 88 unsigned delay = m_SleepTimeDistr.GetNextValue(); 89 90 if (delay > 0) 91 SleepMilliSec(delay); 92 93 if (m_Random.GetRand() < 94 TParam_FailureRate::GetDefault() * m_Random.GetMax()) 95 context.CommitJobWithFailure("FAILED"); 96 else { 97 sort(v.begin(), v.end()); 98 99 CNcbiOstream& os = context.GetOStream(); 100 os << vsize << ' '; 101 for (int i = 0; i < vsize; ++i) 102 os << v[i] << ' '; 103 context.CommitJob(); 104 } 105 106 return 0; 107 } 108 109 private: 110 CRandom m_Random; 111 CDiscreteDistributionImpl m_SleepTimeDistr; 112 }; 113 114 class CGridWorkerIdleTask : public IWorkerNodeIdleTask 115 { 116 public: CGridWorkerIdleTask(const IWorkerNodeInitContext &)117 CGridWorkerIdleTask(const IWorkerNodeInitContext&) : m_Count(0) 118 { 119 } 120 ~CGridWorkerIdleTask()121 virtual ~CGridWorkerIdleTask() {} 122 Run(CWorkerNodeIdleTaskContext & context)123 virtual void Run(CWorkerNodeIdleTaskContext& context) 124 { 125 LOG_POST("Staring idle task..."); 126 127 for (int i = 0; i < 3 && !context.IsShutdownRequested(); ++i) { 128 LOG_POST("Idle task: iteration: " << i); 129 130 SleepSec(2); 131 } 132 133 if (++m_Count % 3 == 0) 134 context.SetRunAgain(); 135 136 LOG_POST("Stopping idle task..."); 137 } 138 139 private: 140 int m_Count; 141 }; 142 143 // Use this macro to implement the main function for the CGridWorkerTest 144 // with idle task CGridWorkerIdleTask. 145 NCBI_WORKERNODE_MAIN_EX(CGridWorkerTest, CGridWorkerIdleTask, 1.0.0) 146