summaryrefslogtreecommitdiff
path: root/src/pkg/io/pipe.go
diff options
context:
space:
mode:
authorOndřej Surý <ondrej@sury.org>2011-09-13 13:13:40 +0200
committerOndřej Surý <ondrej@sury.org>2011-09-13 13:13:40 +0200
commit5ff4c17907d5b19510a62e08fd8d3b11e62b431d (patch)
treec0650497e988f47be9c6f2324fa692a52dea82e1 /src/pkg/io/pipe.go
parent80f18fc933cf3f3e829c5455a1023d69f7b86e52 (diff)
downloadgolang-upstream/60.tar.gz
Imported Upstream version 60upstream/60
Diffstat (limited to 'src/pkg/io/pipe.go')
-rw-r--r--src/pkg/io/pipe.go182
1 files changed, 182 insertions, 0 deletions
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go
new file mode 100644
index 000000000..00be8efa2
--- /dev/null
+++ b/src/pkg/io/pipe.go
@@ -0,0 +1,182 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Pipe adapter to connect code expecting an io.Reader
+// with code expecting an io.Writer.
+
+package io
+
+import (
+ "os"
+ "sync"
+)
+
+type pipeResult struct {
+ n int
+ err os.Error
+}
+
+// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
+type pipe struct {
+ rl sync.Mutex // gates readers one at a time
+ wl sync.Mutex // gates writers one at a time
+ l sync.Mutex // protects remaining fields
+ data []byte // data remaining in pending write
+ rwait sync.Cond // waiting reader
+ wwait sync.Cond // waiting writer
+ rerr os.Error // if reader closed, error to give writes
+ werr os.Error // if writer closed, error to give reads
+}
+
+func (p *pipe) read(b []byte) (n int, err os.Error) {
+ // One reader at a time.
+ p.rl.Lock()
+ defer p.rl.Unlock()
+
+ p.l.Lock()
+ defer p.l.Unlock()
+ for {
+ if p.rerr != nil {
+ return 0, os.EINVAL
+ }
+ if p.data != nil {
+ break
+ }
+ if p.werr != nil {
+ return 0, p.werr
+ }
+ p.rwait.Wait()
+ }
+ n = copy(b, p.data)
+ p.data = p.data[n:]
+ if len(p.data) == 0 {
+ p.data = nil
+ p.wwait.Signal()
+ }
+ return
+}
+
+var zero [0]byte
+
+func (p *pipe) write(b []byte) (n int, err os.Error) {
+ // pipe uses nil to mean not available
+ if b == nil {
+ b = zero[:]
+ }
+
+ // One writer at a time.
+ p.wl.Lock()
+ defer p.wl.Unlock()
+
+ p.l.Lock()
+ defer p.l.Unlock()
+ p.data = b
+ p.rwait.Signal()
+ for {
+ if p.data == nil {
+ break
+ }
+ if p.rerr != nil {
+ err = p.rerr
+ break
+ }
+ if p.werr != nil {
+ err = os.EINVAL
+ }
+ p.wwait.Wait()
+ }
+ n = len(b) - len(p.data)
+ p.data = nil // in case of rerr or werr
+ return
+}
+
+func (p *pipe) rclose(err os.Error) {
+ if err == nil {
+ err = os.EPIPE
+ }
+ p.l.Lock()
+ defer p.l.Unlock()
+ p.rerr = err
+ p.rwait.Signal()
+ p.wwait.Signal()
+}
+
+func (p *pipe) wclose(err os.Error) {
+ if err == nil {
+ err = os.EOF
+ }
+ p.l.Lock()
+ defer p.l.Unlock()
+ p.werr = err
+ p.rwait.Signal()
+ p.wwait.Signal()
+}
+
+// A PipeReader is the read half of a pipe.
+type PipeReader struct {
+ p *pipe
+}
+
+// Read implements the standard Read interface:
+// it reads data from the pipe, blocking until a writer
+// arrives or the write end is closed.
+// If the write end is closed with an error, that error is
+// returned as err; otherwise err is os.EOF.
+func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
+ return r.p.read(data)
+}
+
+// Close closes the reader; subsequent writes to the
+// write half of the pipe will return the error os.EPIPE.
+func (r *PipeReader) Close() os.Error {
+ return r.CloseWithError(nil)
+}
+
+// CloseWithError closes the reader; subsequent writes
+// to the write half of the pipe will return the error err.
+func (r *PipeReader) CloseWithError(err os.Error) os.Error {
+ r.p.rclose(err)
+ return nil
+}
+
+// A PipeWriter is the write half of a pipe.
+type PipeWriter struct {
+ p *pipe
+}
+
+// Write implements the standard Write interface:
+// it writes data to the pipe, blocking until readers
+// have consumed all the data or the read end is closed.
+// If the read end is closed with an error, that err is
+// returned as err; otherwise err is os.EPIPE.
+func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
+ return w.p.write(data)
+}
+
+// Close closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and os.EOF.
+func (w *PipeWriter) Close() os.Error {
+ return w.CloseWithError(nil)
+}
+
+// CloseWithError closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and the error err.
+func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
+ w.p.wclose(err)
+ return nil
+}
+
+// Pipe creates a synchronous in-memory pipe.
+// It can be used to connect code expecting an io.Reader
+// with code expecting an io.Writer.
+// Reads on one end are matched with writes on the other,
+// copying data directly between the two; there is no internal buffering.
+func Pipe() (*PipeReader, *PipeWriter) {
+ p := new(pipe)
+ p.rwait.L = &p.l
+ p.wwait.L = &p.l
+ r := &PipeReader{p}
+ w := &PipeWriter{p}
+ return r, w
+}