diff options
author | Yorhel <git@yorhel.nl> | 2012-04-13 18:32:52 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-04-13 18:32:52 +0200 |
commit | cbce146adb7d281cfd0e14d03e77eab5af08748a (patch) | |
tree | f2508c559619535dc3aa9141a18bbfe6c6104d30 /wr.go |
Initial commit
Diffstat (limited to 'wr.go')
-rw-r--r-- | wr.go | 211 |
1 files changed, 211 insertions, 0 deletions
@@ -0,0 +1,211 @@ +package asyncwr + +import ( + "errors" + "io" + "sync" +) + +var ErrBufferFull = errors.New("asyncwr: buffer full") + +type Writer struct { + lock sync.Mutex + wr io.Writer + errch chan error + err error + maxSize int + flushch chan bool + buf []byte +} + +// New creates a new asynchronous wrapper to a writer. +// The maxSize parameter controls the maximum allowed size of the buffer, in +// bytes. If the buffer grows beyond this size, the 'ErrBufferFull' error is +// returned and the writer switches to an error state. +func New(wr io.Writer, maxSize int) *Writer { + return &Writer{wr: wr, maxSize: maxSize, errch: make(chan error, 1)} +} + +// Buffered returns the number of bytes currently in the buffer. +// Note that this may not accurately reflect the number of bytes that are not +// yet written to the underlying writer. It is possible that some data has +// already been written, but that the buffer has not updated yet. +func (w *Writer) Buffered() int { + w.lock.Lock() + defer w.lock.Unlock() + return len(w.buf) +} + +// Available returns the number of bytes before the buffer is full. +func (w *Writer) Available() int { + w.lock.Lock() + defer w.lock.Unlock() + return w.maxSize - len(w.buf) +} + +// Assumes the lock is held. +func (w *Writer) setError(e error) { + if w.err != nil { + w.err = e + w.errch <- e + close(w.flushch) + w.flushch = nil + // Get rid of the buffer, frees some memory. + w.buf = nil + } +} + +// HasError returns whether the writer is in an error state. +// If an error has occured, the buffer will switch to an error state, in which +// every further operation will return an error. +func (w *Writer) HasError() error { + w.lock.Lock() + defer w.lock.Unlock() + return w.err +} + +// ErrorCh returns a channel on which an error will be written when one occurs. +// Once an error happens, the error will be written to this channel. If the +// writer is already in an error state, the error can be read from the channel +// immediately. +// +// Multiple calls to ErrorCh() may return the same channel, and the error is +// only sent to this channel once. This means that only a single goroutine +// should be reading on this channel at any point in time. +func (w *Writer) ErrorCh() <-chan error { + w.lock.Lock() + defer w.lock.Unlock() + if w.err != nil { + ch := make(chan error, 1) + ch <- w.err + return ch + } + return w.errch +} + +// Flush blocks until the buffer has been flushed. +func (w *Writer) Flush() error { + <-w.FlushCh() + return w.HasError() +} + +// FlushCh returns a channel that will be closed once the buffer is empty. +// If the buffer is empty when this method is called, it will return an already +// closed channel. Otherwise, the returned channel will stay open until the +// buffer has been flushed or when an error has occured. +func (w *Writer) FlushCh() <-chan bool { + w.lock.Lock() + defer w.lock.Unlock() + if w.flushch == nil { + ch := make(chan bool) + close(ch) + return ch + } + return w.flushch +} + +// Write appends data to the write buffer. +// Behaves similarly to any other Write() method, except it doesn't block and +// you can call this method from multiple goroutines. +// +// One word of warning, however: a single Write call is an "atomic" operation. +// If you call Write() from multiple goroutines, then the bytes of one call +// will not be mangled with those of concurrent call. However, if you perform +// multiple Write calls from one goroutine, then any concurrent Write may +// insert data in between your calls. For example: +// +// wr.Write([]byte{'a'}) +// wr.Write([]byte{'b'}) +// +// Is not equivalent to: +// +// wr.Write([]byte("ab")) +// +// In the first example, another goroutine may insert data to the buffer +// between "a" and "b", whereas in the second example it is guaranteed that "a" +// and "b" appear without any other data in between. +// +// In particular, be wary of passing a Writer object to a function that may +// call Write() multiple times. For example: +// +// fmt.Fprintf(wr, "Key = %s\n", "value") +// +// fmt.Fprintf() might internally implement this as a sequence of calls to +// Write(), e.g.: +// +// wr.Write([]byte("Number = ")) +// wr.Write([]byte("value")) +// wr.Write([]byte{'\n'}) +// +// Thus making it possible that any other data is inserted in between. Note +// that this example is an artificial one, the current implementation of +// fmt.Fprintf() does, in fact, use only a single call to Write(). If you do +// run into a situation where you wish to pass the writer to external code of +// which you don't know how many calls to Write() it will make, you can use the +// bufio package: +// +// bwr := bufio.NewWriter(wr) +// fmt.Fprintf(bwr, "Key = %s\n", "value") +// bwr.Flush() +func (w *Writer) Write(b []byte) (int, error) { + w.lock.Lock() + defer w.lock.Unlock() + if w.err != nil { + return 0, w.err + } + l := len(b) + old := len(w.buf) + if l+len(w.buf) > w.maxSize { + l = w.maxSize - len(w.buf) + w.setError(ErrBufferFull) + return 0, ErrBufferFull + } + if l > 0 { + w.buf = append(w.buf, b[:l]...) + } + if old == 0 { + w.flushch = make(chan bool) + go w.flusher() + } + return l, nil +} + +func (w *Writer) flusher() { + w.lock.Lock() + defer w.lock.Unlock() + for { + buf := w.buf + size := len(buf) + // Don't give too many data to a single Write(), otherwise it + // may block for too long without giving any feedback on how + // many data we've written so far. + if size > 10240 { + size = 10240 + } + + // Note: We're giving a shared buffer to Write() without having the + // lock. This should not be a problem, however: Only the first part of + // the buffer is read, while any other concurrent methods will only + // append data to the buffer without modifying existing data. (Such an + // append may result in the buffer being copied into a larger memory + // block, but that doesn't affect the buffer we are working with here, + // as it will keep pointing to the old block) + // TODO: Set a (configurable?) timeout on this. + w.lock.Unlock() + wr, e := w.wr.Write(buf[:size]) + w.lock.Lock() + + if e != nil { + w.setError(e) + } else if w.err == nil && wr > 0 { + // TODO: as a performance optimisation, it's possible to only use + // copy() occasionally, and just slice the underlying buffer in + // other cases. + i := copy(w.buf, w.buf[wr:]) + w.buf = w.buf[:i] + } + } + if w.flushch != nil { + close(w.flushch) + } +} |