1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ui.storage
19
20import org.mockito.Mockito._
21
22import org.apache.spark.SparkFunSuite
23import org.apache.spark.storage._
24
/**
 * Exercises the HTML fragments produced by [[StoragePage]]: the RDD table, the
 * storage-level description of stream blocks, and the receiver block tables.
 */
class StoragePageSuite extends SparkFunSuite {

  // The page under test is built on a mocked tab whose base path is stubbed to a fixed URL;
  // the links asserted in "rddTable" start with this same URL.
  val storageTab = mock(classOf[StorageTab])
  when(storageTab.basePath).thenReturn("http://localhost:4040")
  val storagePage = new StoragePage(storageTab)

  test("rddTable") {
    // Memory-only RDD with 10 cached partitions.
    val memoryOnlyRdd = new RDDInfo(1, "rdd1", 10, StorageLevel.MEMORY_ONLY, Seq.empty)
    memoryOnlyRdd.memSize = 100
    memoryOnlyRdd.numCachedPartitions = 10

    // Disk-only RDD with 5 cached partitions.
    val diskOnlyRdd = new RDDInfo(2, "rdd2", 10, StorageLevel.DISK_ONLY, Seq.empty)
    diskOnlyRdd.diskSize = 200
    diskOnlyRdd.numCachedPartitions = 5

    // Serialized memory-and-disk RDD with data in both stores.
    val memoryAndDiskRdd =
      new RDDInfo(3, "rdd3", 10, StorageLevel.MEMORY_AND_DISK_SER, Seq.empty)
    memoryAndDiskRdd.memSize = 400
    memoryAndDiskRdd.diskSize = 500
    memoryAndDiskRdd.numCachedPartitions = 10

    val renderedTable = storagePage.rddTable(Seq(memoryOnlyRdd, diskOnlyRdd, memoryAndDiskRdd))

    val expectedHeaders = Seq(
      "RDD Name",
      "Storage Level",
      "Cached Partitions",
      "Fraction Cached",
      "Size in Memory",
      "Size on Disk")
    val headerCells = renderedTable \\ "th"
    assert(headerCells.map(_.text) === expectedHeaders)

    val tableRows = renderedTable \\ "tr"
    assert(tableRows.size === 3)

    // Expected cell texts and link target of every row, in the order the RDDs were passed in.
    val expectedRows = Seq(
      Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B") ->
        "http://localhost:4040/storage/rdd?id=1",
      Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B") ->
        "http://localhost:4040/storage/rdd?id=2",
      Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B") ->
        "http://localhost:4040/storage/rdd?id=3")

    expectedRows.zipWithIndex.foreach { case ((cellTexts, link), rowIndex) =>
      val row = tableRows(rowIndex)
      assert((row \\ "td").map(_.text.trim) === cellTexts)
      // The first anchor found directly under the row's cells must point at the RDD's page.
      assert((row \\ "td" \ "a")(0).attribute("href").map(_.text) === Some(link))
    }
  }

  test("empty rddTable") {
    // Nothing is rendered when there are no RDDs.
    val renderedTable = storagePage.rddTable(Seq.empty)
    assert(renderedTable.isEmpty)
  }

  test("streamBlockStorageLevelDescriptionAndSize") {
    val location = "localhost:1111"

    // Shorthand for the method under test.
    def describe(block: BlockUIData) =
      storagePage.streamBlockStorageLevelDescriptionAndSize(block)

    val inMemory = BlockUIData(
      StreamBlockId(0, 0), location, StorageLevel.MEMORY_ONLY, memSize = 100, diskSize = 0)
    assert(("Memory", 100) === describe(inMemory))

    val inMemorySerialized = BlockUIData(
      StreamBlockId(0, 0), location, StorageLevel.MEMORY_ONLY_SER, memSize = 100, diskSize = 0)
    assert(("Memory Serialized", 100) === describe(inMemorySerialized))

    val onDisk = BlockUIData(
      StreamBlockId(0, 0), location, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 100)
    assert(("Disk", 100) === describe(onDisk))
  }

