summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-04-13 18:32:52 +0200
committerYorhel <git@yorhel.nl>2012-04-13 18:32:52 +0200
commitcbce146adb7d281cfd0e14d03e77eab5af08748a (patch)
treef2508c559619535dc3aa9141a18bbfe6c6104d30
Initial commit
-rw-r--r--COPYING20
-rw-r--r--README86
-rw-r--r--wr.go211
3 files changed, 317 insertions, 0 deletions
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..251b5a4
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,20 @@
+Copyright (c) 2012 Yoran Heling
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README b/README
new file mode 100644
index 0000000..b8995eb
--- /dev/null
+++ b/README
@@ -0,0 +1,86 @@
+asyncwr - An asynchronous writer package for Go
+
+
+WHAT IS THIS?
+
+ When writing Go code, there are times when I just want to write a message to
+ some network socket or other destination, but don't particularly care about
+ when that write is going to happen.
+
+ This package implements an asynchronous implementation of the io.Writer
+ interface. It can be used like any other writer, except that calls to Write()
+ do not block and you do not need an explicit call to some blocking Flush().
+ Anything that is written to the buffer is flushed automatically in the
+ background.
+
+ Additionally, this package allows multiple goroutines to append data to the
+ buffer concurrently, and provides a notification channel to allow for
+ separating the error handling from the writing code.
+
+
+IMPORT PATH
+
+ import "blicky.net/asyncwr"
+
+
+USAGE
+
+ import (
+ "blicky.net/asyncwr"
+ "fmt"
+ "net"
+ )
+
+ wr := asyncwr.New(os.Stdout, 1024*1024)
+
+ // Async error handling
+ go func() {
+ if e := <-wr.ErrorCh(); e != nil {
+ fmt.Fprintf(os.Stderr, "Write error: %s.\n", e.Error())
+ os.Exit(1)
+ }
+ }()
+
+ wr.Write([]byte("Data\n"))
+
+ // Do some other work
+
+ <-wr.FlushCh()
+
+
+MINI-FAQ:
+
+ Q: Asynchronous behaviour!? This goes against the fundamental ideas of Go!
+
+ Indeed. But I hate it when a programming language forces you into thinking
+ that a single paradigm is enough for every situation you will ever
+ encounter in your life. I prefer to have the freedom to do whatever I want
+ to do, and luckily Go doesn't make that impossible.
+
+ Q: What about an asynchronous reader?
+
+ The idiomatic way to do asynchronous reads is to create a separate
+ goroutine to (blocking) read from a connection and pass the results over a
+ channel. I've not found any problems with that yet. :-)
+
+ Q: What about performance?
+
+ I haven't done any benchmarks, but I suppose this writer will be a bit
+ slower than a bufio.Writer.
+
+ Q: You're using locks instead of channels!
+
+ I just happened to find a mutex a relatively easy synchronisation method
+ for this purpose. It may also be a bit faster than an implementation based
+ on channels, but I can't really comment on that without benchmarks.
+
+ Q: Your usage example is bullshit!
+
+ It's just an example. Suggestions for a better example are always welcome.
+
+
+TODO
+
+ - Tests!
+ - Configuring and handling timeouts?
+ - Call .Flush() on the underlying writer if it supports that?
diff --git a/wr.go b/wr.go
new file mode 100644
index 0000000..d2c5ebe
--- /dev/null
+++ b/wr.go
@@ -0,0 +1,211 @@
+package asyncwr
+
+import (
+ "errors"
+ "io"
+ "sync"
+)
+
+var ErrBufferFull = errors.New("asyncwr: buffer full")
+
+type Writer struct {
+ lock sync.Mutex
+ wr io.Writer
+ errch chan error
+ err error
+ maxSize int
+ flushch chan bool
+ buf []byte
+}
+
+// New creates a new asynchronous wrapper to a writer.
+// The maxSize parameter controls the maximum allowed size of the buffer, in
+// bytes. If the buffer grows beyond this size, the 'ErrBufferFull' error is
+// returned and the writer switches to an error state.
+func New(wr io.Writer, maxSize int) *Writer {
+ return &Writer{wr: wr, maxSize: maxSize, errch: make(chan error, 1)}
+}
+
+// Buffered returns the number of bytes currently in the buffer.
+// Note that this may not accurately reflect the number of bytes that are not
+// yet written to the underlying writer. It is possible that some data has
+// already been written, but that the buffer has not updated yet.
+func (w *Writer) Buffered() int {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return len(w.buf)
+}
+
+// Available returns the number of bytes before the buffer is full.
+func (w *Writer) Available() int {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return w.maxSize - len(w.buf)
+}
+
+// Assumes the lock is held.
+func (w *Writer) setError(e error) {
+ if w.err != nil {
+ w.err = e
+ w.errch <- e
+ close(w.flushch)
+ w.flushch = nil
+ // Get rid of the buffer, frees some memory.
+ w.buf = nil
+ }
+}
+
+// HasError returns whether the writer is in an error state.
+// If an error has occured, the buffer will switch to an error state, in which
+// every further operation will return an error.
+func (w *Writer) HasError() error {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ return w.err
+}
+
+// ErrorCh returns a channel on which an error will be written when one occurs.
+// Once an error happens, the error will be written to this channel. If the
+// writer is already in an error state, the error can be read from the channel
+// immediately.
+//
+// Multiple calls to ErrorCh() may return the same channel, and the error is
+// only sent to this channel once. This means that only a single goroutine
+// should be reading on this channel at any point in time.
+func (w *Writer) ErrorCh() <-chan error {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.err != nil {
+ ch := make(chan error, 1)
+ ch <- w.err
+ return ch
+ }
+ return w.errch
+}
+
+// Flush blocks until the buffer has been flushed.
+func (w *Writer) Flush() error {
+ <-w.FlushCh()
+ return w.HasError()
+}
+
+// FlushCh returns a channel that will be closed once the buffer is empty.
+// If the buffer is empty when this method is called, it will return an already
+// closed channel. Otherwise, the returned channel will stay open until the
+// buffer has been flushed or when an error has occured.
+func (w *Writer) FlushCh() <-chan bool {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.flushch == nil {
+ ch := make(chan bool)
+ close(ch)
+ return ch
+ }
+ return w.flushch
+}
+
+// Write appends data to the write buffer.
+// Behaves similarly to any other Write() method, except it doesn't block and
+// you can call this method from multiple goroutines.
+//
+// One word of warning, however: a single Write call is an "atomic" operation.
+// If you call Write() from multiple goroutines, then the bytes of one call
+// will not be mangled with those of concurrent call. However, if you perform
+// multiple Write calls from one goroutine, then any concurrent Write may
+// insert data in between your calls. For example:
+//
+// wr.Write([]byte{'a'})
+// wr.Write([]byte{'b'})
+//
+// Is not equivalent to:
+//
+// wr.Write([]byte("ab"))
+//
+// In the first example, another goroutine may insert data to the buffer
+// between "a" and "b", whereas in the second example it is guaranteed that "a"
+// and "b" appear without any other data in between.
+//
+// In particular, be wary of passing a Writer object to a function that may
+// call Write() multiple times. For example:
+//
+// fmt.Fprintf(wr, "Key = %s\n", "value")
+//
+// fmt.Fprintf() might internally implement this as a sequence of calls to
+// Write(), e.g.:
+//
+// wr.Write([]byte("Number = "))
+// wr.Write([]byte("value"))
+// wr.Write([]byte{'\n'})
+//
+// Thus making it possible that any other data is inserted in between. Note
+// that this example is an artificial one, the current implementation of
+// fmt.Fprintf() does, in fact, use only a single call to Write(). If you do
+// run into a situation where you wish to pass the writer to external code of
+// which you don't know how many calls to Write() it will make, you can use the
+// bufio package:
+//
+// bwr := bufio.NewWriter(wr)
+// fmt.Fprintf(bwr, "Key = %s\n", "value")
+// bwr.Flush()
+func (w *Writer) Write(b []byte) (int, error) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ if w.err != nil {
+ return 0, w.err
+ }
+ l := len(b)
+ old := len(w.buf)
+ if l+len(w.buf) > w.maxSize {
+ l = w.maxSize - len(w.buf)
+ w.setError(ErrBufferFull)
+ return 0, ErrBufferFull
+ }
+ if l > 0 {
+ w.buf = append(w.buf, b[:l]...)
+ }
+ if old == 0 {
+ w.flushch = make(chan bool)
+ go w.flusher()
+ }
+ return l, nil
+}
+
+func (w *Writer) flusher() {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+ for {
+ buf := w.buf
+ size := len(buf)
+ // Don't give too many data to a single Write(), otherwise it
+ // may block for too long without giving any feedback on how
+ // many data we've written so far.
+ if size > 10240 {
+ size = 10240
+ }
+
+ // Note: We're giving a shared buffer to Write() without having the
+ // lock. This should not be a problem, however: Only the first part of
+ // the buffer is read, while any other concurrent methods will only
+ // append data to the buffer without modifying existing data. (Such an
+ // append may result in the buffer being copied into a larger memory
+ // block, but that doesn't affect the buffer we are working with here,
+ // as it will keep pointing to the old block)
+ // TODO: Set a (configurable?) timeout on this.
+ w.lock.Unlock()
+ wr, e := w.wr.Write(buf[:size])
+ w.lock.Lock()
+
+ if e != nil {
+ w.setError(e)
+ } else if w.err == nil && wr > 0 {
+ // TODO: as a performance optimisation, it's possible to only use
+ // copy() occasionally, and just slice the underlying buffer in
+ // other cases.
+ i := copy(w.buf, w.buf[wr:])
+ w.buf = w.buf[:i]
+ }
+ }
+ if w.flushch != nil {
+ close(w.flushch)
+ }
+}