1// File Transfer model #1 2// 3// In which the server sends the entire file to the client in 4// large chunks with no attempt at flow control. 5 6package main 7 8import ( 9 zmq "github.com/pebbe/zmq4" 10 11 "fmt" 12 "io" 13 "os" 14) 15 16const ( 17 CHUNK_SIZE = 250000 18) 19 20func client_thread(pipe chan<- string) { 21 dealer, _ := zmq.NewSocket(zmq.DEALER) 22 dealer.Connect("tcp://127.0.0.1:6000") 23 24 dealer.Send("fetch", 0) 25 total := 0 // Total bytes received 26 chunks := 0 // Total chunks received 27 28 for { 29 frame, err := dealer.RecvBytes(0) 30 if err != nil { 31 break // Shutting down, quit 32 } 33 chunks++ 34 size := len(frame) 35 total += size 36 if size == 0 { 37 break // Whole file received 38 } 39 } 40 fmt.Printf("%v chunks received, %v bytes\n", chunks, total) 41 pipe <- "OK" 42} 43 44// The server thread reads the file from disk in chunks, and sends 45// each chunk to the client as a separate message. We only have one 46// test file, so open that once and then serve it out as needed: 47 48func server_thread() { 49 file, err := os.Open("testdata") 50 if err != nil { 51 panic(err) 52 } 53 54 router, _ := zmq.NewSocket(zmq.ROUTER) 55 // Default HWM is 1000, which will drop messages here 56 // since we send more than 1,000 chunks of test data, 57 // so set an infinite HWM as a simple, stupid solution: 58 router.SetRcvhwm(0) 59 router.SetSndhwm(0) 60 router.Bind("tcp://*:6000") 61 for { 62 // First frame in each message is the sender identity 63 identity, err := router.Recv(0) 64 if err != nil { 65 break // Shutting down, quit 66 } 67 68 // Second frame is "fetch" command 69 command, _ := router.Recv(0) 70 if command != "fetch" { 71 panic("command != \"fetch\"") 72 } 73 74 chunk := make([]byte, CHUNK_SIZE) 75 for { 76 n, _ := io.ReadFull(file, chunk) 77 router.SendMessage(identity, chunk[:n]) 78 if n == 0 { 79 break // Always end with a zero-size frame 80 } 81 } 82 } 83 file.Close() 84} 85 86// The main task starts the client and server threads; it's easier 87// to test this as a single process with threads, than as multiple 88// processes: 89 90func main() { 91 pipe := make(chan string) 92 93 // Start child threads 94 go server_thread() 95 go client_thread(pipe) 96 // Loop until client tells us it's done 97 <-pipe 98} 99