1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #include "expect.h"
20 #include "hdfs.h"
21 #include "native_mini_dfs.h"
22 
23 #include <errno.h>
24 #include <semaphore.h>
25 #include <pthread.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 
/**
 * Upper bound on the number of worker threads.  main() sizes its
 * thread-info array with this value and rejects larger requests.
 */
#define TLH_MAX_THREADS 100

/**
 * Mini DFS cluster under test.  Assigned in main() and passed to
 * hdfsSingleNameNodeConnect() by each worker thread.
 */
static struct NativeMiniDfsCluster* cluster;

/**
 * User name to connect as.  Set in main() from the single command-line
 * argument and handed to hdfsConnectAsUser().
 */
static const char *user;
35 
/**
 * Per-thread bookkeeping.  main() keeps one entry per worker thread and
 * hands a pointer to it to the thread's start routine.
 */
struct tlhThreadInfo {
    /**
     * Thread index: the entry's position in main()'s array.  Also used to
     * build the thread's own "/tlhData%04d" working directory.
     */
    int threadIdx;
    /**
     * 0 = thread was successful; error code otherwise.
     * Note that, despite the field name, a nonzero value means failure.
     */
    int success;
    /** pthread identifier, filled in by pthread_create() in main() */
    pthread_t thread;
};
44 
/**
 * Connect to the NameNode of a mini DFS cluster over its HTTP address.
 *
 * The host and port are obtained from nmdGetNameNodeHttpAddress() and the
 * connection is made as the file-level `user`.
 *
 * @param cluster  The mini DFS cluster to connect to.
 * @param fs       (out) Receives the connected filesystem handle.  Left
 *                 untouched when the function fails.
 *
 * @return         0 on success; 1 if the address lookup or the connection
 *                 attempt failed (a message is printed to stderr).
 */
static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster,
                                     hdfsFS *fs)
{
    const char *host;
    int port;
    hdfsFS conn;
    int rc = 1;

    if (nmdGetNameNodeHttpAddress(cluster, &port, &host) != 0) {
        fprintf(stderr, "Error when retrieving namenode host address.\n");
    } else {
        conn = hdfsConnectAsUser(host, port, user);
        if (conn == NULL) {
            fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        } else {
            /* Only publish the handle once the connection is known good. */
            *fs = conn;
            rc = 0;
        }
    }
    return rc;
}
66 
/**
 * Run one thread's sequence of HDFS operations on a connected filesystem.
 *
 * Everything happens inside a per-thread directory, "/tlhData%04d" built
 * from ti->threadIdx:
 *   1. remove any existing directory of that name and recreate it;
 *   2. write the directory name into "<dir>/file", flush and close;
 *   3. re-open the file, read it back and compare with what was written;
 *   4. exercise hdfsChown() with group-only, owner+group and owner-only
 *      changes, checking the result through hdfsGetPathInfo();
 *   5. delete the directory recursively.
 *
 * @param ti  Thread info; only threadIdx is read.
 * @param fs  Connected filesystem handle.
 *
 * @return    0 on success; the errno value after a failed hdfsWrite() or
 *            hdfsRead(); EIO on a short write or read.  A failed EXPECT_*
 *            check presumably also ends the function early with the value
 *            chosen by that macro (see expect.h).
 */
