1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.ui.storage 19 20import org.mockito.Mockito._ 21 22import org.apache.spark.SparkFunSuite 23import org.apache.spark.storage._ 24 25class StoragePageSuite extends SparkFunSuite { 26 27 val storageTab = mock(classOf[StorageTab]) 28 when(storageTab.basePath).thenReturn("http://localhost:4040") 29 val storagePage = new StoragePage(storageTab) 30 31 test("rddTable") { 32 val rdd1 = new RDDInfo(1, 33 "rdd1", 34 10, 35 StorageLevel.MEMORY_ONLY, 36 Seq.empty) 37 rdd1.memSize = 100 38 rdd1.numCachedPartitions = 10 39 40 val rdd2 = new RDDInfo(2, 41 "rdd2", 42 10, 43 StorageLevel.DISK_ONLY, 44 Seq.empty) 45 rdd2.diskSize = 200 46 rdd2.numCachedPartitions = 5 47 48 val rdd3 = new RDDInfo(3, 49 "rdd3", 50 10, 51 StorageLevel.MEMORY_AND_DISK_SER, 52 Seq.empty) 53 rdd3.memSize = 400 54 rdd3.diskSize = 500 55 rdd3.numCachedPartitions = 10 56 57 val xmlNodes = storagePage.rddTable(Seq(rdd1, rdd2, rdd3)) 58 59 val headers = Seq( 60 "RDD Name", 61 "Storage Level", 62 "Cached Partitions", 63 "Fraction Cached", 64 "Size in Memory", 65 "Size on Disk") 66 assert((xmlNodes \\ "th").map(_.text) === headers) 67 68 assert((xmlNodes \\ "tr").size === 3) 69 assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) === 70 Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B")) 71 // Check the url 72 assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) === 73 Some("http://localhost:4040/storage/rdd?id=1")) 74 75 assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) === 76 Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B")) 77 // Check the url 78 assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) === 79 Some("http://localhost:4040/storage/rdd?id=2")) 80 81 assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) === 82 Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B")) 83 // Check the url 84 assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) === 85 Some("http://localhost:4040/storage/rdd?id=3")) 86 } 87 88 test("empty rddTable") { 89 assert(storagePage.rddTable(Seq.empty).isEmpty) 90 } 91 92 test("streamBlockStorageLevelDescriptionAndSize") { 93 val memoryBlock = BlockUIData(StreamBlockId(0, 0), 94 "localhost:1111", 95 StorageLevel.MEMORY_ONLY, 96 memSize = 100, 97 diskSize = 0) 98 assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock)) 99 100 val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0), 101 "localhost:1111", 102 StorageLevel.MEMORY_ONLY_SER, 103 memSize = 100, 104 diskSize = 0) 105 assert(("Memory Serialized", 100) === 106 storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock)) 107 108 val diskBlock = BlockUIData(StreamBlockId(0, 0), 109 "localhost:1111", 110 StorageLevel.DISK_ONLY, 111 memSize = 0, 112 diskSize = 100) 113 assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock)) 114 } 115 116 test("receiverBlockTables") { 117 val blocksForExecutor0 = Seq( 118 BlockUIData(StreamBlockId(0, 0), 119 "localhost:10000", 120 StorageLevel.MEMORY_ONLY, 121 memSize = 100, 122 diskSize = 0), 123 BlockUIData(StreamBlockId(1, 1), 124 "localhost:10000", 125 StorageLevel.DISK_ONLY, 126 memSize = 0, 127 diskSize = 100) 128 ) 129 val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0) 130 131 val blocksForExecutor1 = Seq( 132 BlockUIData(StreamBlockId(0, 0), 133 "localhost:10001", 134 StorageLevel.MEMORY_ONLY, 135 memSize = 100, 136 diskSize = 0), 137 BlockUIData(StreamBlockId(1, 1), 138 "localhost:10001", 139 StorageLevel.MEMORY_ONLY_SER, 140 memSize = 100, 141 diskSize = 0) 142 ) 143 val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1) 144 val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1)) 145 146 val executorTable = (xmlNodes \\ "table")(0) 147 val executorHeaders = Seq( 148 "Executor ID", 149 "Address", 150 "Total Size in Memory", 151 "Total Size on Disk", 152 "Stream Blocks") 153 assert((executorTable \\ "th").map(_.text) === executorHeaders) 154 155 assert((executorTable \\ "tr").size === 2) 156 assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) === 157 Seq("0", "localhost:10000", "100.0 B", "100.0 B", "2")) 158 assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) === 159 Seq("1", "localhost:10001", "200.0 B", "0.0 B", "2")) 160 161 val blockTable = (xmlNodes \\ "table")(1) 162 val blockHeaders = Seq( 163 "Block ID", 164 "Replication Level", 165 "Location", 166 "Storage Level", 167 "Size") 168 assert((blockTable \\ "th").map(_.text) === blockHeaders) 169 170 assert((blockTable \\ "tr").size === 4) 171 assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) === 172 Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B")) 173 // Check "rowspan=2" for the first 2 columns 174 assert(((blockTable \\ "tr")(0) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2")) 175 assert(((blockTable \\ "tr")(0) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2")) 176 177 assert(((blockTable \\ "tr")(1) \\ "td").map(_.text.trim) === 178 Seq("localhost:10001", "Memory", "100.0 B")) 179 180 assert(((blockTable \\ "tr")(2) \\ "td").map(_.text.trim) === 181 Seq("input-1-1", "2", "localhost:10000", "Disk", "100.0 B")) 182 // Check "rowspan=2" for the first 2 columns 183 assert(((blockTable \\ "tr")(2) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2")) 184 assert(((blockTable \\ "tr")(2) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2")) 185 186 assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) === 187 Seq("localhost:10001", "Memory Serialized", "100.0 B")) 188 } 189 190 test("empty receiverBlockTables") { 191 assert(storagePage.receiverBlockTables(Seq.empty).isEmpty) 192 val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty) 193 val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty) 194 assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty) 195 } 196} 197