diff --git a/handler.go b/handler.go index 6109124..a203b98 100644 --- a/handler.go +++ b/handler.go @@ -70,3 +70,7 @@ func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte return msgCh } + +func NewPublisher(client *Client) Publisher { + return &PubHandler{client: client} +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..69abeae --- /dev/null +++ b/publisher.go @@ -0,0 +1,40 @@ +package rabbit + +import ( + "errors" + "time" +) + +type Publisher interface { + Push(data []byte) error +} + +type PubHandler struct { + client *Client +} + +func (p *PubHandler) Push(data []byte) error { + p.client.mutex.Lock() + if !p.client.isReady { + p.client.mutex.Unlock() + return errors.New("failed to push: not connected") + } + p.client.mutex.Unlock() + for { + err := p.client.unsafePush(data) + if err != nil { + p.client.logger.Println("Push failed. Retrying...") + select { + case <-p.client.done: + return errShutdown + case <-time.After(p.client.opts.resendDelay): + } + continue + } + confirm := <-p.client.notifyConfirm + if confirm.Ack { + p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) + return nil + } + } +}