// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Pipe adapter to connect code expecting an io.Read // with code expecting an io.Write. package io import ( "os" "sync" ) type pipeReturn struct { n int err os.Error } // Shared pipe structure. type pipe struct { rclosed bool // Read end closed? rerr os.Error // Error supplied to CloseReader wclosed bool // Write end closed? werr os.Error // Error supplied to CloseWriter wpend []byte // Written data waiting to be read. wtot int // Bytes consumed so far in current write. cr chan []byte // Write sends data here... cw chan pipeReturn // ... and reads the n, err back from here. } func (p *pipe) Read(data []byte) (n int, err os.Error) { if p == nil || p.rclosed { return 0, os.EINVAL } // Wait for next write block if necessary. if p.wpend == nil { if !p.wclosed { p.wpend = <-p.cr } if p.wpend == nil { return 0, p.werr } p.wtot = 0 } // Read from current write block. n = len(data) if n > len(p.wpend) { n = len(p.wpend) } for i := 0; i < n; i++ { data[i] = p.wpend[i] } p.wtot += n p.wpend = p.wpend[n:] // If write block is done, finish the write. if len(p.wpend) == 0 { p.wpend = nil p.cw <- pipeReturn{p.wtot, nil} p.wtot = 0 } return n, nil } func (p *pipe) Write(data []byte) (n int, err os.Error) { if p == nil || p.wclosed { return 0, os.EINVAL } if p.rclosed { return 0, p.rerr } // Send data to reader. p.cr <- data // Wait for reader to finish copying it. res := <-p.cw return res.n, res.err } func (p *pipe) CloseReader(rerr os.Error) os.Error { if p == nil || p.rclosed { return os.EINVAL } // Stop any future writes. p.rclosed = true if rerr == nil { rerr = os.EPIPE } p.rerr = rerr // Stop the current write. if !p.wclosed { p.cw <- pipeReturn{p.wtot, rerr} } return nil } func (p *pipe) CloseWriter(werr os.Error) os.Error { if werr == nil { werr = os.EOF } if p == nil || p.wclosed { return os.EINVAL } // Stop any future reads. p.wclosed = true p.werr = werr // Stop the current read. if !p.rclosed { p.cr <- nil } return nil } // Read/write halves of the pipe. // They are separate structures for two reasons: // 1. If one end becomes garbage without being Closed, // its finisher can Close so that the other end // does not hang indefinitely. // 2. Clients cannot use interface conversions on the // read end to find the Write method, and vice versa. // A PipeReader is the read half of a pipe. type PipeReader struct { lock sync.Mutex p *pipe } // Read implements the standard Read interface: // it reads data from the pipe, blocking until a writer // arrives or the write end is closed. // If the write end is closed with an error, that error is // returned as err; otherwise err is nil. func (r *PipeReader) Read(data []byte) (n int, err os.Error) { r.lock.Lock() defer r.lock.Unlock() return r.p.Read(data) } // Close closes the reader; subsequent writes to the // write half of the pipe will return the error os.EPIPE. func (r *PipeReader) Close() os.Error { r.lock.Lock() defer r.lock.Unlock() return r.p.CloseReader(nil) } // CloseWithError closes the reader; subsequent writes // to the write half of the pipe will return the error rerr. func (r *PipeReader) CloseWithError(rerr os.Error) os.Error { r.lock.Lock() defer r.lock.Unlock() return r.p.CloseReader(rerr) } func (r *PipeReader) finish() { r.Close() } // Write half of pipe. type PipeWriter struct { lock sync.Mutex p *pipe } // Write implements the standard Write interface: // it writes data to the pipe, blocking until readers // have consumed all the data or the read end is closed. // If the read end is closed with an error, that err is // returned as err; otherwise err is os.EPIPE. func (w *PipeWriter) Write(data []byte) (n int, err os.Error) { w.lock.Lock() defer w.lock.Unlock() return w.p.Write(data) } // Close closes the writer; subsequent reads from the // read half of the pipe will return no bytes and a nil error. func (w *PipeWriter) Close() os.Error { w.lock.Lock() defer w.lock.Unlock() return w.p.CloseWriter(nil) } // CloseWithError closes the writer; subsequent reads from the // read half of the pipe will return no bytes and the error werr. func (w *PipeWriter) CloseWithError(werr os.Error) os.Error { w.lock.Lock() defer w.lock.Unlock() return w.p.CloseWriter(werr) } func (w *PipeWriter) finish() { w.Close() } // Pipe creates a synchronous in-memory pipe. // It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal buffering. func Pipe() (*PipeReader, *PipeWriter) { p := new(pipe) p.cr = make(chan []byte, 1) p.cw = make(chan pipeReturn, 1) r := new(PipeReader) r.p = p w := new(PipeWriter) w.p = p return r, w }