1 // This is brl/bbas/bhdfs/pro/processes/bhdfs_fs_manager_processes.cxx
2 #include <bprb/bprb_func_process.h>
3 //:
4 // \file
// \brief  Processes for creating the HDFS manager and running various FS operations
6 //
7 // \author Ozge C. Ozcanli
8 // \date Dec 14, 2011
9 
10 #include <bhdfs/bhdfs_manager.h>
11 
12 
13 namespace bhdfs_create_fs_manager_process_globals
14 {
15   constexpr unsigned n_inputs_ = 2;
16   constexpr unsigned n_outputs_ = 1;
17 }
18 
bhdfs_create_fs_manager_process_cons(bprb_func_process & pro)19 bool bhdfs_create_fs_manager_process_cons(bprb_func_process& pro)
20 {
21   using namespace bhdfs_create_fs_manager_process_globals;
22 
23   //process takes 2 inputs
24   std::vector<std::string> input_types_(n_inputs_);
25   input_types_[0] = "vcl_string";  // host_name
26   input_types_[1] = "int";         // port
27 
28   // process has 1 output
29   std::vector<std::string>  output_types_(n_outputs_);
30   output_types_[0] = "bhdfs_manager_sptr";
31 
32   return pro.set_input_types(input_types_) && pro.set_output_types(output_types_);
33 }
34 
bhdfs_create_fs_manager_process(bprb_func_process & pro)35 bool bhdfs_create_fs_manager_process(bprb_func_process& pro)
36 {
37   using namespace bhdfs_create_fs_manager_process_globals;
38 
39   if ( pro.n_inputs() < n_inputs_ ){
40     std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl;
41     return false;
42   }
43   //get the inputs
44   unsigned i = 0;
45   std::string host_name= pro.get_input<std::string>(i++);
46   int port = pro.get_input<int>(i++);
47 
48   if (!bhdfs_manager::exists())
49     bhdfs_manager::create(host_name, port);
50 
51   if (!bhdfs_manager::exists())  // if still doesn't exist, there is a problem
52     return false;
53 
54   i=0;
55   // store smart pointer
56   pro.set_output_val<bhdfs_manager_sptr>(i++, bhdfs_manager::instance());
57   return true;
58 }
59 
//: Arity of the process that creates a directory on HDFS.
namespace bhdfs_fs_create_dir_process_globals
{
  constexpr unsigned n_inputs_{1};   // path relative to the working dir
  constexpr unsigned n_outputs_{0};  // nothing is returned
}
65 
bhdfs_fs_create_dir_process_cons(bprb_func_process & pro)66 bool bhdfs_fs_create_dir_process_cons(bprb_func_process& pro)
67 {
68   using namespace bhdfs_fs_create_dir_process_globals;
69 
70   //process takes 1 input
71   std::vector<std::string> input_types_(n_inputs_);
72   input_types_[0] = "vcl_string";  // path on current dir
73 
74   // process has no output
75   std::vector<std::string>  output_types_(n_outputs_);
76 
77   return pro.set_input_types(input_types_) && pro.set_output_types(output_types_);
78 }
79 
bhdfs_fs_create_dir_process(bprb_func_process & pro)80 bool bhdfs_fs_create_dir_process(bprb_func_process& pro)
81 {
82   using namespace bhdfs_fs_create_dir_process_globals;
83 
84   if ( pro.n_inputs() < n_inputs_ ){
85     std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl;
86     return false;
87   }
88   //get the inputs
89   unsigned i = 0;
90   std::string path= pro.get_input<std::string>(i++);
91 
92   if (!bhdfs_manager::exists())
93     return false;
94 
95   bhdfs_manager_sptr mins = bhdfs_manager::instance();
96   std::string fpath = mins->get_working_dir() + "/" + path;
97   if (mins->exists(fpath))
98     return true;
99   else
100     return mins->create_dir(fpath);
101 }
102 
//: Arity of the process that copies a local file to HDFS.
namespace bhdfs_fs_copy_file_process_globals
{
  constexpr unsigned n_inputs_{2};   // local file and HDFS path
  constexpr unsigned n_outputs_{0};  // nothing is returned
}
108 
bhdfs_fs_copy_file_process_cons(bprb_func_process & pro)109 bool bhdfs_fs_copy_file_process_cons(bprb_func_process& pro)
110 {
111   using namespace bhdfs_fs_copy_file_process_globals;
112 
113   //process takes 2 inputs
114   std::vector<std::string> input_types_(n_inputs_);
115   input_types_[0] = "vcl_string";  // full path of file on local dir
116   input_types_[1] = "vcl_string";  // path on hdfs
117 
118   // process has no output
119   std::vector<std::string>  output_types_(n_outputs_);
120 
121   return pro.set_input_types(input_types_) && pro.set_output_types(output_types_);
122 }
123 
bhdfs_fs_copy_file_process(bprb_func_process & pro)124 bool bhdfs_fs_copy_file_process(bprb_func_process& pro)
125 {
126   using namespace bhdfs_fs_copy_file_process_globals;
127 
128   if ( pro.n_inputs() < n_inputs_ ){
129     std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl;
130     return false;
131   }
132   if (!bhdfs_manager::exists())
133     return false;
134 
135   //get the inputs
136   unsigned i = 0;
137   std::string local_file= pro.get_input<std::string>(i++);
138 
139   bhdfs_manager_sptr mins = bhdfs_manager::instance();
140   std::string hdfs_path= mins->get_working_dir() + "/" + pro.get_input<std::string>(i++);
141   return mins->copy_to_hdfs(local_file, hdfs_path);
142 }
143 
//: Arity of the process that copies files from HDFS to a local folder.
namespace bhdfs_fs_copy_files_to_local_process_globals
{
  constexpr unsigned n_inputs_{3};   // HDFS path, name ending, local path
  constexpr unsigned n_outputs_{0};  // nothing is returned
}
149 
bhdfs_fs_copy_files_to_local_process_cons(bprb_func_process & pro)150 bool bhdfs_fs_copy_files_to_local_process_cons(bprb_func_process& pro)
151 {
152   using namespace bhdfs_fs_copy_files_to_local_process_globals;
153 
154   //process takes 3 inputs
155   std::vector<std::string> input_types_(n_inputs_);
156   input_types_[0] = "vcl_string";  // path on hdfs
157   input_types_[1] = "vcl_string";  // name ending
158   input_types_[2] = "vcl_string";  // local path
159 
160   // process has no output
161   std::vector<std::string>  output_types_(n_outputs_);
162 
163   return pro.set_input_types(input_types_) && pro.set_output_types(output_types_);
164 }
165 
bhdfs_fs_copy_files_to_local_process(bprb_func_process & pro)166 bool bhdfs_fs_copy_files_to_local_process(bprb_func_process& pro)
167 {
168   using namespace bhdfs_fs_copy_files_to_local_process_globals;
169 
170   if ( pro.n_inputs() < n_inputs_ ){
171     std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl;
172     return false;
173   }
174   if (!bhdfs_manager::exists())
175     return false;
176   bhdfs_manager_sptr mins = bhdfs_manager::instance();
177 
178   //get the inputs
179   unsigned i = 0;
180   std::string hdfs_path= mins->get_working_dir() + "/" + pro.get_input<std::string>(i++);
181   std::string name_ending= pro.get_input<std::string>(i++);
182   std::string local_folder= pro.get_input<std::string>(i++);
183 
184   std::vector<std::string> dir_list;
185   if (!mins->get_dir_list(hdfs_path, dir_list)) {
186     std::cout << "In bhdfs_fs_copy_files_to_local_process() - cannot get dirlist!\n";
187     return false;
188   }
189   bool ok = true;
190   for (unsigned i = 0; i < dir_list.size(); i++) {
191     std::string filename = dir_list[i];
192     if (filename.find(name_ending) != std::string::npos)
193       ok = ok && mins->copy_from_hdfs(filename, local_folder);
194   }
195   if (!ok) {
196     std::cerr << "In bhdfs_fs_copy_files_to_local_process() - there were problems copying files!\n";
197   }
198   return true;
199 }
200 
201 
//: Arity of the process that reports the HDFS working directory.
namespace bhdfs_fs_get_working_dir_process_globals
{
  constexpr unsigned n_inputs_{0};   // takes nothing
  constexpr unsigned n_outputs_{1};  // the working dir as a string
}
207 
bhdfs_fs_get_working_dir_process_cons(bprb_func_process & pro)208 bool bhdfs_fs_get_working_dir_process_cons(bprb_func_process& pro)
209 {
210   using namespace bhdfs_fs_get_working_dir_process_globals;
211 
212   //process takes no input
213   std::vector<std::string> input_types_(n_inputs_);
214 
215   // process has 1 output
216   std::vector<std::string>  output_types_(n_outputs_);
217   output_types_[0] = "vcl_string";
218 
219   return pro.set_input_types(input_types_) && pro.set_output_types(output_types_);
220 }
221 
bhdfs_fs_get_working_dir_process(bprb_func_process & pro)222 bool bhdfs_fs_get_working_dir_process(bprb_func_process& pro)
223 {
224   using namespace bhdfs_fs_get_working_dir_process_globals;
225 
226   if ( pro.n_inputs() < n_inputs_ ){
227     std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl;
228     return false;
229   }
230   if (!bhdfs_manager::exists())
231     return false;
232 
233   bhdfs_manager_sptr mins = bhdfs_manager::instance();
234   std::string hdfs_path= mins->get_working_dir();
235   pro.set_output_val<std::string>(0, hdfs_path); //  fix the path names in stdin.txt file in the script s4
236   return true;
237 }
238