path: root/wr.go
blob: 9de5419be8eec08281338d09f142edd63f680361
package asyncwr

import (
	"errors"
	"io"
	"sync"
)

var ErrBufferFull = errors.New("asyncwr: buffer full")

type Writer struct {
	lock    sync.Mutex
	wr      io.Writer  // underlying writer
	errch   chan error // receives the first error that occurs
	err     error      // sticky error; non-nil once in the error state
	maxSize int        // maximum buffer size, in bytes
	flushch chan bool  // closed when the buffer has been drained
	buf     []byte     // data not yet written to wr
}

// New creates a new asynchronous wrapper around a writer.
// The maxSize parameter controls the maximum allowed size of the buffer, in
// bytes. If a write would grow the buffer beyond this size, ErrBufferFull is
// returned and the writer switches to an error state.
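//
// A minimal usage sketch (the conn variable, the 1 MB limit, and the error
// handling here are illustrative assumptions, not recommendations):
//
//   wr := asyncwr.New(conn, 1024*1024)
//   wr.Write([]byte("hello"))
//   if err := wr.Flush(); err != nil {
//       // The underlying writer failed, or the buffer overflowed.
//   }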
func New(wr io.Writer, maxSize int) *Writer {
	return &Writer{wr: wr, maxSize: maxSize, errch: make(chan error, 1)}
}

// Buffered returns the number of bytes currently in the buffer.
// Note that this may not accurately reflect the number of bytes not yet
// written to the underlying writer: some data may already have been written
// while the buffer has not been updated yet.
func (w *Writer) Buffered() int {
	w.lock.Lock()
	defer w.lock.Unlock()
	return len(w.buf)
}

// Available returns the number of bytes before the buffer is full.
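//
// A hypothetical back-pressure check from a single producing goroutine (msg
// is an assumed []byte; dropping it avoids tripping ErrBufferFull):
//
//   if wr.Available() < len(msg) {
//       // Drop or delay msg instead of overflowing the buffer.
//   } else {
//       wr.Write(msg)
//   }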
func (w *Writer) Available() int {
	w.lock.Lock()
	defer w.lock.Unlock()
	return w.maxSize - len(w.buf)
}

// setError puts the writer into the error state. Assumes the lock is held.
func (w *Writer) setError(e error) {
	if w.err == nil {
		w.err = e
		w.errch <- e
		if w.flushch != nil {
			close(w.flushch)
			w.flushch = nil
		}
		// Get rid of the buffer; frees some memory.
		w.buf = nil
	}
}

// HasError returns the error if the writer is in an error state, or nil
// otherwise. Once an error has occurred, the writer switches to an error
// state, in which every further operation returns an error.
func (w *Writer) HasError() error {
	w.lock.Lock()
	defer w.lock.Unlock()
	return w.err
}

// ErrorCh returns a channel on which the error will be sent when one occurs.
// If the writer is already in an error state, the error can be read from the
// channel immediately.
//
// Multiple calls to ErrorCh() may return the same channel, and the error is
// only sent to this channel once. This means that only a single goroutine
// should be reading from this channel at any point in time.
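//
// A sketch of a monitoring goroutine (the log call is illustrative; note
// that this goroutine blocks forever if no error ever occurs):
//
//   go func() {
//       err := <-wr.ErrorCh()
//       log.Printf("asyncwr: %v", err)
//   }()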
func (w *Writer) ErrorCh() <-chan error {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.err != nil {
		ch := make(chan error, 1)
		ch <- w.err
		return ch
	}
	return w.errch
}

// Flush blocks until the buffer has been flushed.
func (w *Writer) Flush() error {
	<-w.FlushCh()
	return w.HasError()
}

// FlushCh returns a channel that will be closed once the buffer is empty.
// If the buffer is empty when this method is called, it will return an
// already closed channel. Otherwise, the returned channel stays open until
// the buffer has been flushed or an error has occurred.
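//
// A sketch of waiting for a flush with a timeout (the one-second limit is an
// arbitrary illustration):
//
//   select {
//   case <-wr.FlushCh():
//       // Buffer drained, or the writer hit an error; check HasError().
//   case <-time.After(time.Second):
//       // Still not flushed after one second.
//   }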
func (w *Writer) FlushCh() <-chan bool {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.flushch == nil {
		ch := make(chan bool)
		close(ch)
		return ch
	}
	return w.flushch
}

// Write appends data to the write buffer.
// It behaves like any other Write() method, except that it doesn't block and
// can be called from multiple goroutines.
//
// One word of warning, however: a single Write call is an "atomic" operation.
// If you call Write() from multiple goroutines, the bytes of one call will
// not be interleaved with those of a concurrent call. However, if you perform
// multiple Write calls from one goroutine, any concurrent Write may insert
// data in between your calls. For example:
//
//   wr.Write([]byte{'a'})
//   wr.Write([]byte{'b'})
//
// Is not equivalent to:
//
//   wr.Write([]byte("ab"))
//
// In the first example, another goroutine may insert data into the buffer
// between "a" and "b", whereas in the second example it is guaranteed that "a"
// and "b" appear without any other data in between.
//
// In particular, be wary of passing a Writer object to a function that may
// call Write() multiple times. For example:
//
//   fmt.Fprintf(wr, "Key = %s\n", "value")
//
// fmt.Fprintf() might internally implement this as a sequence of calls to
// Write(), e.g.:
//
//   wr.Write([]byte("Number = "))
//   wr.Write([]byte("value"))
//   wr.Write([]byte{'\n'})
//
// This makes it possible for other data to be inserted in between. Note that
// this example is an artificial one; the current implementation of
// fmt.Fprintf() does, in fact, use only a single call to Write(). If you do
// run into a situation where you wish to pass the writer to external code and
// you don't know how many calls to Write() it will make, you can use the
// bufio package:
//
//   bwr := bufio.NewWriter(wr)
//   fmt.Fprintf(bwr, "Key = %s\n", "value")
//   bwr.Flush()
func (w *Writer) Write(b []byte) (int, error) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.err != nil {
		return 0, w.err
	}
	l := len(b)
	old := len(w.buf)
	if old+l > w.maxSize {
		w.setError(ErrBufferFull)
		return 0, ErrBufferFull
	}
	if l > 0 {
		w.buf = append(w.buf, b...)
	}
	if old == 0 {
		w.flushch = make(chan bool)
		go w.flusher()
	}
	return l, nil
}

func (w *Writer) flusher() {
	w.lock.Lock()
	defer w.lock.Unlock()
	for len(w.buf) > 0 {
		buf := w.buf
		size := len(buf)
		// Don't give too much data to a single Write(); otherwise it
		// may block for too long without giving any feedback on how
		// much data we've written so far.
		if size > 10240 {
			size = 10240
		}

		// Note: We're giving a shared buffer to Write() without holding the
		// lock. This should not be a problem, however: only the first part of
		// the buffer is read here, while any concurrent method will only
		// append data to the buffer without modifying existing data. (Such an
		// append may result in the buffer being copied into a larger memory
		// block, but that doesn't affect the buffer we are working with here,
		// as it keeps pointing to the old block.)
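		// For illustration (not executed): even if a concurrent append
		// reallocates w.buf into a larger backing array, our local slice
		// keeps referencing the old array and its bytes stay unchanged:
		//
		//   buf := w.buf                   // shares the current backing array
		//   w.buf = append(w.buf, data...) // may move w.buf to a new array
		//   _ = buf[:size]                 // still reads the old, stable bytes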
		// TODO: Set a (configurable?) timeout on this.
		w.lock.Unlock()
		wr, e := w.wr.Write(buf[:size])
		w.lock.Lock()

		if e != nil {
			w.setError(e)
		} else if w.err == nil && wr > 0 {
			// TODO: as a performance optimisation, it's possible to only use
			// copy() occasionally, and just slice the underlying buffer in
			// other cases.
			i := copy(w.buf, w.buf[wr:])
			w.buf = w.buf[:i]
		}
	}
	if w.flushch != nil {
		close(w.flushch)
		// Reset the channel so a later setError can't close it a second time.
		w.flushch = nil
	}
}