diff options
author | Ondřej Surý <ondrej@sury.org> | 2011-09-13 13:13:40 +0200 |
---|---|---|
committer | Ondřej Surý <ondrej@sury.org> | 2011-09-13 13:13:40 +0200 |
commit | 5ff4c17907d5b19510a62e08fd8d3b11e62b431d (patch) | |
tree | c0650497e988f47be9c6f2324fa692a52dea82e1 /src/pkg/io/pipe.go | |
parent | 80f18fc933cf3f3e829c5455a1023d69f7b86e52 (diff) | |
download | golang-upstream/60.tar.gz |
Imported Upstream version 60upstream/60
Diffstat (limited to 'src/pkg/io/pipe.go')
-rw-r--r-- | src/pkg/io/pipe.go | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go new file mode 100644 index 000000000..00be8efa2 --- /dev/null +++ b/src/pkg/io/pipe.go @@ -0,0 +1,182 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pipe adapter to connect code expecting an io.Reader +// with code expecting an io.Writer. + +package io + +import ( + "os" + "sync" +) + +type pipeResult struct { + n int + err os.Error +} + +// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. +type pipe struct { + rl sync.Mutex // gates readers one at a time + wl sync.Mutex // gates writers one at a time + l sync.Mutex // protects remaining fields + data []byte // data remaining in pending write + rwait sync.Cond // waiting reader + wwait sync.Cond // waiting writer + rerr os.Error // if reader closed, error to give writes + werr os.Error // if writer closed, error to give reads +} + +func (p *pipe) read(b []byte) (n int, err os.Error) { + // One reader at a time. + p.rl.Lock() + defer p.rl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + for { + if p.rerr != nil { + return 0, os.EINVAL + } + if p.data != nil { + break + } + if p.werr != nil { + return 0, p.werr + } + p.rwait.Wait() + } + n = copy(b, p.data) + p.data = p.data[n:] + if len(p.data) == 0 { + p.data = nil + p.wwait.Signal() + } + return +} + +var zero [0]byte + +func (p *pipe) write(b []byte) (n int, err os.Error) { + // pipe uses nil to mean not available + if b == nil { + b = zero[:] + } + + // One writer at a time. + p.wl.Lock() + defer p.wl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + p.data = b + p.rwait.Signal() + for { + if p.data == nil { + break + } + if p.rerr != nil { + err = p.rerr + break + } + if p.werr != nil { + err = os.EINVAL + } + p.wwait.Wait() + } + n = len(b) - len(p.data) + p.data = nil // in case of rerr or werr + return +} + +func (p *pipe) rclose(err os.Error) { + if err == nil { + err = os.EPIPE + } + p.l.Lock() + defer p.l.Unlock() + p.rerr = err + p.rwait.Signal() + p.wwait.Signal() +} + +func (p *pipe) wclose(err os.Error) { + if err == nil { + err = os.EOF + } + p.l.Lock() + defer p.l.Unlock() + p.werr = err + p.rwait.Signal() + p.wwait.Signal() +} + +// A PipeReader is the read half of a pipe. +type PipeReader struct { + p *pipe +} + +// Read implements the standard Read interface: +// it reads data from the pipe, blocking until a writer +// arrives or the write end is closed. +// If the write end is closed with an error, that error is +// returned as err; otherwise err is os.EOF. +func (r *PipeReader) Read(data []byte) (n int, err os.Error) { + return r.p.read(data) +} + +// Close closes the reader; subsequent writes to the +// write half of the pipe will return the error os.EPIPE. +func (r *PipeReader) Close() os.Error { + return r.CloseWithError(nil) +} + +// CloseWithError closes the reader; subsequent writes +// to the write half of the pipe will return the error err. +func (r *PipeReader) CloseWithError(err os.Error) os.Error { + r.p.rclose(err) + return nil +} + +// A PipeWriter is the write half of a pipe. +type PipeWriter struct { + p *pipe +} + +// Write implements the standard Write interface: +// it writes data to the pipe, blocking until readers +// have consumed all the data or the read end is closed. +// If the read end is closed with an error, that err is +// returned as err; otherwise err is os.EPIPE. +func (w *PipeWriter) Write(data []byte) (n int, err os.Error) { + return w.p.write(data) +} + +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and os.EOF. +func (w *PipeWriter) Close() os.Error { + return w.CloseWithError(nil) +} + +// CloseWithError closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and the error err. +func (w *PipeWriter) CloseWithError(err os.Error) os.Error { + w.p.wclose(err) + return nil +} + +// Pipe creates a synchronous in-memory pipe. +// It can be used to connect code expecting an io.Reader +// with code expecting an io.Writer. +// Reads on one end are matched with writes on the other, +// copying data directly between the two; there is no internal buffering. +func Pipe() (*PipeReader, *PipeWriter) { + p := new(pipe) + p.rwait.L = &p.l + p.wwait.L = &p.l + r := &PipeReader{p} + w := &PipeWriter{p} + return r, w +} |