1//
2//  Freelance client - Model 2.
3//  Uses DEALER socket to blast one or more services
4//
5
6package main
7
8import (
9	zmq "github.com/pebbe/zmq4"
10
11	"errors"
12	"fmt"
13	"os"
14	"strconv"
15	"time"
16)
17
//  GLOBAL_TIMEOUT is how long a request waits for a reply: if not a
//  single service replies within this time, the request gives up.
const GLOBAL_TIMEOUT = 2500 * time.Millisecond
23
24func main() {
25	if len(os.Args) == 1 {
26		fmt.Printf("I: syntax: %s <endpoint> ...\n", os.Args[0])
27		return
28	}
29	//  Create new freelance client object
30	client := new_flclient()
31
32	//  Connect to each endpoint
33	for argn := 1; argn < len(os.Args); argn++ {
34		client.connect(os.Args[argn])
35	}
36
37	//  Send a bunch of name resolution 'requests', measure time
38	start := time.Now()
39	for requests := 10000; requests > 0; requests-- {
40		_, err := client.request("random name")
41		if err != nil {
42			fmt.Println("E: name service not available, aborting")
43			break
44		}
45	}
46	fmt.Println("Average round trip cost:", time.Now().Sub(start))
47}
48
49//  Here is the flclient class implementation. Each instance has
50//  a DEALER socket it uses to talk to the servers, a counter of how many
51//  servers it's connected to, and a request sequence number:
52
53type flclient_t struct {
54	socket   *zmq.Socket //  DEALER socket talking to servers
55	servers  int         //  How many servers we have connected to
56	sequence int         //  Number of requests ever sent
57}
58
59//  --------------------------------------------------------------------
60//  Constructor
61
62func new_flclient() (client *flclient_t) {
63	client = &flclient_t{}
64
65	client.socket, _ = zmq.NewSocket(zmq.DEALER)
66	return
67}
68
69//  --------------------------------------------------------------------
70//  Connect to new server endpoint
71
72func (client *flclient_t) connect(endpoint string) {
73	client.socket.Connect(endpoint)
74	client.servers++
75}
76
77//  The request method does the hard work. It sends a request to all
78//  connected servers in parallel (for this to work, all connections
79//  have to be successful and completed by this time). It then waits
80//  for a single successful reply, and returns that to the caller.
81//  Any other replies are just dropped:
82
83func (client *flclient_t) request(request ...string) (reply []string, err error) {
84	reply = []string{}
85
86	//  Prefix request with sequence number and empty envelope
87	client.sequence++
88
89	//  Blast the request to all connected servers
90	for server := 0; server < client.servers; server++ {
91		client.socket.SendMessage("", client.sequence, request)
92	}
93	//  Wait for a matching reply to arrive from anywhere
94	//  Since we can poll several times, calculate each one
95	endtime := time.Now().Add(GLOBAL_TIMEOUT)
96	poller := zmq.NewPoller()
97	poller.Add(client.socket, zmq.POLLIN)
98	for time.Now().Before(endtime) {
99		polled, err := poller.Poll(endtime.Sub(time.Now()))
100		if err == nil && len(polled) > 0 {
101			//  Reply is [empty][sequence][OK]
102			reply, _ = client.socket.RecvMessage(0)
103			if len(reply) != 3 {
104				panic("len(reply) != 3")
105			}
106			sequence := reply[1]
107			reply = reply[2:]
108			sequence_nbr, _ := strconv.Atoi(sequence)
109			if sequence_nbr == client.sequence {
110				break
111			}
112		}
113	}
114	if len(reply) == 0 {
115		err = errors.New("No reply")
116	}
117	return
118}
119