1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 #include "expect.h"
20 #include "hdfs.h"
21 #include "native_mini_dfs.h"
22
23 #include <errno.h>
24 #include <semaphore.h>
25 #include <pthread.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29
30 #define TLH_MAX_THREADS 100
31
32 static struct NativeMiniDfsCluster* cluster;
33
34 static const char *user;
35
/**
 * Per-thread bookkeeping for the threaded libwebhdfs test.
 */
struct tlhThreadInfo {
    int threadIdx;     /**< Index of this thread (0-based). */
    int success;       /**< 0 = thread was successful; error code otherwise. */
    pthread_t thread;  /**< pthread identifier, filled in by pthread_create. */
};
44
/**
 * Connect to the NameNode of the test cluster as the global test user.
 *
 * @param cluster    The mini DFS cluster whose NameNode we connect to.
 * @param fs         (out param) On success, the connected filesystem handle.
 *
 * @return           0 on success; 1 on failure (a message is printed to
 *                   stderr and *fs is left untouched).
 */
static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster,
                                     hdfsFS *fs)
{
    int nnPort;
    const char *nnHost;
    hdfsFS hdfs;

    if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) {
        fprintf(stderr, "Error when retrieving namenode host address.\n");
        return 1;
    }

    hdfs = hdfsConnectAsUser(nnHost, nnPort, user);
    /* nmdGetNameNodeHttpAddress hands back a strdup'd host name; free it so
     * each connect does not leak one allocation.
     * NOTE(review): confirm the ownership contract in native_mini_dfs.h. */
    free((void *)nnHost);
    if (!hdfs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        return 1;
    }

    *fs = hdfs;
    return 0;
}
66
/**
 * Run one thread's worth of filesystem operations: create a per-thread
 * directory, write a small file, read it back and verify the contents,
 * exercise chown/stat, and finally delete the directory.
 *
 * @param ti         Per-thread state; threadIdx makes the path unique.
 * @param fs         Connected filesystem handle.
 *
 * @return           0 on success; an errno-style error code otherwise.
 */
