// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Pipe adapter to connect code expecting an io.Reader // with code expecting an io.Writer. package io import ( "os" "sync" ) type pipeResult struct { n int err os.Error } // A pipe is the shared pipe structure underlying PipeReader and PipeWriter. type pipe struct { rl sync.Mutex // gates readers one at a time wl sync.Mutex // gates writers one at a time l sync.Mutex // protects remaining fields data []byte // data remaining in pending write rwait sync.Cond // waiting reader wwait sync.Cond // waiting writer rerr os.Error // if reader closed, error to give writes werr os.Error // if writer closed, error to give reads } func (p *pipe) read(b []byte) (n int, err os.Error) { // One reader at a time. p.rl.Lock() defer p.rl.Unlock() p.l.Lock() defer p.l.Unlock() for { if p.rerr != nil { return 0, os.EINVAL } if p.data != nil { break } if p.werr != nil { return 0, p.werr } p.rwait.Wait() } n = copy(b, p.data) p.data = p.data[n:] if len(p.data) == 0 { p.data = nil p.wwait.Signal() } return } var zero [0]byte func (p *pipe) write(b []byte) (n int, err os.Error) { // pipe uses nil to mean not available if b == nil { b = zero[:] } // One writer at a time. p.wl.Lock() defer p.wl.Unlock() p.l.Lock() defer p.l.Unlock() p.data = b p.rwait.Signal() for { if p.data == nil { break } if p.rerr != nil { err = p.rerr break } if p.werr != nil { err = os.EINVAL } p.wwait.Wait() } n = len(b) - len(p.data) p.data = nil // in case of rerr or werr return } func (p *pipe) rclose(err os.Error) { if err == nil { err = os.EPIPE } p.l.Lock() defer p.l.Unlock() p.rerr = err p.rwait.Signal() p.wwait.Signal() } func (p *pipe) wclose(err os.Error) { if err == nil { err = os.EOF } p.l.Lock() defer p.l.Unlock() p.werr = err p.rwait.Signal() p.wwait.Signal() } // A PipeReader is the read half of a pipe. 
type PipeReader struct {
	p *pipe
}

// Read implements the standard Read interface.
// It reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end was closed with an error, Read returns
// that error as err; otherwise err is os.EOF.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	n, err = r.p.read(data)
	return n, err
}

// Close closes the reader. Subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
	// A nil error selects the default close error.
	return r.CloseWithError(nil)
}

// CloseWithError closes the reader. Subsequent writes
// to the write half of the pipe will return the error err.
// It always returns nil.
func (r *PipeReader) CloseWithError(err os.Error) os.Error {
	shared := r.p
	shared.rclose(err)
	return nil
}

// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
	p *pipe
}

// Write implements the standard Write interface.
// It writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end was closed with an error, Write returns
// that error as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	n, err = w.p.write(data)
	return n, err
}

// Close closes the writer. Subsequent reads from the
// read half of the pipe will return no bytes and os.EOF.
func (w *PipeWriter) Close() os.Error {
	// A nil error selects the default close error.
	return w.CloseWithError(nil)
}

// CloseWithError closes the writer. Subsequent reads from the
// read half of the pipe will return no bytes and the error err.
// It always returns nil.
func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
	shared := w.p
	shared.wclose(err)
	return nil
}

// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) { p := new(pipe) p.rwait.L = &p.l p.wwait.L = &p.l r := &PipeReader{p} w := &PipeWriter{p} return r, w }