package rabbit import ( "errors" "log" "time" ) type Publisher interface { Start(chanLen uint) <-chan []byte } type pubHandler struct { client *Client } func (p *pubHandler) Start(chanLen uint) <-chan []byte { ch := make(chan []byte, chanLen) go func() { for { select { case msg := <-ch: if err := p.push(msg); err != nil { log.Printf("Error publishing message: %s", err) } } } }() return ch } func (p *pubHandler) push(data []byte) error { p.client.mutex.Lock() if !p.client.isReady { p.client.mutex.Unlock() return errors.New("failed to push: not connected") } p.client.mutex.Unlock() for { err := p.client.unsafePush(data) if err != nil { p.client.logger.Println("Push failed. Retrying...") select { case <-p.client.done: return errShutdown case <-time.After(p.client.opts.resendDelay): } continue } confirm := <-p.client.notifyConfirm if confirm.Ack { p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) return nil } } }