diff options
Diffstat (limited to 'src/io/pipe.go')
-rw-r--r-- | src/io/pipe.go | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/src/io/pipe.go b/src/io/pipe.go new file mode 100644 index 000000000..f65354a7f --- /dev/null +++ b/src/io/pipe.go @@ -0,0 +1,193 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pipe adapter to connect code expecting an io.Reader +// with code expecting an io.Writer. + +package io + +import ( + "errors" + "sync" +) + +// ErrClosedPipe is the error used for read or write operations on a closed pipe. +var ErrClosedPipe = errors.New("io: read/write on closed pipe") + +type pipeResult struct { + n int + err error +} + +// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. +type pipe struct { + rl sync.Mutex // gates readers one at a time + wl sync.Mutex // gates writers one at a time + l sync.Mutex // protects remaining fields + data []byte // data remaining in pending write + rwait sync.Cond // waiting reader + wwait sync.Cond // waiting writer + rerr error // if reader closed, error to give writes + werr error // if writer closed, error to give reads +} + +func (p *pipe) read(b []byte) (n int, err error) { + // One reader at a time. + p.rl.Lock() + defer p.rl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + for { + if p.rerr != nil { + return 0, ErrClosedPipe + } + if p.data != nil { + break + } + if p.werr != nil { + return 0, p.werr + } + p.rwait.Wait() + } + n = copy(b, p.data) + p.data = p.data[n:] + if len(p.data) == 0 { + p.data = nil + p.wwait.Signal() + } + return +} + +var zero [0]byte + +func (p *pipe) write(b []byte) (n int, err error) { + // pipe uses nil to mean not available + if b == nil { + b = zero[:] + } + + // One writer at a time. + p.wl.Lock() + defer p.wl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + if p.werr != nil { + err = ErrClosedPipe + return + } + p.data = b + p.rwait.Signal() + for { + if p.data == nil { + break + } + if p.rerr != nil { + err = p.rerr + break + } + if p.werr != nil { + err = ErrClosedPipe + } + p.wwait.Wait() + } + n = len(b) - len(p.data) + p.data = nil // in case of rerr or werr + return +} + +func (p *pipe) rclose(err error) { + if err == nil { + err = ErrClosedPipe + } + p.l.Lock() + defer p.l.Unlock() + p.rerr = err + p.rwait.Signal() + p.wwait.Signal() +} + +func (p *pipe) wclose(err error) { + if err == nil { + err = EOF + } + p.l.Lock() + defer p.l.Unlock() + p.werr = err + p.rwait.Signal() + p.wwait.Signal() +} + +// A PipeReader is the read half of a pipe. +type PipeReader struct { + p *pipe +} + +// Read implements the standard Read interface: +// it reads data from the pipe, blocking until a writer +// arrives or the write end is closed. +// If the write end is closed with an error, that error is +// returned as err; otherwise err is EOF. +func (r *PipeReader) Read(data []byte) (n int, err error) { + return r.p.read(data) +} + +// Close closes the reader; subsequent writes to the +// write half of the pipe will return the error ErrClosedPipe. +func (r *PipeReader) Close() error { + return r.CloseWithError(nil) +} + +// CloseWithError closes the reader; subsequent writes +// to the write half of the pipe will return the error err. +func (r *PipeReader) CloseWithError(err error) error { + r.p.rclose(err) + return nil +} + +// A PipeWriter is the write half of a pipe. +type PipeWriter struct { + p *pipe +} + +// Write implements the standard Write interface: +// it writes data to the pipe, blocking until readers +// have consumed all the data or the read end is closed. +// If the read end is closed with an error, that err is +// returned as err; otherwise err is ErrClosedPipe. +func (w *PipeWriter) Write(data []byte) (n int, err error) { + return w.p.write(data) +} + +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and EOF. +func (w *PipeWriter) Close() error { + return w.CloseWithError(nil) +} + +// CloseWithError closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and the error err. +func (w *PipeWriter) CloseWithError(err error) error { + w.p.wclose(err) + return nil +} + +// Pipe creates a synchronous in-memory pipe. +// It can be used to connect code expecting an io.Reader +// with code expecting an io.Writer. +// Reads on one end are matched with writes on the other, +// copying data directly between the two; there is no internal buffering. +// It is safe to call Read and Write in parallel with each other or with +// Close. Close will complete once pending I/O is done. Parallel calls to +// Read, and parallel calls to Write, are also safe: +// the individual calls will be gated sequentially. +func Pipe() (*PipeReader, *PipeWriter) { + p := new(pipe) + p.rwait.L = &p.l + p.wwait.L = &p.l + r := &PipeReader{p} + w := &PipeWriter{p} + return r, w +} |