summaryrefslogtreecommitdiff
path: root/src/pkg/io/pipe.go
blob: 743e4c468f907d056c7913ef1782b46a69ced225 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Pipe adapter to connect code expecting an io.Read
// with code expecting an io.Write.

package io

import (
	"os";
	"sync";
)

type pipeReturn struct {
	n	int;
	err	os.Error;
}

// Shared pipe structure.
type pipe struct {
	rclosed	bool;			// Read end closed?
	rerr	os.Error;		// Error supplied to CloseReader
	wclosed	bool;			// Write end closed?
	werr	os.Error;		// Error supplied to CloseWriter
	wpend	[]byte;			// Written data waiting to be read.
	wtot	int;			// Bytes consumed so far in current write.
	cr	chan []byte;		// Write sends data here...
	cw	chan pipeReturn;	// ... and reads the n, err back from here.
}

func (p *pipe) Read(data []byte) (n int, err os.Error) {
	if p == nil || p.rclosed {
		return 0, os.EINVAL;
	}

	// Wait for next write block if necessary.
	if p.wpend == nil {
		if !p.wclosed {
			p.wpend = <-p.cr;
		}
		if p.wpend == nil {
			return 0, p.werr;
		}
		p.wtot = 0;
	}

	// Read from current write block.
	n = len(data);
	if n > len(p.wpend) {
		n = len(p.wpend);
	}
	for i := 0; i < n; i++ {
		data[i] = p.wpend[i];
	}
	p.wtot += n;
	p.wpend = p.wpend[n:len(p.wpend)];

	// If write block is done, finish the write.
	if len(p.wpend) == 0 {
		p.wpend = nil;
		p.cw <- pipeReturn{p.wtot, nil};
		p.wtot = 0;
	}

	return n, nil;
}

func (p *pipe) Write(data []byte) (n int, err os.Error) {
	if p == nil || p.wclosed {
		return 0, os.EINVAL;
	}
	if p.rclosed {
		return 0, p.rerr;
	}

	// Send data to reader.
	p.cr <- data;

	// Wait for reader to finish copying it.
	res := <-p.cw;
	return res.n, res.err;
}

func (p *pipe) CloseReader(rerr os.Error) os.Error {
	if p == nil || p.rclosed {
		return os.EINVAL;
	}

	// Stop any future writes.
	p.rclosed = true;
	if rerr == nil {
		rerr = os.EPIPE;
	}
	p.rerr = rerr;

	// Stop the current write.
	if !p.wclosed {
		p.cw <- pipeReturn{p.wtot, rerr};
	}

	return nil;
}

func (p *pipe) CloseWriter(werr os.Error) os.Error {
	if werr == nil {
		werr = os.EOF;
	}
	if p == nil || p.wclosed {
		return os.EINVAL;
	}

	// Stop any future reads.
	p.wclosed = true;
	p.werr = werr;

	// Stop the current read.
	if !p.rclosed {
		p.cr <- nil;
	}

	return nil;
}

// Read/write halves of the pipe.
// They are separate structures for two reasons:
//  1.  If one end becomes garbage without being Closed,
//      its finisher can Close so that the other end
//      does not hang indefinitely.
//  2.  Clients cannot use interface conversions on the
//      read end to find the Write method, and vice versa.

// A PipeReader is the read half of a pipe.
type PipeReader struct {
	lock	sync.Mutex;
	p	*pipe;
}

// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is nil.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.Read(data);
}

// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.CloseReader(nil);
}

// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error rerr.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
	r.lock.Lock();
	defer r.lock.Unlock();

	return r.p.CloseReader(rerr);
}

func (r *PipeReader) finish()	{ r.Close() }

// Write half of pipe.
type PipeWriter struct {
	lock	sync.Mutex;
	p	*pipe;
}

// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.Write(data);
}

// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and a nil error.
func (w *PipeWriter) Close() os.Error {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.CloseWriter(nil);
}

// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error werr.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
	w.lock.Lock();
	defer w.lock.Unlock();

	return w.p.CloseWriter(werr);
}

func (w *PipeWriter) finish()	{ w.Close() }

// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
	p := new(pipe);
	p.cr = make(chan []byte, 1);
	p.cw = make(chan pipeReturn, 1);
	r := new(PipeReader);
	r.p = p;
	w := new(PipeWriter);
	w.p = p;
	return r, w;
}