// Package ctxreadseeker wraps an io.ReadSeeker, and optionally an io.Closer, to make it context aware.
// Warning: this might leak a goroutine and a reader if the underlying reader can block forever.
// Only use it if that is not an issue: you're going to exit soon anyway, or there is some other
// mechanism for cleaning up.
5package ctxreadseeker
6
7import (
8	"context"
9	"io"
10)
11
// Reader is a context-aware wrapper around an io.ReadSeeker. Operations on the
// wrapped value are executed by the background goroutine started in New, and
// callers stop waiting for them as soon as the context is done.
type Reader struct {
	// ctx bounds how long callers wait and tells the background goroutine
	// when to shut down.
	ctx context.Context
	// rs is the wrapped reader.
	rs io.ReadSeeker

	// fnCh hands work items to the background goroutine.
	fnCh chan func()
	// waitCh carries one completion signal per executed work item.
	waitCh chan struct{}
}
18
19func New(ctx context.Context, rs io.ReadSeeker) *Reader {
20	r := &Reader{
21		rs:     rs,
22		ctx:    ctx,
23		fnCh:   make(chan func()),
24		waitCh: make(chan struct{}),
25	}
26	go r.loop()
27	return r
28}
29
30func (r *Reader) loop() {
31	for {
32		select {
33		case <-r.ctx.Done():
34			if c, ok := r.rs.(io.Closer); ok {
35				c.Close()
36			}
37			return
38		case fn, ok := <-r.fnCh:
39			if !ok {
40				panic("unreachable")
41			}
42			fn()
43			r.waitCh <- struct{}{}
44		}
45	}
46}
47
48func (r *Reader) callWait(fn func()) error {
49	select {
50	case <-r.ctx.Done():
51		return r.ctx.Err()
52	case r.fnCh <- fn:
53		select {
54		case <-r.ctx.Done():
55			return r.ctx.Err()
56		case <-r.waitCh:
57		}
58	}
59	return nil
60}
61
62func (r *Reader) Read(p []byte) (n int, err error) {
63	if err := r.callWait(func() {
64		n, err = r.rs.Read(p)
65	}); err != nil {
66		return 0, err
67	}
68	return n, err
69}
70
71func (r *Reader) Seek(offset int64, whence int) (n int64, err error) {
72	if err := r.callWait(func() {
73		n, err = r.rs.Seek(offset, whence)
74	}); err != nil {
75		return 0, err
76	}
77	return n, err
78}
79
80func (r *Reader) Close() (err error) {
81	if err := r.callWait(func() {
82		if c, ok := r.rs.(io.Closer); ok {
83			err = c.Close()
84		}
85	}); err != nil {
86		return err
87	}
88	return err
89}
90