private methods + push move
This commit is contained in:
parent
3f0401de1d
commit
6addcca08d
2 changed files with 4 additions and 31 deletions
|
|
@ -13,7 +13,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) {
|
||||||
|
|
||||||
limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize)
|
limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize)
|
||||||
|
|
||||||
deliveries, err := client.Consume()
|
deliveries, err := client.consume()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Could not start consuming: %s\n", err)
|
log.Printf("Could not start consuming: %s\n", err)
|
||||||
return
|
return
|
||||||
|
|
@ -34,7 +34,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) {
|
||||||
case amqErr := <-chClosedCh:
|
case amqErr := <-chClosedCh:
|
||||||
log.Printf("AMQP Channel closed due to: %s\n", amqErr)
|
log.Printf("AMQP Channel closed due to: %s\n", amqErr)
|
||||||
|
|
||||||
deliveries, err = client.Consume()
|
deliveries, err = client.consume()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Error trying to consume, will try again")
|
log.Println("Error trying to consume, will try again")
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
31
service.go
31
service.go
|
|
@ -2,7 +2,6 @@ package rabbit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
@ -115,33 +114,7 @@ func (c *Client) changeChannel(channel *amqp.Channel) {
|
||||||
c.Channel.NotifyPublish(c.notifyConfirm)
|
c.Channel.NotifyPublish(c.notifyConfirm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Push(data []byte) error {
|
func (c *Client) unsafePush(data []byte) error {
|
||||||
c.mutex.Lock()
|
|
||||||
if !c.isReady {
|
|
||||||
c.mutex.Unlock()
|
|
||||||
return errors.New("failed to push: not connected")
|
|
||||||
}
|
|
||||||
c.mutex.Unlock()
|
|
||||||
for {
|
|
||||||
err := c.UnsafePush(data)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Println("Push failed. Retrying...")
|
|
||||||
select {
|
|
||||||
case <-c.done:
|
|
||||||
return errShutdown
|
|
||||||
case <-time.After(c.opts.resendDelay):
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
confirm := <-c.notifyConfirm
|
|
||||||
if confirm.Ack {
|
|
||||||
c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) UnsafePush(data []byte) error {
|
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
if !c.isReady {
|
if !c.isReady {
|
||||||
c.mutex.Unlock()
|
c.mutex.Unlock()
|
||||||
|
|
@ -165,7 +138,7 @@ func (c *Client) UnsafePush(data []byte) error {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Consume() (<-chan amqp.Delivery, error) {
|
func (c *Client) consume() (<-chan amqp.Delivery, error) {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
if !c.isReady {
|
if !c.isReady {
|
||||||
c.mutex.Unlock()
|
c.mutex.Unlock()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue