mt-rabbit/publisher.go

67 lines
1.3 KiB
Go
Raw Normal View History

2026-02-20 16:05:12 +03:00
package rabbit
import (
2026-02-21 15:54:50 +03:00
"context"
2026-02-20 16:05:12 +03:00
"errors"
2026-02-21 15:03:43 +03:00
"log"
2026-02-20 16:05:12 +03:00
"time"
)
type Publisher interface {
2026-02-21 16:07:19 +03:00
Start(ctx context.Context, chanLen uint) chan<- []byte
2026-02-20 16:05:12 +03:00
}
2026-02-20 16:20:19 +03:00
type pubHandler struct {
2026-02-20 16:05:12 +03:00
client *Client
}
2026-02-21 15:54:50 +03:00
func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte {
2026-02-21 15:03:43 +03:00
ch := make(chan []byte, chanLen)
go func() {
for {
select {
2026-02-21 15:54:50 +03:00
case <-ctx.Done():
for msg := range ch {
if err := p.push(msg); err != nil {
log.Printf("Error publishing message (shutdown): %s", err)
}
}
log.Println("Publisher stopped")
return
2026-02-21 15:03:43 +03:00
case msg := <-ch:
if err := p.push(msg); err != nil {
log.Printf("Error publishing message: %s", err)
}
}
}
}()
return ch
}
func (p *pubHandler) push(data []byte) error {
2026-02-20 16:05:12 +03:00
p.client.mutex.Lock()
if !p.client.isReady {
p.client.mutex.Unlock()
return errors.New("failed to push: not connected")
}
p.client.mutex.Unlock()
for {
err := p.client.unsafePush(data)
if err != nil {
p.client.logger.Println("Push failed. Retrying...")
select {
case <-p.client.done:
return errShutdown
case <-time.After(p.client.opts.resendDelay):
}
continue
}
confirm := <-p.client.notifyConfirm
if confirm.Ack {
p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag)
return nil
}
}
}