1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.util
19
20import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
21
22import scala.collection.JavaConverters._
23import scala.concurrent.duration._
24import scala.language.postfixOps
25
26import org.scalatest.concurrent.Eventually._
27import org.scalatest.concurrent.Timeouts
28
29import org.apache.spark.SparkFunSuite
30
31class EventLoopSuite extends SparkFunSuite with Timeouts {
32
33  test("EventLoop") {
34    val buffer = new ConcurrentLinkedQueue[Int]
35    val eventLoop = new EventLoop[Int]("test") {
36
37      override def onReceive(event: Int): Unit = {
38        buffer.add(event)
39      }
40
41      override def onError(e: Throwable): Unit = {}
42    }
43    eventLoop.start()
44    (1 to 100).foreach(eventLoop.post)
45    eventually(timeout(5 seconds), interval(5 millis)) {
46      assert((1 to 100) === buffer.asScala.toSeq)
47    }
48    eventLoop.stop()
49  }
50
51  test("EventLoop: start and stop") {
52    val eventLoop = new EventLoop[Int]("test") {
53
54      override def onReceive(event: Int): Unit = {}
55
56      override def onError(e: Throwable): Unit = {}
57    }
58    assert(false === eventLoop.isActive)
59    eventLoop.start()
60    assert(true === eventLoop.isActive)
61    eventLoop.stop()
62    assert(false === eventLoop.isActive)
63  }
64
65  test("EventLoop: onError") {
66    val e = new RuntimeException("Oops")
67    @volatile var receivedError: Throwable = null
68    val eventLoop = new EventLoop[Int]("test") {
69
70      override def onReceive(event: Int): Unit = {
71        throw e
72      }
73
74      override def onError(e: Throwable): Unit = {
75        receivedError = e
76      }
77    }
78    eventLoop.start()
79    eventLoop.post(1)
80    eventually(timeout(5 seconds), interval(5 millis)) {
81      assert(e === receivedError)
82    }
83    eventLoop.stop()
84  }
85
86  test("EventLoop: error thrown from onError should not crash the event thread") {
87    val e = new RuntimeException("Oops")
88    @volatile var receivedError: Throwable = null
89    val eventLoop = new EventLoop[Int]("test") {
90
91      override def onReceive(event: Int): Unit = {
92        throw e
93      }
94
95      override def onError(e: Throwable): Unit = {
96        receivedError = e
97        throw new RuntimeException("Oops")
98      }
99    }
100    eventLoop.start()
101    eventLoop.post(1)
102    eventually(timeout(5 seconds), interval(5 millis)) {
103      assert(e === receivedError)
104      assert(eventLoop.isActive)
105    }
106    eventLoop.stop()
107  }
108
109  test("EventLoop: calling stop multiple times should only call onStop once") {
110    var onStopTimes = 0
111    val eventLoop = new EventLoop[Int]("test") {
112
113      override def onReceive(event: Int): Unit = {
114      }
115
116      override def onError(e: Throwable): Unit = {
117      }
118
119      override def onStop(): Unit = {
120        onStopTimes += 1
121      }
122    }
123
124    eventLoop.start()
125
126    eventLoop.stop()
127    eventLoop.stop()
128    eventLoop.stop()
129
130    assert(1 === onStopTimes)
131  }
132
133  test("EventLoop: post event in multiple threads") {
134    @volatile var receivedEventsCount = 0
135    val eventLoop = new EventLoop[Int]("test") {
136
137      override def onReceive(event: Int): Unit = {
138        receivedEventsCount += 1
139      }
140
141      override def onError(e: Throwable): Unit = {
142      }
143
144    }
145    eventLoop.start()
146
147    val threadNum = 5
148    val eventsFromEachThread = 100
149    (1 to threadNum).foreach { _ =>
150      new Thread() {
151        override def run(): Unit = {
152          (1 to eventsFromEachThread).foreach(eventLoop.post)
153        }
154      }.start()
155    }
156
157    eventually(timeout(5 seconds), interval(5 millis)) {
158      assert(threadNum * eventsFromEachThread === receivedEventsCount)
159    }
160    eventLoop.stop()
161  }
162
163  test("EventLoop: onReceive swallows InterruptException") {
164    val onReceiveLatch = new CountDownLatch(1)
165    val eventLoop = new EventLoop[Int]("test") {
166
167      override def onReceive(event: Int): Unit = {
168        onReceiveLatch.countDown()
169        try {
170          Thread.sleep(5000)
171        } catch {
172          case ie: InterruptedException => // swallow
173        }
174      }
175
176      override def onError(e: Throwable): Unit = {
177      }
178
179    }
180    eventLoop.start()
181    eventLoop.post(1)
182    failAfter(5 seconds) {
183      // Wait until we enter `onReceive`
184      onReceiveLatch.await()
185      eventLoop.stop()
186    }
187    assert(false === eventLoop.isActive)
188  }
189
190  test("EventLoop: stop in eventThread") {
191    val eventLoop = new EventLoop[Int]("test") {
192
193      override def onReceive(event: Int): Unit = {
194        stop()
195      }
196
197      override def onError(e: Throwable): Unit = {
198      }
199
200    }
201    eventLoop.start()
202    eventLoop.post(1)
203    eventually(timeout(5 seconds), interval(5 millis)) {
204      assert(!eventLoop.isActive)
205    }
206  }
207
208  test("EventLoop: stop() in onStart should call onStop") {
209    @volatile var onStopCalled: Boolean = false
210    val eventLoop = new EventLoop[Int]("test") {
211
212      override def onStart(): Unit = {
213        stop()
214      }
215
216      override def onReceive(event: Int): Unit = {
217      }
218
219      override def onError(e: Throwable): Unit = {
220      }
221
222      override def onStop(): Unit = {
223        onStopCalled = true
224      }
225    }
226    eventLoop.start()
227    eventually(timeout(5 seconds), interval(5 millis)) {
228      assert(!eventLoop.isActive)
229    }
230    assert(onStopCalled)
231  }
232
233  test("EventLoop: stop() in onReceive should call onStop") {
234    @volatile var onStopCalled: Boolean = false
235    val eventLoop = new EventLoop[Int]("test") {
236
237      override def onReceive(event: Int): Unit = {
238        stop()
239      }
240
241      override def onError(e: Throwable): Unit = {
242      }
243
244      override def onStop(): Unit = {
245        onStopCalled = true
246      }
247    }
248    eventLoop.start()
249    eventLoop.post(1)
250    eventually(timeout(5 seconds), interval(5 millis)) {
251      assert(!eventLoop.isActive)
252    }
253    assert(onStopCalled)
254  }
255
256  test("EventLoop: stop() in onError should call onStop") {
257    @volatile var onStopCalled: Boolean = false
258    val eventLoop = new EventLoop[Int]("test") {
259
260      override def onReceive(event: Int): Unit = {
261        throw new RuntimeException("Oops")
262      }
263
264      override def onError(e: Throwable): Unit = {
265        stop()
266      }
267
268      override def onStop(): Unit = {
269        onStopCalled = true
270      }
271    }
272    eventLoop.start()
273    eventLoop.post(1)
274    eventually(timeout(5 seconds), interval(5 millis)) {
275      assert(!eventLoop.isActive)
276    }
277    assert(onStopCalled)
278  }
279}
280