1// Package ctxreadseeker wraps a io.ReadSeeker and optionally io.Closer to make it context aware 2// Warning: this might leak a go routine and a reader if underlaying reader can block forever. 3// Only use if it's not an issue, your going to exit soon anyway or there is some other mechism for 4// cleaning up. 5package ctxreadseeker 6 7import ( 8 "context" 9 "io" 10) 11 12type Reader struct { 13 rs io.ReadSeeker 14 ctx context.Context 15 fnCh chan func() 16 waitCh chan struct{} 17} 18 19func New(ctx context.Context, rs io.ReadSeeker) *Reader { 20 r := &Reader{ 21 rs: rs, 22 ctx: ctx, 23 fnCh: make(chan func()), 24 waitCh: make(chan struct{}), 25 } 26 go r.loop() 27 return r 28} 29 30func (r *Reader) loop() { 31 for { 32 select { 33 case <-r.ctx.Done(): 34 if c, ok := r.rs.(io.Closer); ok { 35 c.Close() 36 } 37 return 38 case fn, ok := <-r.fnCh: 39 if !ok { 40 panic("unreachable") 41 } 42 fn() 43 r.waitCh <- struct{}{} 44 } 45 } 46} 47 48func (r *Reader) callWait(fn func()) error { 49 select { 50 case <-r.ctx.Done(): 51 return r.ctx.Err() 52 case r.fnCh <- fn: 53 select { 54 case <-r.ctx.Done(): 55 return r.ctx.Err() 56 case <-r.waitCh: 57 } 58 } 59 return nil 60} 61 62func (r *Reader) Read(p []byte) (n int, err error) { 63 if err := r.callWait(func() { 64 n, err = r.rs.Read(p) 65 }); err != nil { 66 return 0, err 67 } 68 return n, err 69} 70 71func (r *Reader) Seek(offset int64, whence int) (n int64, err error) { 72 if err := r.callWait(func() { 73 n, err = r.rs.Seek(offset, whence) 74 }); err != nil { 75 return 0, err 76 } 77 return n, err 78} 79 80func (r *Reader) Close() (err error) { 81 if err := r.callWait(func() { 82 if c, ok := r.rs.(io.Closer); ok { 83 err = c.Close() 84 } 85 }); err != nil { 86 return err 87 } 88 return err 89} 90