static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
{
    char prefix[256], tmp[256];
    hdfsFile file;
    int ret, expected;
    hdfsFileInfo *fileInfo;

    snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);

    if (hdfsExists(fs, prefix) == 0) {
        EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
    }
    EXPECT_ZERO(hdfsCreateDirectory(fs, prefix));
    snprintf(tmp, sizeof(tmp), "%s/file", prefix);

    /*
     * Opening for read before anything has been written is expected to
     * hand back a handle.  Keep it so it can be released instead of leaked;
     * the close result is deliberately not asserted so this step checks
     * exactly what it checked before.
     */
    file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
    EXPECT_NONNULL(file);
    (void)hdfsCloseFile(fs, file);

    file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0);
    EXPECT_NONNULL(file);

    /* TODO: implement writeFully and use it here */
    expected = (int)strlen(prefix);
    ret = hdfsWrite(fs, file, prefix, expected);
    if (ret < 0) {
        /* Capture errno right away, before fprintf can disturb it. */
        ret = errno;
        fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
        return ret;
    }
    if (ret != expected) {
        /* Argument order matches the message: wanted first, actual second. */
        fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
                "it wrote %d\n", expected, ret);
        return EIO;
    }
    EXPECT_ZERO(hdfsFlush(fs, file));
    EXPECT_ZERO(hdfsCloseFile(fs, file));

    /* Let's re-open the file for reading */
    file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
    EXPECT_NONNULL(file);

    /* TODO: implement readFully and use it here */
    /* tmp is reused as the read buffer; the path is rebuilt further down. */
    ret = hdfsRead(fs, file, tmp, sizeof(tmp));
    if (ret < 0) {
        ret = errno;
        fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
        return ret;
    }
    if (ret != expected) {
        /* Argument order matches the message: wanted first, actual second. */
        fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
                "it read %d\n", expected, ret);
        return EIO;
    }
    EXPECT_ZERO(memcmp(prefix, tmp, expected));
    EXPECT_ZERO(hdfsCloseFile(fs, file));

    /* Rebuild the file path, which the read above overwrote. */
    snprintf(tmp, sizeof(tmp), "%s/file", prefix);

    /* A chown with neither owner nor group must be rejected. */
    EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL));

    /* Group only. */
    EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("doop", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    /* Owner and group together. */
    EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    /* Owner only; the group set above must be left as it was. */
    EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
    return 0;
}
147 
testHdfsOperations(void * v)148 static void *testHdfsOperations(void *v)
149 {
150     struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
151     hdfsFS fs = NULL;
152     int ret;
153 
154     fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
155             ti->threadIdx);
156     ret = hdfsSingleNameNodeConnect(cluster, &fs);
157     if (ret) {
158         fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
159                 "hdfsSingleNameNodeConnect failed with error %d.\n",
160                 ti->threadIdx, ret);
161         ti->success = EIO;
162         return NULL;
163     }
164     ti->success = doTestHdfsOperations(ti, fs);
165     if (hdfsDisconnect(fs)) {
166         ret = errno;
167         fprintf(stderr, "hdfsDisconnect error %d\n", ret);
168         ti->success = ret;
169     }
170     return NULL;
171 }
172 
checkFailures(struct tlhThreadInfo * ti,int tlhNumThreads)173 static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
174 {
175     int i, threadsFailed = 0;
176     const char *sep = "";
177 
178     for (i = 0; i < tlhNumThreads; i++) {
179         if (ti[i].success != 0) {
180             threadsFailed = 1;
181         }
182     }
183     if (!threadsFailed) {
184         fprintf(stderr, "testLibHdfs: all threads succeeded.  SUCCESS.\n");
185         return EXIT_SUCCESS;
186     }
187     fprintf(stderr, "testLibHdfs: some threads failed: [");
188     for (i = 0; i < tlhNumThreads; i++) {
189         if (ti[i].success != 0) {
190             fprintf(stderr, "%s%d", sep, i);
191             sep = ", ";
192         }
193     }
194     fprintf(stderr, "].  FAILURE.\n");
195     return EXIT_FAILURE;
196 }
197 
198 /**
199  * Test that we can write a file with libhdfs and then read it back
200  */
main(int argc,const char * args[])201 int main(int argc, const char *args[])
202 {
203     int i, tlhNumThreads;
204     const char *tlhNumThreadsStr;
205     struct tlhThreadInfo ti[TLH_MAX_THREADS];
206 
207     if (argc != 2) {
208         fprintf(stderr, "usage: test_libwebhdfs_threaded <username>\n");
209         exit(1);
210     }
211     user = args[1];
212 
213     struct NativeMiniDfsConf conf = {
214         .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
215     };
216     cluster = nmdCreate(&conf);
217     EXPECT_NONNULL(cluster);
218     EXPECT_ZERO(nmdWaitClusterUp(cluster));
219 
220     tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
221     if (!tlhNumThreadsStr) {
222         tlhNumThreadsStr = "3";
223     }
224     tlhNumThreads = atoi(tlhNumThreadsStr);
225     if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
226         fprintf(stderr, "testLibHdfs: must have a number of threads "
227                 "between 1 and %d inclusive, not %d\n",
228                 TLH_MAX_THREADS, tlhNumThreads);
229         return EXIT_FAILURE;
230     }
231     memset(&ti[0], 0, sizeof(ti));
232     for (i = 0; i < tlhNumThreads; i++) {
233         ti[i].threadIdx = i;
234     }
235 
236     for (i = 0; i < tlhNumThreads; i++) {
237         EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
238                                    testHdfsOperations, &ti[i]));
239     }
240     for (i = 0; i < tlhNumThreads; i++) {
241         EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
242     }
243 
244     EXPECT_ZERO(nmdShutdown(cluster));
245     nmdFree(cluster);
246     return checkFailures(ti, tlhNumThreads);
247 }
248