publisher added
This commit is contained in:
parent
6addcca08d
commit
9366cbbf6d
2 changed files with 44 additions and 0 deletions
|
|
@ -70,3 +70,7 @@ func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte
|
||||||
|
|
||||||
return msgCh
|
return msgCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewPublisher(client *Client) Publisher {
|
||||||
|
return &PubHandler{client: client}
|
||||||
|
}
|
||||||
|
|
|
||||||
40
publisher.go
Normal file
40
publisher.go
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
package rabbit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Publisher interface {
|
||||||
|
Push(data []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type PubHandler struct {
|
||||||
|
client *Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubHandler) Push(data []byte) error {
|
||||||
|
p.client.mutex.Lock()
|
||||||
|
if !p.client.isReady {
|
||||||
|
p.client.mutex.Unlock()
|
||||||
|
return errors.New("failed to push: not connected")
|
||||||
|
}
|
||||||
|
p.client.mutex.Unlock()
|
||||||
|
for {
|
||||||
|
err := p.client.unsafePush(data)
|
||||||
|
if err != nil {
|
||||||
|
p.client.logger.Println("Push failed. Retrying...")
|
||||||
|
select {
|
||||||
|
case <-p.client.done:
|
||||||
|
return errShutdown
|
||||||
|
case <-time.After(p.client.opts.resendDelay):
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
confirm := <-p.client.notifyConfirm
|
||||||
|
if confirm.Ack {
|
||||||
|
p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue