diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 485dee6..0000000 --- a/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.idea diff --git a/address.go b/address.go deleted file mode 100644 index dadf5d3..0000000 --- a/address.go +++ /dev/null @@ -1,34 +0,0 @@ -package rabbit - -import ( - "errors" - "fmt" - "net/url" -) - -type Address struct { - Username string - Password string - Host string - Port uint16 - Vhost string -} - -func (a *Address) makeAddr() (string, error) { - if a.Host == "" { - return "", errors.New("no host provided") - } - - if a.Vhost == "" { - return "", errors.New("no vhost provided") - } - - //"amqp://username:password@host:port/vhost" - return fmt.Sprintf("amqp://%v:%v@%v:%v/%v", - url.QueryEscape(a.Username), - url.QueryEscape(a.Password), - a.Host, - a.Port, - url.PathEscape(a.Vhost), - ), nil -} diff --git a/consumer.go b/consumer.go index 5c68bf6..2127b65 100644 --- a/consumer.go +++ b/consumer.go @@ -3,111 +3,52 @@ package rabbit import ( "context" amqp "github.com/rabbitmq/amqp091-go" - "golang.org/x/time/rate" + "log" "time" ) -type Consumer interface { - Start(ctx context.Context, chanLen uint) <-chan []byte -} - -type consumeHandler struct { - client *Client -} - -func (c *consumeHandler) Start(ctx context.Context, chanLen uint) <-chan []byte { - msgCh := make(chan []byte, chanLen) - go runConsumer(ctx, c.client, msgCh) - - return msgCh -} - -func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { +func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() - limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - - reconnectSignal := make(chan struct{}, 1) - reconnectTrigger := func() { - select { - case reconnectSignal <- struct{}{}: - default: - } - } - - // initial consume - deliveries, err := client.consume() + deliveries, err := queue.Consume() if err != nil { - client.logger.Printf("Could not start consuming: %s\n", err) + log.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) - client.Channel.NotifyClose(chClosedCh) + queue.Channel.NotifyClose(chClosedCh) for { select { - case <-runCtx.Done(): - err = client.Close() + err = queue.Close() if err != nil { - client.logger.Printf("Close failed: %s\n", err) + log.Printf("Close failed: %s\n", err) } return - case <-reconnectSignal: - select { - case <-runCtx.Done(): - return - case <-time.After(client.opts.reconnectDelay): - } - - deliveries, err = client.consume() - if err != nil { - client.logger.Printf("Consume reconnect failed, retry: %s\n", err) - reconnectTrigger() - continue - } - - // re-create closing chan - chClosedCh = make(chan *amqp.Error, 1) - client.Channel.NotifyClose(chClosedCh) - client.logger.Println("Consume reconnect success") - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTrigger() + log.Printf("AMQP Channel closed due to: %s\n", amqErr) - case delivery, ok := <-deliveries: - if !ok { - client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTrigger() + deliveries, err = queue.Consume() + if err != nil { + log.Println("Error trying to consume, will try again") continue } - if err = limiter.Wait(runCtx); err != nil { - client.logger.Printf("Wait limiter failed: %s\n", err) - } - - select { - case <-runCtx.Done(): - if err = delivery.Nack(false, true); err != nil { - client.logger.Printf("Error nacking message: %s\n", err) - } - - err = client.Close() - if err != nil { - client.logger.Printf("Close failed: %s\n", err) - } - return - - case msgCh <- delivery.Body: - client.logger.Printf("Received message: %s\n", delivery.Body) - if err = delivery.Ack(false); err != nil { - client.logger.Printf("Error acknowledging message: %s\n", err) - } + chClosedCh = make(chan *amqp.Error, 1) + queue.Channel.NotifyClose(chClosedCh) + + case delivery := <-deliveries: + msgCh <- delivery.Body + log.Printf("Received message: %s\n", delivery.Body) + + if err = delivery.Ack(false); err != nil { + log.Printf("Error acknowledging message: %s\n", err) } + <-time.After(time.Second * 2) } } } diff --git a/go.mod b/go.mod index 0a3c615..d18bf43 100644 --- a/go.mod +++ b/go.mod @@ -3,5 +3,3 @@ module repo.nqws.ru/merch-tracker-v2/mt-rabbit go 1.25.0 require github.com/rabbitmq/amqp091-go v1.10.0 - -require golang.org/x/time v0.14.0 // indirect diff --git a/go.sum b/go.sum index 64cd225..024eebe 100644 --- a/go.sum +++ b/go.sum @@ -2,5 +2,3 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= -golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= diff --git a/handler.go b/handler.go index 6a8dd3f..e50dd94 100644 --- a/handler.go +++ b/handler.go @@ -1,10 +1,9 @@ package rabbit import ( + "context" "errors" - "fmt" amqp "github.com/rabbitmq/amqp091-go" - "io" "log" "os" "sync" @@ -13,7 +12,7 @@ import ( type Client struct { mutex *sync.Mutex - queueOptions *QueueOpts + queueName string logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -22,109 +21,50 @@ type Client struct { notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool - opts options - connected chan struct{} + reconnectDelay time.Duration + reInitDelay time.Duration + resendDelay time.Duration } -type options struct { - connectTimeout time.Duration - reconnectDelay time.Duration - reInitDelay time.Duration - resendDelay time.Duration - consumerRateLimit time.Duration - consumerBurstSize int - logger *log.Logger +//const ( +// reconnectDelay = 5 * time.Second +// reInitDelay = 2 * time.Second +// resendDelay = 5 * time.Second +//) + +type Deps struct { + ReconnectDelay time.Duration + ReInitDelay time.Duration + ResendDelay time.Duration + QueueName string + Addr string } -type QueueOpts struct { - QueueName string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Args amqp.Table -} - -func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { - l := log.New(os.Stdout, "", log.LstdFlags) - - addr, err := address.makeAddr() - if err != nil { - return nil, errors.Join(errBadAddr, err) - } - - if queueOpts.QueueName == "" { - l.Fatal(errNoQueue) - } +var ( + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) +func NewClient(deps Deps) *Client { client := Client{ - mutex: &sync.Mutex{}, - queueOptions: &queueOpts, - done: make(chan bool), - connected: make(chan struct{}), - logger: l, + mutex: &sync.Mutex{}, + logger: log.New(os.Stdout, "", log.LstdFlags), + queueName: deps.QueueName, + done: make(chan bool), + reconnectDelay: deps.ReconnectDelay, + reInitDelay: deps.ReInitDelay, + resendDelay: deps.ResendDelay, } - o := options{ - connectTimeout: 15 * time.Second, - reconnectDelay: 5 * time.Second, - reInitDelay: 2 * time.Second, - resendDelay: 5 * time.Second, - consumerRateLimit: time.Millisecond * 500, - consumerBurstSize: 10, - logger: log.New(io.Discard, "", 0), - } + go client.handleReconnect(deps.Addr) - for _, opt := range opts { - opt(&o) - } - - client.logger = o.logger - - if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { - return nil, fmt.Errorf("failed to connect: %w", err) - } - - go client.handleReconnect(addr) - - return &client, nil + return &client } -func NewPublisher(client *Client) Publisher { - return &pubHandler{client: client} -} - -func NewConsumer(client *Client) Consumer { - return &consumeHandler{client: client} -} - -func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { - type result struct { - conn *amqp.Connection - err error - } - resCh := make(chan result, 1) - - go func() { - conn, err := amqp.Dial(addr) - resCh <- result{conn, err} - }() - - select { - case <-time.After(timeout): - return fmt.Errorf("connection timeout after %v", timeout) - case res := <-resCh: - if res.err != nil { - return res.err - } - - c.changeConnection(res.conn) - if err := c.init(res.conn); err != nil { - res.conn.Close() - return fmt.Errorf("init failed: %w", err) - } - - close(c.connected) - return nil - } +func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { + msgCh := make(chan []byte, chanLen) + go runConsumer(ctx, client) + + return msgCh } diff --git a/messages.go b/messages.go deleted file mode 100644 index fd422ab..0000000 --- a/messages.go +++ /dev/null @@ -1,11 +0,0 @@ -package rabbit - -import "errors" - -var ( - errBadAddr = errors.New("bad address") - errNoQueue = errors.New("no queue") - errNotConnected = errors.New("not connected to a server") - errAlreadyClosed = errors.New("already closed: not connected to the server") - errShutdown = errors.New("client is shutting down") -) diff --git a/options.go b/options.go deleted file mode 100644 index 3e84f95..0000000 --- a/options.go +++ /dev/null @@ -1,60 +0,0 @@ -package rabbit - -import ( - "io" - "log" - "os" - "time" -) - -type Option func(*options) - -func WithConnectTimeout(t time.Duration) Option { - return func(op *options) { - op.connectTimeout = t - } -} - -func WithReconnectDelay(t time.Duration) Option { - return func(op *options) { - op.reconnectDelay = t - } -} - -func WithReInitDelay(t time.Duration) Option { - return func(op *options) { - op.reInitDelay = t - } -} - -func WithResendDelay(t time.Duration) Option { - return func(op *options) { - op.resendDelay = t - } -} - -func WithConsumerRateLimit(t time.Duration) Option { - return func(op *options) { - op.consumerRateLimit = t - } -} - -func WithConsumerBurstSize(t int) Option { - return func(op *options) { - op.consumerBurstSize = t - } -} - -func WithLogger(l *log.Logger) Option { - return func(op *options) { op.logger = l } -} - -func WithLogging(enabled bool) Option { - return func(op *options) { - if enabled { - op.logger = log.New(os.Stdout, "", log.LstdFlags) - } else { - op.logger = log.New(io.Discard, "", 0) - } - } -} diff --git a/publisher.go b/publisher.go deleted file mode 100644 index c938eb5..0000000 --- a/publisher.go +++ /dev/null @@ -1,65 +0,0 @@ -package rabbit - -import ( - "context" - "errors" - "time" -) - -type Publisher interface { - Start(ctx context.Context, chanLen uint) chan<- []byte -} - -type pubHandler struct { - client *Client -} - -func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { - ch := make(chan []byte, chanLen) - go func() { - for { - select { - case <-ctx.Done(): - for msg := range ch { - if err := p.push(msg); err != nil { - p.client.logger.Printf("Error publishing message (shutdown): %s", err) - } - } - p.client.logger.Println("Publisher stopped") - return - - case msg := <-ch: - if err := p.push(msg); err != nil { - p.client.logger.Printf("Error publishing message: %s", err) - } - } - } - }() - return ch -} - -func (p *pubHandler) push(data []byte) error { - p.client.mutex.Lock() - if !p.client.isReady { - p.client.mutex.Unlock() - return errors.New("failed to push: not connected") - } - p.client.mutex.Unlock() - for { - err := p.client.unsafePush(data) - if err != nil { - p.client.logger.Println("Push failed. Retrying...") - select { - case <-p.client.done: - return errShutdown - case <-time.After(p.client.opts.resendDelay): - } - continue - } - confirm := <-p.client.notifyConfirm - if confirm.Ack { - p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) - return nil - } - } -} diff --git a/service.go b/service.go index c3f9ec8..622e282 100644 --- a/service.go +++ b/service.go @@ -2,12 +2,17 @@ package rabbit import ( "context" + "errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) func (c *Client) handleReconnect(addr string) { for { + c.mutex.Lock() + c.isReady = false + c.mutex.Unlock() + c.logger.Println("Connecting to server...") conn, err := c.connect(addr) @@ -17,7 +22,7 @@ func (c *Client) handleReconnect(addr string) { select { case <-c.done: return - case <-time.After(c.opts.reconnectDelay): + case <-time.After(c.reconnectDelay): } continue } @@ -41,6 +46,10 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { + c.mutex.Lock() + c.isReady = false + c.mutex.Unlock() + if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) @@ -50,7 +59,7 @@ func (c *Client) handleReInit(conn *amqp.Connection) bool { case <-c.notifyConnClose: c.logger.Println("Connection closed. Reconnecting...") return false - case <-time.After(c.opts.reInitDelay): + case <-time.After(c.reInitDelay): } continue } @@ -78,16 +87,7 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) - _, err = ch.QueueDeclare( - c.queueOptions.QueueName, - c.queueOptions.Durable, - c.queueOptions.AutoDelete, - c.queueOptions.Exclusive, - c.queueOptions.NoWait, - c.queueOptions.Args, - ) - + _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) if err != nil { return err } @@ -115,7 +115,33 @@ func (c *Client) changeChannel(channel *amqp.Channel) { c.Channel.NotifyPublish(c.notifyConfirm) } -func (c *Client) unsafePush(data []byte) error { +func (c *Client) Push(data []byte) error { + c.mutex.Lock() + if !c.isReady { + c.mutex.Unlock() + return errors.New("failed to push: not connected") + } + c.mutex.Unlock() + for { + err := c.UnsafePush(data) + if err != nil { + c.logger.Println("Push failed. Retrying...") + select { + case <-c.done: + return errShutdown + case <-time.After(c.resendDelay): + } + continue + } + confirm := <-c.notifyConfirm + if confirm.Ack { + c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) + return nil + } + } +} + +func (c *Client) UnsafePush(data []byte) error { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -129,7 +155,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueOptions.QueueName, + c.queueName, false, false, amqp.Publishing{ @@ -139,7 +165,7 @@ func (c *Client) unsafePush(data []byte) error { ) } -func (c *Client) consume() (<-chan amqp.Delivery, error) { +func (c *Client) Consume() (<-chan amqp.Delivery, error) { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -156,7 +182,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueOptions.QueueName, + c.queueName, "", false, false,