56 lines
1 KiB
Go
56 lines
1 KiB
Go
package rabbit
|
|
|
|
import (
|
|
"errors"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// Publisher is the single-method interface implemented by pubHandler.
type Publisher interface {
	// Start takes a channel buffer length and returns a receive-only channel
	// of byte-slice messages.
	Start(chanLen uint) <-chan []byte
}
|
|
|
|
type pubHandler struct {
|
|
client *Client
|
|
}
|
|
|
|
func (p *pubHandler) Start(chanLen uint) <-chan []byte {
|
|
ch := make(chan []byte, chanLen)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case msg := <-ch:
|
|
if err := p.push(msg); err != nil {
|
|
log.Printf("Error publishing message: %s", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
func (p *pubHandler) push(data []byte) error {
|
|
p.client.mutex.Lock()
|
|
if !p.client.isReady {
|
|
p.client.mutex.Unlock()
|
|
return errors.New("failed to push: not connected")
|
|
}
|
|
p.client.mutex.Unlock()
|
|
for {
|
|
err := p.client.unsafePush(data)
|
|
if err != nil {
|
|
p.client.logger.Println("Push failed. Retrying...")
|
|
select {
|
|
case <-p.client.done:
|
|
return errShutdown
|
|
case <-time.After(p.client.opts.resendDelay):
|
|
}
|
|
continue
|
|
}
|
|
confirm := <-p.client.notifyConfirm
|
|
if confirm.Ack {
|
|
p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag)
|
|
return nil
|
|
}
|
|
}
|
|
}
|