1package zmq4_test 2 3import ( 4 zmq "github.com/pebbe/zmq4" 5 6 "fmt" 7 "testing" 8 "time" 9) 10 11func rep_socket_monitor(addr string, chMsg chan<- string) { 12 13 defer close(chMsg) 14 15 s, err := zmq.NewSocket(zmq.PAIR) 16 if err != nil { 17 chMsg <- fmt.Sprint("NewSocket:", err) 18 return 19 } 20 defer func() { 21 s.SetLinger(0) 22 s.Close() 23 }() 24 25 err = s.Connect(addr) 26 if err != nil { 27 chMsg <- fmt.Sprint("s.Connect:", err) 28 return 29 } 30 31 for { 32 a, b, _, err := s.RecvEvent(0) 33 if err != nil { 34 chMsg <- fmt.Sprint("s.RecvEvent:", err) 35 return 36 } 37 chMsg <- fmt.Sprint(a, " ", b) 38 if a == zmq.EVENT_CLOSED { 39 break 40 } 41 } 42 chMsg <- "Done" 43} 44 45func TestSocketEvent(t *testing.T) { 46 47 var rep *zmq.Socket 48 defer func() { 49 if rep != nil { 50 rep.SetLinger(0) 51 rep.Close() 52 } 53 }() 54 55 // REP socket 56 rep, err := zmq.NewSocket(zmq.REP) 57 if err != nil { 58 t.Fatal("NewSocket:", err) 59 } 60 61 // REP socket monitor, all events 62 err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL) 63 if err != nil { 64 t.Fatal("rep.Monitor:", err) 65 } 66 chMsg := make(chan string, 10) 67 go rep_socket_monitor("inproc://monitor.rep", chMsg) 68 time.Sleep(time.Second) 69 70 // Generate an event 71 err = rep.Bind("tcp://*:9689") 72 if err != nil { 73 t.Fatal("rep.Bind:", err) 74 } 75 76 rep.Close() 77 rep = nil 78 79 expect := []string{ 80 "EVENT_LISTENING tcp://0.0.0.0:9689", 81 "EVENT_CLOSED tcp://0.0.0.0:9689", 82 "Done", 83 } 84 i := 0 85 for msg := range chMsg { 86 if i < len(expect) { 87 if msg != expect[i] { 88 t.Errorf("Expected message %q, got %q", expect[i], msg) 89 } 90 i++ 91 } else { 92 t.Errorf("Unexpected message: %q", msg) 93 } 94 } 95 for ; i < len(expect); i++ { 96 t.Errorf("Expected message %q, got nothing", expect[i]) 97 } 98} 99