1//  File Transfer model #1
2//
3//  In which the server sends the entire file to the client in
4//  large chunks with no attempt at flow control.
5
6package main
7
8import (
9	zmq "github.com/pebbe/zmq4"
10
11	"fmt"
12	"io"
13	"os"
14)
15
//  CHUNK_SIZE is the size in bytes of the buffer the server reads the
//  file into; every frame it sends carries at most this many bytes.
const CHUNK_SIZE = 250000
19
20func client_thread(pipe chan<- string) {
21	dealer, _ := zmq.NewSocket(zmq.DEALER)
22	dealer.Connect("tcp://127.0.0.1:6000")
23
24	dealer.Send("fetch", 0)
25	total := 0  //  Total bytes received
26	chunks := 0 //  Total chunks received
27
28	for {
29		frame, err := dealer.RecvBytes(0)
30		if err != nil {
31			break //  Shutting down, quit
32		}
33		chunks++
34		size := len(frame)
35		total += size
36		if size == 0 {
37			break //  Whole file received
38		}
39	}
40	fmt.Printf("%v chunks received, %v bytes\n", chunks, total)
41	pipe <- "OK"
42}
43
44//  The server thread reads the file from disk in chunks, and sends
45//  each chunk to the client as a separate message. We only have one
46//  test file, so open that once and then serve it out as needed:
47
48func server_thread() {
49	file, err := os.Open("testdata")
50	if err != nil {
51		panic(err)
52	}
53
54	router, _ := zmq.NewSocket(zmq.ROUTER)
55	//  Default HWM is 1000, which will drop messages here
56	//  since we send more than 1,000 chunks of test data,
57	//  so set an infinite HWM as a simple, stupid solution:
58	router.SetRcvhwm(0)
59	router.SetSndhwm(0)
60	router.Bind("tcp://*:6000")
61	for {
62		//  First frame in each message is the sender identity
63		identity, err := router.Recv(0)
64		if err != nil {
65			break //  Shutting down, quit
66		}
67
68		//  Second frame is "fetch" command
69		command, _ := router.Recv(0)
70		if command != "fetch" {
71			panic("command != \"fetch\"")
72		}
73
74		chunk := make([]byte, CHUNK_SIZE)
75		for {
76			n, _ := io.ReadFull(file, chunk)
77			router.SendMessage(identity, chunk[:n])
78			if n == 0 {
79				break //  Always end with a zero-size frame
80			}
81		}
82	}
83	file.Close()
84}
85
86//  The main task starts the client and server threads; it's easier
87//  to test this as a single process with threads, than as multiple
88//  processes:
89
90func main() {
91	pipe := make(chan string)
92
93	//  Start child threads
94	go server_thread()
95	go client_thread(pipe)
96	//  Loop until client tells us it's done
97	<-pipe
98}
99