1// 2// Freelance client - Model 2. 3// Uses DEALER socket to blast one or more services 4// 5 6package main 7 8import ( 9 zmq "github.com/pebbe/zmq4" 10 11 "errors" 12 "fmt" 13 "os" 14 "strconv" 15 "time" 16) 17 18const ( 19 20 // If not a single service replies within this time, give up 21 GLOBAL_TIMEOUT = 2500 * time.Millisecond 22) 23 24func main() { 25 if len(os.Args) == 1 { 26 fmt.Printf("I: syntax: %s <endpoint> ...\n", os.Args[0]) 27 return 28 } 29 // Create new freelance client object 30 client := new_flclient() 31 32 // Connect to each endpoint 33 for argn := 1; argn < len(os.Args); argn++ { 34 client.connect(os.Args[argn]) 35 } 36 37 // Send a bunch of name resolution 'requests', measure time 38 start := time.Now() 39 for requests := 10000; requests > 0; requests-- { 40 _, err := client.request("random name") 41 if err != nil { 42 fmt.Println("E: name service not available, aborting") 43 break 44 } 45 } 46 fmt.Println("Average round trip cost:", time.Now().Sub(start)) 47} 48 49// Here is the flclient class implementation. Each instance has 50// a DEALER socket it uses to talk to the servers, a counter of how many 51// servers it's connected to, and a request sequence number: 52 53type flclient_t struct { 54 socket *zmq.Socket // DEALER socket talking to servers 55 servers int // How many servers we have connected to 56 sequence int // Number of requests ever sent 57} 58 59// -------------------------------------------------------------------- 60// Constructor 61 62func new_flclient() (client *flclient_t) { 63 client = &flclient_t{} 64 65 client.socket, _ = zmq.NewSocket(zmq.DEALER) 66 return 67} 68 69// -------------------------------------------------------------------- 70// Connect to new server endpoint 71 72func (client *flclient_t) connect(endpoint string) { 73 client.socket.Connect(endpoint) 74 client.servers++ 75} 76 77// The request method does the hard work. It sends a request to all 78// connected servers in parallel (for this to work, all connections 79// have to be successful and completed by this time). It then waits 80// for a single successful reply, and returns that to the caller. 81// Any other replies are just dropped: 82 83func (client *flclient_t) request(request ...string) (reply []string, err error) { 84 reply = []string{} 85 86 // Prefix request with sequence number and empty envelope 87 client.sequence++ 88 89 // Blast the request to all connected servers 90 for server := 0; server < client.servers; server++ { 91 client.socket.SendMessage("", client.sequence, request) 92 } 93 // Wait for a matching reply to arrive from anywhere 94 // Since we can poll several times, calculate each one 95 endtime := time.Now().Add(GLOBAL_TIMEOUT) 96 poller := zmq.NewPoller() 97 poller.Add(client.socket, zmq.POLLIN) 98 for time.Now().Before(endtime) { 99 polled, err := poller.Poll(endtime.Sub(time.Now())) 100 if err == nil && len(polled) > 0 { 101 // Reply is [empty][sequence][OK] 102 reply, _ = client.socket.RecvMessage(0) 103 if len(reply) != 3 { 104 panic("len(reply) != 3") 105 } 106 sequence := reply[1] 107 reply = reply[2:] 108 sequence_nbr, _ := strconv.Atoi(sequence) 109 if sequence_nbr == client.sequence { 110 break 111 } 112 } 113 } 114 if len(reply) == 0 { 115 err = errors.New("No reply") 116 } 117 return 118} 119