package asyncwr import ( "errors" "io" "sync" ) var ErrBufferFull = errors.New("asyncwr: buffer full") type Writer struct { lock sync.Mutex wr io.Writer errch chan error err error maxSize int flushch chan bool buf []byte } // New creates a new asynchronous wrapper to a writer. // The maxSize parameter controls the maximum allowed size of the buffer, in // bytes. If the buffer grows beyond this size, the 'ErrBufferFull' error is // returned and the writer switches to an error state. func New(wr io.Writer, maxSize int) *Writer { return &Writer{wr: wr, maxSize: maxSize, errch: make(chan error, 1)} } // Buffered returns the number of bytes currently in the buffer. // Note that this may not accurately reflect the number of bytes that are not // yet written to the underlying writer. It is possible that some data has // already been written, but that the buffer has not updated yet. func (w *Writer) Buffered() int { w.lock.Lock() defer w.lock.Unlock() return len(w.buf) } // Available returns the number of bytes before the buffer is full. func (w *Writer) Available() int { w.lock.Lock() defer w.lock.Unlock() return w.maxSize - len(w.buf) } // Assumes the lock is held. func (w *Writer) setError(e error) { if w.err != nil { w.err = e w.errch <- e close(w.flushch) w.flushch = nil // Get rid of the buffer, frees some memory. w.buf = nil } } // HasError returns whether the writer is in an error state. // If an error has occured, the buffer will switch to an error state, in which // every further operation will return an error. func (w *Writer) HasError() error { w.lock.Lock() defer w.lock.Unlock() return w.err } // ErrorCh returns a channel on which an error will be written when one occurs. // Once an error happens, the error will be written to this channel. If the // writer is already in an error state, the error can be read from the channel // immediately. // // Multiple calls to ErrorCh() may return the same channel, and the error is // only sent to this channel once. This means that only a single goroutine // should be reading on this channel at any point in time. func (w *Writer) ErrorCh() <-chan error { w.lock.Lock() defer w.lock.Unlock() if w.err != nil { ch := make(chan error, 1) ch <- w.err return ch } return w.errch } // Flush blocks until the buffer has been flushed. func (w *Writer) Flush() error { <-w.FlushCh() return w.HasError() } // FlushCh returns a channel that will be closed once the buffer is empty. // If the buffer is empty when this method is called, it will return an already // closed channel. Otherwise, the returned channel will stay open until the // buffer has been flushed or when an error has occured. func (w *Writer) FlushCh() <-chan bool { w.lock.Lock() defer w.lock.Unlock() if w.flushch == nil { ch := make(chan bool) close(ch) return ch } return w.flushch } // Write appends data to the write buffer. // Behaves similarly to any other Write() method, except it doesn't block and // you can call this method from multiple goroutines. // // One word of warning, however: a single Write call is an "atomic" operation. // If you call Write() from multiple goroutines, then the bytes of one call // will not be mangled with those of concurrent call. However, if you perform // multiple Write calls from one goroutine, then any concurrent Write may // insert data in between your calls. For example: // // wr.Write([]byte{'a'}) // wr.Write([]byte{'b'}) // // Is not equivalent to: // // wr.Write([]byte("ab")) // // In the first example, another goroutine may insert data to the buffer // between "a" and "b", whereas in the second example it is guaranteed that "a" // and "b" appear without any other data in between. // // In particular, be wary of passing a Writer object to a function that may // call Write() multiple times. For example: // // fmt.Fprintf(wr, "Key = %s\n", "value") // // fmt.Fprintf() might internally implement this as a sequence of calls to // Write(), e.g.: // // wr.Write([]byte("Number = ")) // wr.Write([]byte("value")) // wr.Write([]byte{'\n'}) // // Thus making it possible that any other data is inserted in between. Note // that this example is an artificial one, the current implementation of // fmt.Fprintf() does, in fact, use only a single call to Write(). If you do // run into a situation where you wish to pass the writer to external code of // which you don't know how many calls to Write() it will make, you can use the // bufio package: // // bwr := bufio.NewWriter(wr) // fmt.Fprintf(bwr, "Key = %s\n", "value") // bwr.Flush() func (w *Writer) Write(b []byte) (int, error) { w.lock.Lock() defer w.lock.Unlock() if w.err != nil { return 0, w.err } l := len(b) old := len(w.buf) if l+len(w.buf) > w.maxSize { l = w.maxSize - len(w.buf) w.setError(ErrBufferFull) return 0, ErrBufferFull } if l > 0 { w.buf = append(w.buf, b[:l]...) } if old == 0 { w.flushch = make(chan bool) go w.flusher() } return l, nil } func (w *Writer) flusher() { w.lock.Lock() defer w.lock.Unlock() for len(w.buf) > 0 { buf := w.buf size := len(buf) // Don't give too many data to a single Write(), otherwise it // may block for too long without giving any feedback on how // many data we've written so far. if size > 10240 { size = 10240 } // Note: We're giving a shared buffer to Write() without having the // lock. This should not be a problem, however: Only the first part of // the buffer is read, while any other concurrent methods will only // append data to the buffer without modifying existing data. (Such an // append may result in the buffer being copied into a larger memory // block, but that doesn't affect the buffer we are working with here, // as it will keep pointing to the old block) // TODO: Set a (configurable?) timeout on this. w.lock.Unlock() wr, e := w.wr.Write(buf[:size]) w.lock.Lock() if e != nil { w.setError(e) } else if w.err == nil && wr > 0 { // TODO: as a performance optimisation, it's possible to only use // copy() occasionally, and just slice the underlying buffer in // other cases. i := copy(w.buf, w.buf[wr:]) w.buf = w.buf[:i] } } if w.flushch != nil { close(w.flushch) } }