  test("receiverBlockTables") {
    // Both executors hold the same two stream block ids, at different storage levels.
    val firstExecutorBlocks = Seq(
      BlockUIData(
        StreamBlockId(0, 0),
        "localhost:10000",
        StorageLevel.MEMORY_ONLY,
        memSize = 100,
        diskSize = 0),
      BlockUIData(
        StreamBlockId(1, 1),
        "localhost:10000",
        StorageLevel.DISK_ONLY,
        memSize = 0,
        diskSize = 100))
    val firstExecutor = ExecutorStreamBlockStatus("0", "localhost:10000", firstExecutorBlocks)

    val secondExecutorBlocks = Seq(
      BlockUIData(
        StreamBlockId(0, 0),
        "localhost:10001",
        StorageLevel.MEMORY_ONLY,
        memSize = 100,
        diskSize = 0),
      BlockUIData(
        StreamBlockId(1, 1),
        "localhost:10001",
        StorageLevel.MEMORY_ONLY_SER,
        memSize = 100,
        diskSize = 0))
    val secondExecutor = ExecutorStreamBlockStatus("1", "localhost:10001", secondExecutorBlocks)

    val renderedTables =
      storagePage.receiverBlockTables(Seq(firstExecutor, secondExecutor)) \\ "table"

    // First table: one summary row per executor.
    val executorTable = renderedTables(0)
    val expectedExecutorHeaders = Seq(
      "Executor ID",
      "Address",
      "Total Size in Memory",
      "Total Size on Disk",
      "Stream Blocks")
    assert((executorTable \\ "th").map(_.text) === expectedExecutorHeaders)

    val executorRows = executorTable \\ "tr"
    assert(executorRows.size === 2)

    val expectedExecutorRows = Seq(
      Seq("0", "localhost:10000", "100.0 B", "100.0 B", "2"),
      Seq("1", "localhost:10001", "200.0 B", "0.0 B", "2"))
    expectedExecutorRows.zipWithIndex.foreach { case (cellTexts, rowIndex) =>
      assert((executorRows(rowIndex) \\ "td").map(_.text.trim) === cellTexts)
    }

    // Second table: one row per (block, location) pair.
    val blockTable = renderedTables(1)
    val expectedBlockHeaders = Seq(
      "Block ID",
      "Replication Level",
      "Location",
      "Storage Level",
      "Size")
    assert((blockTable \\ "th").map(_.text) === expectedBlockHeaders)

    val blockRows = blockTable \\ "tr"
    assert(blockRows.size === 4)

    // Expected cell texts per row; the flag marks the rows that open a block and therefore
    // carry the block id and replication level cells, each expected to have rowspan=2.
    val expectedBlockRows = Seq(
      Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B") -> true,
      Seq("localhost:10001", "Memory", "100.0 B") -> false,
      Seq("input-1-1", "2", "localhost:10000", "Disk", "100.0 B") -> true,
      Seq("localhost:10001", "Memory Serialized", "100.0 B") -> false)

    expectedBlockRows.zipWithIndex.foreach { case ((cellTexts, opensBlock), rowIndex) =>
      val cells = blockRows(rowIndex) \\ "td"
      assert(cells.map(_.text.trim) === cellTexts)
      if (opensBlock) {
        // Check "rowspan=2" for the first 2 columns
        Seq(0, 1).foreach { column =>
          assert(cells(column).attribute("rowspan").map(_.text) === Some("2"))
        }
      }
    }
  }

  test("empty receiverBlockTables") {
    // No executors at all: nothing is rendered.
    assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)

    // Executors that hold no stream blocks: still nothing is rendered.
    val executorsWithoutBlocks = Seq(
      ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty),
      ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty))
    assert(storagePage.receiverBlockTables(executorsWithoutBlocks).isEmpty)
  }
}
197