1// File Transfer model #3 2// 3// In which the client requests each chunk individually, using 4// command pipelining to give us a credit-based flow control. 5 6package main 7 8import ( 9 zmq "github.com/pebbe/zmq4" 10 11 "fmt" 12 "os" 13 "strconv" 14) 15 16const ( 17 CHUNK_SIZE = 250000 18 PIPELINE = 10 19) 20 21func client_thread(pipe chan<- string) { 22 dealer, _ := zmq.NewSocket(zmq.DEALER) 23 dealer.Connect("tcp://127.0.0.1:6000") 24 25 // Up to this many chunks in transit 26 credit := PIPELINE 27 28 total := 0 // Total bytes received 29 chunks := 0 // Total chunks received 30 offset := 0 // Offset of next chunk request 31 32 for { 33 for credit > 0 { 34 // Ask for next chunk 35 dealer.SendMessage("fetch", offset, CHUNK_SIZE) 36 offset += CHUNK_SIZE 37 credit-- 38 } 39 chunk, err := dealer.RecvBytes(0) 40 if err != nil { 41 break // Shutting down, quit 42 } 43 chunks++ 44 credit++ 45 size := len(chunk) 46 total += size 47 if size < CHUNK_SIZE { 48 break // Last chunk received; exit 49 } 50 } 51 fmt.Printf("%v chunks received, %v bytes\n", chunks, total) 52 pipe <- "OK" 53} 54 55// The rest of the code is exactly the same as in model 2, except 56// that we set the HWM on the server's ROUTER socket to PIPELINE 57// to act as a sanity check. 58 59// The server thread waits for a chunk request from a client, 60// reads that chunk and sends it back to the client: 61 62func server_thread() { 63 file, err := os.Open("testdata") 64 if err != nil { 65 panic(err) 66 } 67 68 router, _ := zmq.NewSocket(zmq.ROUTER) 69 router.SetRcvhwm(PIPELINE * 2) 70 router.SetSndhwm(PIPELINE * 2) 71 router.Bind("tcp://*:6000") 72 for { 73 msg, err := router.RecvMessage(0) 74 if err != nil { 75 break // Shutting down, quit 76 } 77 // First frame in each message is the sender identity 78 identity := msg[0] 79 80 // Second frame is "fetch" command 81 if msg[1] != "fetch" { 82 panic("command != \"fetch\"") 83 } 84 85 // Third frame is chunk offset in file 86 offset, _ := strconv.ParseInt(msg[2], 10, 64) 87 88 // Fourth frame is maximum chunk size 89 chunksz, _ := strconv.Atoi(msg[3]) 90 91 // Read chunk of data from file 92 chunk := make([]byte, chunksz) 93 n, _ := file.ReadAt(chunk, offset) 94 95 // Send resulting chunk to client 96 router.SendMessage(identity, chunk[:n]) 97 } 98 file.Close() 99} 100 101// The main task is just the same as in the first model. 102 103func main() { 104 pipe := make(chan string) 105 106 // Start child threads 107 go server_thread() 108 go client_thread(pipe) 109 // Loop until client tells us it's done 110 <-pipe 111} 112