/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts

import org.apache.spark.SparkFunSuite

/**
 * Tests for [[EventLoop]]. Each test builds an anonymous `EventLoop[Int]` named "test" and
 * checks one aspect of its observable behaviour: event delivery, the `isActive` flag around
 * `start()`/`stop()`, error reporting through `onError`, and when `onStop` is invoked.
 *
 * Results produced by the loop's callbacks are observed by polling with `eventually`, because
 * the test thread has no other way to know when a posted event has been handled.
 */
class EventLoopSuite extends SparkFunSuite with Timeouts {

  /**
   * Starts `eventLoop`, evaluates `body`, and stops the loop afterwards even when `body` throws.
   *
   * Without the `finally`, a failed assertion would skip the trailing `stop()` and leave a
   * started loop behind for the remaining tests.
   *
   * @param eventLoop the loop to start and stop around `body`
   * @param body the test logic to run while the loop is started
   * @return whatever `body` evaluates to
   */
  private def withStartedLoop[T](eventLoop: EventLoop[Int])(body: => T): T = {
    eventLoop.start()
    try {
      body
    } finally {
      eventLoop.stop()
    }
  }

  // Posts 1..100 from the test thread and expects onReceive to record them in the same order.
  test("EventLoop") {
    val buffer = new ConcurrentLinkedQueue[Int]
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        buffer.add(event)
      }

      override def onError(e: Throwable): Unit = {}
    }
    withStartedLoop(eventLoop) {
      (1 to 100).foreach(eventLoop.post)
      eventually(timeout(5 seconds), interval(5 millis)) {
        assert((1 to 100) === buffer.asScala.toSeq)
      }
    }
  }

  // isActive must be false before start(), true after start(), and false again after stop().
  test("EventLoop: start and stop") {
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {}

      override def onError(e: Throwable): Unit = {}
    }
    assert(false === eventLoop.isActive)
    eventLoop.start()
    assert(true === eventLoop.isActive)
    eventLoop.stop()
    assert(false === eventLoop.isActive)
  }

  // An exception thrown by onReceive must be handed to onError as the very same instance.
  test("EventLoop: onError") {
    val e = new RuntimeException("Oops")
    @volatile var receivedError: Throwable = null
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        throw e
      }

      override def onError(e: Throwable): Unit = {
        receivedError = e
      }
    }
    withStartedLoop(eventLoop) {
      eventLoop.post(1)
      eventually(timeout(5 seconds), interval(5 millis)) {
        assert(e === receivedError)
      }
    }
  }

  // onError itself throws here; the loop must still report itself as active afterwards.
  test("EventLoop: error thrown from onError should not crash the event thread") {
    val e = new RuntimeException("Oops")
    @volatile var receivedError: Throwable = null
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        throw e
      }

      override def onError(e: Throwable): Unit = {
        receivedError = e
        throw new RuntimeException("Oops")
      }
    }
    withStartedLoop(eventLoop) {
      eventLoop.post(1)
      eventually(timeout(5 seconds), interval(5 millis)) {
        assert(e === receivedError)
        assert(eventLoop.isActive)
      }
    }
  }

  // Three consecutive stop() calls must result in exactly one onStop invocation.
  test("EventLoop: calling stop multiple times should only call onStop once") {
    var onStopTimes = 0
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
      }

      override def onError(e: Throwable): Unit = {
      }

      override def onStop(): Unit = {
        onStopTimes += 1
      }
    }

    eventLoop.start()

    eventLoop.stop()
    eventLoop.stop()
    eventLoop.stop()

    assert(1 === onStopTimes)
  }

  // Five threads post 100 events each; every one of the 500 events must reach onReceive.
  test("EventLoop: post event in multiple threads") {
    @volatile var receivedEventsCount = 0
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        receivedEventsCount += 1
      }

      override def onError(e: Throwable): Unit = {
      }

    }

    val threadNum = 5
    val eventsFromEachThread = 100
    withStartedLoop(eventLoop) {
      (1 to threadNum).foreach { _ =>
        new Thread() {
          override def run(): Unit = {
            (1 to eventsFromEachThread).foreach(eventLoop.post)
          }
        }.start()
      }

      eventually(timeout(5 seconds), interval(5 millis)) {
        assert(threadNum * eventsFromEachThread === receivedEventsCount)
      }
    }
  }

  // onReceive blocks in sleep and swallows InterruptedException; stop() must nevertheless
  // return within the failAfter limit and leave the loop inactive.
  test("EventLoop: onReceive swallows InterruptException") {
    val onReceiveLatch = new CountDownLatch(1)
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        onReceiveLatch.countDown()
        try {
          Thread.sleep(5000)
        } catch {
          case ie: InterruptedException => // swallow
        }
      }

      override def onError(e: Throwable): Unit = {
      }

    }
    eventLoop.start()
    eventLoop.post(1)
    failAfter(5 seconds) {
      // Wait until we enter `onReceive`
      onReceiveLatch.await()
      eventLoop.stop()
    }
    assert(false === eventLoop.isActive)
  }

  // stop() is called from inside onReceive instead of from the test thread.
  test("EventLoop: stop in eventThread") {
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        stop()
      }

      override def onError(e: Throwable): Unit = {
      }

    }
    eventLoop.start()
    eventLoop.post(1)
    eventually(timeout(5 seconds), interval(5 millis)) {
      assert(!eventLoop.isActive)
    }
  }

  test("EventLoop: stop() in onStart should call onStop") {
    @volatile var onStopCalled: Boolean = false
    val eventLoop = new EventLoop[Int]("test") {

      override def onStart(): Unit = {
        stop()
      }

      override def onReceive(event: Int): Unit = {
      }

      override def onError(e: Throwable): Unit = {
      }

      override def onStop(): Unit = {
        onStopCalled = true
      }
    }
    eventLoop.start()
    // Both conditions are polled together: this test cannot assume that onStop has already
    // finished at the moment isActive first reads false.
    eventually(timeout(5 seconds), interval(5 millis)) {
      assert(!eventLoop.isActive)
      assert(onStopCalled)
    }
  }

  test("EventLoop: stop() in onReceive should call onStop") {
    @volatile var onStopCalled: Boolean = false
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        stop()
      }

      override def onError(e: Throwable): Unit = {
      }

      override def onStop(): Unit = {
        onStopCalled = true
      }
    }
    eventLoop.start()
    eventLoop.post(1)
    // stop() is triggered from onReceive, not from the test thread, so the test thread may
    // see isActive == false before onStop has run (NOTE(review): depends on the ordering
    // inside EventLoop.stop() - confirm). Polling both avoids a one-shot racy assertion.
    eventually(timeout(5 seconds), interval(5 millis)) {
      assert(!eventLoop.isActive)
      assert(onStopCalled)
    }
  }

  test("EventLoop: stop() in onError should call onStop") {
    @volatile var onStopCalled: Boolean = false
    val eventLoop = new EventLoop[Int]("test") {

      override def onReceive(event: Int): Unit = {
        throw new RuntimeException("Oops")
      }

      override def onError(e: Throwable): Unit = {
        stop()
      }

      override def onStop(): Unit = {
        onStopCalled = true
      }
    }
    eventLoop.start()
    eventLoop.post(1)
    // stop() is triggered from onError, not from the test thread, so the test thread may
    // see isActive == false before onStop has run (NOTE(review): depends on the ordering
    // inside EventLoop.stop() - confirm). Polling both avoids a one-shot racy assertion.
    eventually(timeout(5 seconds), interval(5 millis)) {
      assert(!eventLoop.isActive)
      assert(onStopCalled)
    }
  }
}