# pipeliner

[![Build Status](https://travis-ci.org/keybase/pipeliner.svg?branch=master)](https://travis-ci.org/keybase/pipeliner)
[![GoDoc](https://godoc.org/github.com/keybase/pipeliner?status.svg)](https://godoc.org/github.com/keybase/pipeliner)

A simplified pipeline library for parallel requests with bounded parallelism.

## Getting

```sh
go get github.com/keybase/pipeliner
```

## Background

Often you want to make network requests with bounded parallelism. Let's say you have
1,000 DNS queries to make, and don't want to wait for them to complete serially,
but also don't want to blast your server with 1,000 simultaneous requests. In this case,
*bounded parallelism* makes sense: make 1,000 requests with only 10 outstanding
at any one time.

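To make the idea concrete, here is a minimal hand-rolled sketch that uses only the standard library: a buffered channel serves as a counting semaphore so that at most `window` calls are in flight. The names `fakeLookup` and `lookupAll` are hypothetical, the "request" is just a sleep, and the sketch deliberately leaves out error handling and cancellation, which is where hand-written versions tend to grow.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeLookup is a hypothetical stand-in for a real network request.
func fakeLookup(i int) int {
	time.Sleep(time.Millisecond)
	return i * 2
}

// lookupAll calls fakeLookup for 0..n-1 with at most window calls in flight.
// It returns the results plus the peak concurrency it observed.
func lookupAll(n, window int) ([]int, int) {
	results := make([]int, n)
	sem := make(chan struct{}, window) // counting semaphore: one token per in-flight call

	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while window calls are already outstanding
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			results[i] = fakeLookup(i) // each goroutine writes only its own slot

			mu.Lock()
			inFlight--
			mu.Unlock()
			<-sem // release the token so the loop can start another call
		}(i)
	}
	wg.Wait()
	return results, peak
}

func main() {
	results, peak := lookupAll(1000, 10)
	fmt.Println(len(results), results[999], peak <= 10)
	// prints "1000 1998 true"
}
```
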
At this point, I usually Google for it and come up with
[this blog post](https://blog.golang.org/pipelines), and I become slightly sad,
because that is a lot of code to digest and understand to do something that
should be rather simple. It's not really the fault of the language, but more so
of the library. Here is a library that makes it a lot easier:

## Example

```go
import (
	"context"
	"sync"
	"time"

	"github.com/keybase/pipeliner"
)

// See example_request_test.go for a runnable example.

type Request struct{ i int }
type Result struct{ i int }

func (r Request) Do() (Result, error) {
	time.Sleep(time.Millisecond)
	return Result{r.i}, nil
}

// makeRequests calls `Do` on all of the given requests, with only `window` outstanding
// at any given time. It puts the results in `results`, and errors out on the first
// failure.
func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) {
	var resultsLock sync.Mutex
	results = make([]Result, len(requests))

	// Named pl so the local variable does not shadow the pipeliner package.
	pl := pipeliner.NewPipeliner(window)

	worker := func(ctx context.Context, i int) error {
		res, err := requests[i].Do()
		resultsLock.Lock()
		results[i] = res
		resultsLock.Unlock()
		return err // the first error will kill the pipeline
	}

	for i := range requests {
		err := pl.WaitForRoom(ctx)
		if err != nil {
			return nil, err
		}
		go func(i int) { pl.CompleteOne(worker(ctx, i)) }(i)
	}
	return results, pl.Flush(ctx)
}
```

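To illustrate how the three calls in the example cooperate, here is a toy stand-in built only on the standard library. It is not the implementation in `github.com/keybase/pipeliner`; `toyPipeliner`, `newToyPipeliner`, and `runToy` are hypothetical names, and the semantics (block until a slot is free, remember the first error, wait for everything outstanding) are assumptions drawn from the comments in the example above. Like the example, it assumes `WaitForRoom` and `Flush` are called from a single goroutine.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// toyPipeliner is a hypothetical, simplified stand-in for illustration only.
type toyPipeliner struct {
	mu    sync.Mutex
	err   error          // first error reported via CompleteOne
	slots chan struct{}  // one token per outstanding request
	wg    sync.WaitGroup // tracks outstanding requests for Flush
}

func newToyPipeliner(window int) *toyPipeliner {
	return &toyPipeliner{slots: make(chan struct{}, window)}
}

func (p *toyPipeliner) firstErr() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.err
}

// WaitForRoom blocks until a slot is free or ctx is done. It returns the
// first recorded error, if any, so the caller stops launching new work.
func (p *toyPipeliner) WaitForRoom(ctx context.Context) error {
	select {
	case p.slots <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	if err := p.firstErr(); err != nil {
		<-p.slots // give the slot back; nothing will be launched
		return err
	}
	p.wg.Add(1)
	return nil
}

// CompleteOne records the first non-nil error and frees the slot.
func (p *toyPipeliner) CompleteOne(err error) {
	p.mu.Lock()
	if err != nil && p.err == nil {
		p.err = err
	}
	p.mu.Unlock()
	<-p.slots
	p.wg.Done()
}

// Flush waits for all outstanding requests and returns the first error.
// Simplification: this toy version ignores ctx while waiting.
func (p *toyPipeliner) Flush(ctx context.Context) error {
	p.wg.Wait()
	return p.firstErr()
}

// runToy launches n jobs with the given window; job failAt reports an error.
func runToy(n, window, failAt int) error {
	ctx := context.Background()
	p := newToyPipeliner(window)
	for i := 0; i < n; i++ {
		if err := p.WaitForRoom(ctx); err != nil {
			break // an earlier job failed; stop launching
		}
		go func(i int) {
			if i == failAt {
				p.CompleteOne(errors.New("boom"))
				return
			}
			p.CompleteOne(nil)
		}(i)
	}
	return p.Flush(ctx)
}

func main() {
	fmt.Println("flush error:", runToy(20, 3, 7))
	// prints "flush error: boom"
}
```

The caller's loop has the same shape as `makeRequests` above: reserve a slot, launch a goroutine that reports its outcome with `CompleteOne`, then call `Flush` once to collect the first error.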