1 // This is brl/bbas/bhdfs/pro/processes/bhdfs_fs_manager_processes.cxx 2 #include <bprb/bprb_func_process.h> 3 //: 4 // \file 5 // \brief Processes for creating HDFS manager and run various FS operations 6 // 7 // \author Ozge C. Ozcanli 8 // \date Dec 14, 2011 9 10 #include <bhdfs/bhdfs_manager.h> 11 12 13 namespace bhdfs_create_fs_manager_process_globals 14 { 15 constexpr unsigned n_inputs_ = 2; 16 constexpr unsigned n_outputs_ = 1; 17 } 18 bhdfs_create_fs_manager_process_cons(bprb_func_process & pro)19bool bhdfs_create_fs_manager_process_cons(bprb_func_process& pro) 20 { 21 using namespace bhdfs_create_fs_manager_process_globals; 22 23 //process takes 2 inputs 24 std::vector<std::string> input_types_(n_inputs_); 25 input_types_[0] = "vcl_string"; // host_name 26 input_types_[1] = "int"; // port 27 28 // process has 1 output 29 std::vector<std::string> output_types_(n_outputs_); 30 output_types_[0] = "bhdfs_manager_sptr"; 31 32 return pro.set_input_types(input_types_) && pro.set_output_types(output_types_); 33 } 34 bhdfs_create_fs_manager_process(bprb_func_process & pro)35bool bhdfs_create_fs_manager_process(bprb_func_process& pro) 36 { 37 using namespace bhdfs_create_fs_manager_process_globals; 38 39 if ( pro.n_inputs() < n_inputs_ ){ 40 std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl; 41 return false; 42 } 43 //get the inputs 44 unsigned i = 0; 45 std::string host_name= pro.get_input<std::string>(i++); 46 int port = pro.get_input<int>(i++); 47 48 if (!bhdfs_manager::exists()) 49 bhdfs_manager::create(host_name, port); 50 51 if (!bhdfs_manager::exists()) // if still doesn't exist, there is a problem 52 return false; 53 54 i=0; 55 // store smart pointer 56 pro.set_output_val<bhdfs_manager_sptr>(i++, bhdfs_manager::instance()); 57 return true; 58 } 59 60 namespace bhdfs_fs_create_dir_process_globals 61 { 62 constexpr unsigned n_inputs_ = 1; 63 constexpr unsigned n_outputs_ = 0; 64 } 65 bhdfs_fs_create_dir_process_cons(bprb_func_process & pro)66bool bhdfs_fs_create_dir_process_cons(bprb_func_process& pro) 67 { 68 using namespace bhdfs_fs_create_dir_process_globals; 69 70 //process takes 1 input 71 std::vector<std::string> input_types_(n_inputs_); 72 input_types_[0] = "vcl_string"; // path on current dir 73 74 // process has no output 75 std::vector<std::string> output_types_(n_outputs_); 76 77 return pro.set_input_types(input_types_) && pro.set_output_types(output_types_); 78 } 79 bhdfs_fs_create_dir_process(bprb_func_process & pro)80bool bhdfs_fs_create_dir_process(bprb_func_process& pro) 81 { 82 using namespace bhdfs_fs_create_dir_process_globals; 83 84 if ( pro.n_inputs() < n_inputs_ ){ 85 std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl; 86 return false; 87 } 88 //get the inputs 89 unsigned i = 0; 90 std::string path= pro.get_input<std::string>(i++); 91 92 if (!bhdfs_manager::exists()) 93 return false; 94 95 bhdfs_manager_sptr mins = bhdfs_manager::instance(); 96 std::string fpath = mins->get_working_dir() + "/" + path; 97 if (mins->exists(fpath)) 98 return true; 99 else 100 return mins->create_dir(fpath); 101 } 102 103 namespace bhdfs_fs_copy_file_process_globals 104 { 105 constexpr unsigned n_inputs_ = 2; 106 constexpr unsigned n_outputs_ = 0; 107 } 108 bhdfs_fs_copy_file_process_cons(bprb_func_process & pro)109bool bhdfs_fs_copy_file_process_cons(bprb_func_process& pro) 110 { 111 using namespace bhdfs_fs_copy_file_process_globals; 112 113 //process takes 2 inputs 114 std::vector<std::string> input_types_(n_inputs_); 115 input_types_[0] = "vcl_string"; // full path of file on local dir 116 input_types_[1] = "vcl_string"; // path on hdfs 117 118 // process has no output 119 std::vector<std::string> output_types_(n_outputs_); 120 121 return pro.set_input_types(input_types_) && pro.set_output_types(output_types_); 122 } 123 bhdfs_fs_copy_file_process(bprb_func_process & pro)124bool bhdfs_fs_copy_file_process(bprb_func_process& pro) 125 { 126 using namespace bhdfs_fs_copy_file_process_globals; 127 128 if ( pro.n_inputs() < n_inputs_ ){ 129 std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl; 130 return false; 131 } 132 if (!bhdfs_manager::exists()) 133 return false; 134 135 //get the inputs 136 unsigned i = 0; 137 std::string local_file= pro.get_input<std::string>(i++); 138 139 bhdfs_manager_sptr mins = bhdfs_manager::instance(); 140 std::string hdfs_path= mins->get_working_dir() + "/" + pro.get_input<std::string>(i++); 141 return mins->copy_to_hdfs(local_file, hdfs_path); 142 } 143 144 namespace bhdfs_fs_copy_files_to_local_process_globals 145 { 146 constexpr unsigned n_inputs_ = 3; 147 constexpr unsigned n_outputs_ = 0; 148 } 149 bhdfs_fs_copy_files_to_local_process_cons(bprb_func_process & pro)150bool bhdfs_fs_copy_files_to_local_process_cons(bprb_func_process& pro) 151 { 152 using namespace bhdfs_fs_copy_files_to_local_process_globals; 153 154 //process takes 3 inputs 155 std::vector<std::string> input_types_(n_inputs_); 156 input_types_[0] = "vcl_string"; // path on hdfs 157 input_types_[1] = "vcl_string"; // name ending 158 input_types_[2] = "vcl_string"; // local path 159 160 // process has no output 161 std::vector<std::string> output_types_(n_outputs_); 162 163 return pro.set_input_types(input_types_) && pro.set_output_types(output_types_); 164 } 165 bhdfs_fs_copy_files_to_local_process(bprb_func_process & pro)166bool bhdfs_fs_copy_files_to_local_process(bprb_func_process& pro) 167 { 168 using namespace bhdfs_fs_copy_files_to_local_process_globals; 169 170 if ( pro.n_inputs() < n_inputs_ ){ 171 std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl; 172 return false; 173 } 174 if (!bhdfs_manager::exists()) 175 return false; 176 bhdfs_manager_sptr mins = bhdfs_manager::instance(); 177 178 //get the inputs 179 unsigned i = 0; 180 std::string hdfs_path= mins->get_working_dir() + "/" + pro.get_input<std::string>(i++); 181 std::string name_ending= pro.get_input<std::string>(i++); 182 std::string local_folder= pro.get_input<std::string>(i++); 183 184 std::vector<std::string> dir_list; 185 if (!mins->get_dir_list(hdfs_path, dir_list)) { 186 std::cout << "In bhdfs_fs_copy_files_to_local_process() - cannot get dirlist!\n"; 187 return false; 188 } 189 bool ok = true; 190 for (unsigned i = 0; i < dir_list.size(); i++) { 191 std::string filename = dir_list[i]; 192 if (filename.find(name_ending) != std::string::npos) 193 ok = ok && mins->copy_from_hdfs(filename, local_folder); 194 } 195 if (!ok) { 196 std::cerr << "In bhdfs_fs_copy_files_to_local_process() - there were problems copying files!\n"; 197 } 198 return true; 199 } 200 201 202 namespace bhdfs_fs_get_working_dir_process_globals 203 { 204 constexpr unsigned n_inputs_ = 0; 205 constexpr unsigned n_outputs_ = 1; 206 } 207 bhdfs_fs_get_working_dir_process_cons(bprb_func_process & pro)208bool bhdfs_fs_get_working_dir_process_cons(bprb_func_process& pro) 209 { 210 using namespace bhdfs_fs_get_working_dir_process_globals; 211 212 //process takes no input 213 std::vector<std::string> input_types_(n_inputs_); 214 215 // process has 1 output 216 std::vector<std::string> output_types_(n_outputs_); 217 output_types_[0] = "vcl_string"; 218 219 return pro.set_input_types(input_types_) && pro.set_output_types(output_types_); 220 } 221 bhdfs_fs_get_working_dir_process(bprb_func_process & pro)222bool bhdfs_fs_get_working_dir_process(bprb_func_process& pro) 223 { 224 using namespace bhdfs_fs_get_working_dir_process_globals; 225 226 if ( pro.n_inputs() < n_inputs_ ){ 227 std::cout << pro.name() << ": The number of inputs should be " << n_inputs_<< std::endl; 228 return false; 229 } 230 if (!bhdfs_manager::exists()) 231 return false; 232 233 bhdfs_manager_sptr mins = bhdfs_manager::instance(); 234 std::string hdfs_path= mins->get_working_dir(); 235 pro.set_output_val<std::string>(0, hdfs_path); // fix the path names in stdin.txt file in the script s4 236 return true; 237 } 238