summaryrefslogtreecommitdiff
path: root/wr.go
diff options
context:
space:
mode:
Diffstat (limited to 'wr.go')
-rw-r--r--wr.go211
1 files changed, 211 insertions, 0 deletions
diff --git a/wr.go b/wr.go
new file mode 100644
index 0000000..d2c5ebe
--- /dev/null
+++ b/wr.go
@@ -0,0 +1,211 @@
+package asyncwr
+
+import (
+ "errors"
+ "io"
+ "sync"
+)
+
+var ErrBufferFull = errors.New("asyncwr: buffer full")
+
+type Writer struct {
+ lock sync.Mutex
+ wr io.Writer
+ errch chan error
+ err error
+ maxSize int
+ flushch chan bool
+ buf []byte
+}
+
+// New creates a new asynchronous wrapper to a writer.
+// The maxSize parameter controls the maximum allowed size of the buffer, in
+// bytes. If the buffer grows beyond this size, the 'ErrBufferFull' error is
+// returned and the writer switches to an error state.
+func New(wr io.Writer, maxSize int) *Writer {
+ return &Writer{wr: wr, maxSize: maxSize, errch: make(chan error, 1)}
+}
+
+// Buffered returns the number of bytes currently in the buffer.
+// Note that this may not accurately reflect the number of bytes that are not
+// yet written to the underlying writer. It is possible that some data has
+// already been written, but that the buffer has not updated yet.
+func (w *Writer) Buffered() int {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return len(w.buf)
+}
+
+// Available returns the number of bytes before the buffer is full.
+func (w *Writer) Available() int {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return w.maxSize - len(w.buf)
+}
+
+// Assumes the lock is held.
+func (w *Writer) setError(e error) {
+ if w.err != nil {
+ w.err = e
+ w.errch <- e
+ close(w.flushch)
+ w.flushch = nil
+ // Get rid of the buffer, frees some memory.
+ w.buf = nil
+ }
+}
+
+// HasError returns whether the writer is in an error state.
+// If an error has occured, the buffer will switch to an error state, in which
+// every further operation will return an error.
+func (w *Writer) HasError() error {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return w.err
+}
+
+// ErrorCh returns a channel on which an error will be written when one occurs.
+// Once an error happens, the error will be written to this channel. If the
+// writer is already in an error state, the error can be read from the channel
+// immediately.
+//
+// Multiple calls to ErrorCh() may return the same channel, and the error is
+// only sent to this channel once. This means that only a single goroutine
+// should be reading on this channel at any point in time.
+func (w *Writer) ErrorCh() <-chan error {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.err != nil {
+ ch := make(chan error, 1)
+ ch <- w.err
+ return ch
+ }
+ return w.errch
+}
+
+// Flush blocks until the buffer has been flushed.
+func (w *Writer) Flush() error {
+ <-w.FlushCh()
+ return w.HasError()
+}
+
+// FlushCh returns a channel that will be closed once the buffer is empty.
+// If the buffer is empty when this method is called, it will return an already
+// closed channel. Otherwise, the returned channel will stay open until the
+// buffer has been flushed or when an error has occured.
+func (w *Writer) FlushCh() <-chan bool {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.flushch == nil {
+ ch := make(chan bool)
+ close(ch)
+ return ch
+ }
+ return w.flushch
+}
+
+// Write appends data to the write buffer.
+// Behaves similarly to any other Write() method, except it doesn't block and
+// you can call this method from multiple goroutines.
+//
+// One word of warning, however: a single Write call is an "atomic" operation.
+// If you call Write() from multiple goroutines, then the bytes of one call
+// will not be mangled with those of concurrent call. However, if you perform
+// multiple Write calls from one goroutine, then any concurrent Write may
+// insert data in between your calls. For example:
+//
+// wr.Write([]byte{'a'})
+// wr.Write([]byte{'b'})
+//
+// Is not equivalent to:
+//
+// wr.Write([]byte("ab"))
+//
+// In the first example, another goroutine may insert data to the buffer
+// between "a" and "b", whereas in the second example it is guaranteed that "a"
+// and "b" appear without any other data in between.
+//
+// In particular, be wary of passing a Writer object to a function that may
+// call Write() multiple times. For example:
+//
+// fmt.Fprintf(wr, "Key = %s\n", "value")
+//
+// fmt.Fprintf() might internally implement this as a sequence of calls to
+// Write(), e.g.:
+//
+// wr.Write([]byte("Number = "))
+// wr.Write([]byte("value"))
+// wr.Write([]byte{'\n'})
+//
+// Thus making it possible that any other data is inserted in between. Note
+// that this example is an artificial one, the current implementation of
+// fmt.Fprintf() does, in fact, use only a single call to Write(). If you do
+// run into a situation where you wish to pass the writer to external code of
+// which you don't know how many calls to Write() it will make, you can use the
+// bufio package:
+//
+// bwr := bufio.NewWriter(wr)
+// fmt.Fprintf(bwr, "Key = %s\n", "value")
+// bwr.Flush()
+func (w *Writer) Write(b []byte) (int, error) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.err != nil {
+ return 0, w.err
+ }
+ l := len(b)
+ old := len(w.buf)
+ if l+len(w.buf) > w.maxSize {
+ l = w.maxSize - len(w.buf)
+ w.setError(ErrBufferFull)
+ return 0, ErrBufferFull
+ }
+ if l > 0 {
+ w.buf = append(w.buf, b[:l]...)
+ }
+ if old == 0 {
+ w.flushch = make(chan bool)
+ go w.flusher()
+ }
+ return l, nil
+}
+
+func (w *Writer) flusher() {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ for {
+ buf := w.buf
+ size := len(buf)
+ // Don't give too many data to a single Write(), otherwise it
+ // may block for too long without giving any feedback on how
+ // many data we've written so far.
+ if size > 10240 {
+ size = 10240
+ }
+
+ // Note: We're giving a shared buffer to Write() without having the
+ // lock. This should not be a problem, however: Only the first part of
+ // the buffer is read, while any other concurrent methods will only
+ // append data to the buffer without modifying existing data. (Such an
+ // append may result in the buffer being copied into a larger memory
+ // block, but that doesn't affect the buffer we are working with here,
+ // as it will keep pointing to the old block)
+ // TODO: Set a (configurable?) timeout on this.
+ w.lock.Unlock()
+ wr, e := w.wr.Write(buf[:size])
+ w.lock.Lock()
+
+ if e != nil {
+ w.setError(e)
+ } else if w.err == nil && wr > 0 {
+ // TODO: as a performance optimisation, it's possible to only use
+ // copy() occasionally, and just slice the underlying buffer in
+ // other cases.
+ i := copy(w.buf, w.buf[wr:])
+ w.buf = w.buf[:i]
+ }
+ }
+ if w.flushch != nil {
+ close(w.flushch)
+ }
+}