# pipeliner

[![Build Status](https://travis-ci.org/keybase/pipeliner.svg?branch=master)](https://travis-ci.org/keybase/pipeliner)
[![GoDoc](https://godoc.org/github.com/keybase/pipeliner?status.svg)](https://godoc.org/github.com/keybase/pipeliner)

A simplified pipeline library, for parallel requests with bounded parallelism.

## Getting

```sh
go get github.com/keybase/pipeliner
```

## Background

Often you want to do network requests with bounded parallelism. Let's say you have
1,000 DNS queries to make, and don't want to wait for them to complete in serial,
but don't want to blast your server with 1,000 simultaneous requests. In this case,
*bounded parallelism* makes sense. Make 1,000 requests with only 10 outstanding
at any one time.

At this point, I usually Google for it, and come up with [this blog post](https://blog.golang.org/pipelines),
and I become slightly sad, because that is a lot of code to digest and
understand to do something that should be rather simple. It's not really the fault
of the language, but more so the library. Here is a library that makes it a lot
easier:

## Example

```go
import (
	"context"
	"sync"
	"time"

	"github.com/keybase/pipeliner"
)

// See example_request_test.go for a runnable example.

type Request struct{ i int }
type Result struct{ i int }

// Do simulates a slow request and echoes the request's index back as the result.
func (r Request) Do() (Result, error) {
	time.Sleep(time.Millisecond)
	return Result{r.i}, nil
}

// makeRequests calls `Do` on all of the given requests, with only `window` outstanding
// at any given time. It puts the results in `results`, and errors out on the first
// failure.
func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) {

	var resultsLock sync.Mutex
	results = make([]Result, len(requests))

	// Named pl rather than pipeliner so the variable does not shadow the package.
	pl := pipeliner.NewPipeliner(window)

	worker := func(ctx context.Context, i int) error {
		res, err := requests[i].Do()
		resultsLock.Lock()
		results[i] = res
		resultsLock.Unlock()
		return err // the first error will kill the pipeline
	}

	for i := range requests {
		// Block until fewer than `window` requests are outstanding.
		err := pl.WaitForRoom(ctx)
		if err != nil {
			return nil, err
		}
		go func(i int) { pl.CompleteOne(worker(ctx, i)) }(i)
	}
	// Wait for the remaining outstanding requests to finish.
	return results, pl.Flush(ctx)
}
```