1// File Transfer model #2 2// 3// In which the client requests each chunk individually, thus 4// eliminating server queue overflows, but at a cost in speed. 5 6package main 7 8import ( 9 zmq "github.com/pebbe/zmq4" 10 11 "fmt" 12 "os" 13 "strconv" 14) 15 16const ( 17 CHUNK_SIZE = 250000 18) 19 20func client_thread(pipe chan<- string) { 21 dealer, _ := zmq.NewSocket(zmq.DEALER) 22 dealer.Connect("tcp://127.0.0.1:6000") 23 24 total := 0 // Total bytes received 25 chunks := 0 // Total chunks received 26 27 for { 28 // Ask for next chunk 29 dealer.SendMessage("fetch", total, CHUNK_SIZE) 30 31 chunk, err := dealer.RecvBytes(0) 32 if err != nil { 33 break // Shutting down, quit 34 } 35 chunks++ 36 size := len(chunk) 37 total += size 38 if size < CHUNK_SIZE { 39 break // Last chunk received; exit 40 } 41 } 42 fmt.Printf("%v chunks received, %v bytes\n", chunks, total) 43 pipe <- "OK" 44} 45 46// The server thread waits for a chunk request from a client, 47// reads that chunk and sends it back to the client: 48 49func server_thread() { 50 file, err := os.Open("testdata") 51 if err != nil { 52 panic(err) 53 } 54 55 router, _ := zmq.NewSocket(zmq.ROUTER) 56 router.SetRcvhwm(1) 57 router.SetSndhwm(1) 58 router.Bind("tcp://*:6000") 59 for { 60 msg, err := router.RecvMessage(0) 61 if err != nil { 62 break // Shutting down, quit 63 } 64 // First frame in each message is the sender identity 65 identity := msg[0] 66 67 // Second frame is "fetch" command 68 if msg[1] != "fetch" { 69 panic("command != \"fetch\"") 70 } 71 72 // Third frame is chunk offset in file 73 offset, _ := strconv.ParseInt(msg[2], 10, 64) 74 75 // Fourth frame is maximum chunk size 76 chunksz, _ := strconv.Atoi(msg[3]) 77 78 // Read chunk of data from file 79 chunk := make([]byte, chunksz) 80 n, _ := file.ReadAt(chunk, offset) 81 82 // Send resulting chunk to client 83 router.SendMessage(identity, chunk[:n]) 84 } 85 file.Close() 86} 87 88// The main task is just the same as in the first model. 89 90func main() { 91 pipe := make(chan string) 92 93 // Start child threads 94 go server_thread() 95 go client_thread(pipe) 96 // Loop until client tells us it's done 97 <-pipe 98} 99