static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
{
    char prefix[256], tmp[256];
    hdfsFile file;
    int ret, expected;
    hdfsFileInfo *fileInfo;

    snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);

    /* Start from a clean slate if a previous run left data behind. */
    if (hdfsExists(fs, prefix) == 0) {
        EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
    }
    EXPECT_ZERO(hdfsCreateDirectory(fs, prefix));
    snprintf(tmp, sizeof(tmp), "%s/file", prefix);

    /* NOTE(review): this read handle is never closed; the file also does
     * not exist yet (webhdfs presumably opens lazily, so the open itself
     * succeeds) — confirm intent and whether the handle should be closed. */
    EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0));

    file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0);
    EXPECT_NONNULL(file);

    /* TODO: implement writeFully and use it here */
    expected = (int)strlen(prefix);
    ret = hdfsWrite(fs, file, prefix, expected);
    if (ret < 0) {
        ret = errno;
        fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
        return ret;
    }
    if (ret != expected) {
        /* Fixed: expected/actual counts were previously printed swapped. */
        fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
                "it wrote %d\n", expected, ret);
        return EIO;
    }
    EXPECT_ZERO(hdfsFlush(fs, file));
    EXPECT_ZERO(hdfsCloseFile(fs, file));

    /* Let's re-open the file for reading */
    file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
    EXPECT_NONNULL(file);

    /* TODO: implement readFully and use it here */
    ret = hdfsRead(fs, file, tmp, sizeof(tmp));
    if (ret < 0) {
        ret = errno;
        fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
        return ret;
    }
    if (ret != expected) {
        /* Fixed: expected/actual counts were previously printed swapped. */
        fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
                "it read %d\n", expected, ret);
        return EIO;
    }
    /* The file's contents were the prefix string itself. */
    EXPECT_ZERO(memcmp(prefix, tmp, expected));
    EXPECT_ZERO(hdfsCloseFile(fs, file));

    snprintf(tmp, sizeof(tmp), "%s/file", prefix);
    /* chown with neither owner nor group is expected to fail. */
    EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL));

    /* Change group only; owner must be left alone. */
    EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("doop", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    /* Change both owner and group. */
    EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2"));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    /* Change owner only; group must be left alone. */
    EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL));
    fileInfo = hdfsGetPathInfo(fs, tmp);
    EXPECT_NONNULL(fileInfo);
    EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner));
    EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
    hdfsFreeFileInfo(fileInfo, 1);

    EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
    return 0;
}
147
testHdfsOperations(void * v)148 static void *testHdfsOperations(void *v)
149 {
150 struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
151 hdfsFS fs = NULL;
152 int ret;
153
154 fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
155 ti->threadIdx);
156 ret = hdfsSingleNameNodeConnect(cluster, &fs);
157 if (ret) {
158 fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
159 "hdfsSingleNameNodeConnect failed with error %d.\n",
160 ti->threadIdx, ret);
161 ti->success = EIO;
162 return NULL;
163 }
164 ti->success = doTestHdfsOperations(ti, fs);
165 if (hdfsDisconnect(fs)) {
166 ret = errno;
167 fprintf(stderr, "hdfsDisconnect error %d\n", ret);
168 ti->success = ret;
169 }
170 return NULL;
171 }
172
checkFailures(struct tlhThreadInfo * ti,int tlhNumThreads)173 static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
174 {
175 int i, threadsFailed = 0;
176 const char *sep = "";
177
178 for (i = 0; i < tlhNumThreads; i++) {
179 if (ti[i].success != 0) {
180 threadsFailed = 1;
181 }
182 }
183 if (!threadsFailed) {
184 fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n");
185 return EXIT_SUCCESS;
186 }
187 fprintf(stderr, "testLibHdfs: some threads failed: [");
188 for (i = 0; i < tlhNumThreads; i++) {
189 if (ti[i].success != 0) {
190 fprintf(stderr, "%s%d", sep, i);
191 sep = ", ";
192 }
193 }
194 fprintf(stderr, "]. FAILURE.\n");
195 return EXIT_FAILURE;
196 }
197
198 /**
199 * Test that we can write a file with libhdfs and then read it back
200 */
main(int argc,const char * args[])201 int main(int argc, const char *args[])
202 {
203 int i, tlhNumThreads;
204 const char *tlhNumThreadsStr;
205 struct tlhThreadInfo ti[TLH_MAX_THREADS];
206
207 if (argc != 2) {
208 fprintf(stderr, "usage: test_libwebhdfs_threaded <username>\n");
209 exit(1);
210 }
211 user = args[1];
212
213 struct NativeMiniDfsConf conf = {
214 .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070,
215 };
216 cluster = nmdCreate(&conf);
217 EXPECT_NONNULL(cluster);
218 EXPECT_ZERO(nmdWaitClusterUp(cluster));
219
220 tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
221 if (!tlhNumThreadsStr) {
222 tlhNumThreadsStr = "3";
223 }
224 tlhNumThreads = atoi(tlhNumThreadsStr);
225 if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
226 fprintf(stderr, "testLibHdfs: must have a number of threads "
227 "between 1 and %d inclusive, not %d\n",
228 TLH_MAX_THREADS, tlhNumThreads);
229 return EXIT_FAILURE;
230 }
231 memset(&ti[0], 0, sizeof(ti));
232 for (i = 0; i < tlhNumThreads; i++) {
233 ti[i].threadIdx = i;
234 }
235
236 for (i = 0; i < tlhNumThreads; i++) {
237 EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
238 testHdfsOperations, &ti[i]));
239 }
240 for (i = 0; i < tlhNumThreads; i++) {
241 EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
242 }
243
244 EXPECT_ZERO(nmdShutdown(cluster));
245 nmdFree(cluster);
246 return checkFailures(ti, tlhNumThreads);
247 }
248