1//  File Transfer model #2
2//
3//  In which the client requests each chunk individually, thus
4//  eliminating server queue overflows, but at a cost in speed.
5
6package main
7
8import (
9	zmq "github.com/pebbe/zmq4"
10
11	"fmt"
12	"os"
13	"strconv"
14)
15
16const (
17	CHUNK_SIZE = 250000
18)
19
20func client_thread(pipe chan<- string) {
21	dealer, _ := zmq.NewSocket(zmq.DEALER)
22	dealer.Connect("tcp://127.0.0.1:6000")
23
24	total := 0  //  Total bytes received
25	chunks := 0 //  Total chunks received
26
27	for {
28		//  Ask for next chunk
29		dealer.SendMessage("fetch", total, CHUNK_SIZE)
30
31		chunk, err := dealer.RecvBytes(0)
32		if err != nil {
33			break //  Shutting down, quit
34		}
35		chunks++
36		size := len(chunk)
37		total += size
38		if size < CHUNK_SIZE {
39			break //  Last chunk received; exit
40		}
41	}
42	fmt.Printf("%v chunks received, %v bytes\n", chunks, total)
43	pipe <- "OK"
44}
45
46//  The server thread waits for a chunk request from a client,
47//  reads that chunk and sends it back to the client:
48
49func server_thread() {
50	file, err := os.Open("testdata")
51	if err != nil {
52		panic(err)
53	}
54
55	router, _ := zmq.NewSocket(zmq.ROUTER)
56	router.SetRcvhwm(1)
57	router.SetSndhwm(1)
58	router.Bind("tcp://*:6000")
59	for {
60		msg, err := router.RecvMessage(0)
61		if err != nil {
62			break //  Shutting down, quit
63		}
64		//  First frame in each message is the sender identity
65		identity := msg[0]
66
67		//  Second frame is "fetch" command
68		if msg[1] != "fetch" {
69			panic("command != \"fetch\"")
70		}
71
72		//  Third frame is chunk offset in file
73		offset, _ := strconv.ParseInt(msg[2], 10, 64)
74
75		//  Fourth frame is maximum chunk size
76		chunksz, _ := strconv.Atoi(msg[3])
77
78		//  Read chunk of data from file
79		chunk := make([]byte, chunksz)
80		n, _ := file.ReadAt(chunk, offset)
81
82		//  Send resulting chunk to client
83		router.SendMessage(identity, chunk[:n])
84	}
85	file.Close()
86}
87
88//  The main task is just the same as in the first model.
89
90func main() {
91	pipe := make(chan string)
92
93	//  Start child threads
94	go server_thread()
95	go client_thread(pipe)
96	//  Loop until client tells us it's done
97	<-pipe
98}
99