commit 434df5b6406ab397e08edf0c4ff7378eb8c3fd0c Author: nquidox Date: Fri Feb 20 14:20:04 2026 +0300 initial diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..2127b65 --- /dev/null +++ b/consumer.go @@ -0,0 +1,54 @@ +package rabbit + +import ( + "context" + amqp "github.com/rabbitmq/amqp091-go" + "log" + "time" +) + +func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + deliveries, err := queue.Consume() + if err != nil { + log.Printf("Could not start consuming: %s\n", err) + return + } + + chClosedCh := make(chan *amqp.Error, 1) + queue.Channel.NotifyClose(chClosedCh) + + for { + select { + case <-runCtx.Done(): + err = queue.Close() + if err != nil { + log.Printf("Close failed: %s\n", err) + } + return + + case amqErr := <-chClosedCh: + log.Printf("AMQP Channel closed due to: %s\n", amqErr) + + deliveries, err = queue.Consume() + if err != nil { + log.Println("Error trying to consume, will try again") + continue + } + + chClosedCh = make(chan *amqp.Error, 1) + queue.Channel.NotifyClose(chClosedCh) + + case delivery := <-deliveries: + msgCh <- delivery.Body + log.Printf("Received message: %s\n", delivery.Body) + + if err = delivery.Ack(false); err != nil { + log.Printf("Error acknowledging message: %s\n", err) + } + <-time.After(time.Second * 2) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d18bf43 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module repo.nqws.ru/merch-tracker-v2/mt-rabbit + +go 1.25.0 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..e50dd94 --- /dev/null +++ b/handler.go @@ -0,0 +1,70 @@ +package rabbit + +import ( + "context" + "errors" + amqp "github.com/rabbitmq/amqp091-go" + "log" + "os" + "sync" + "time" +) + +type Client struct { + mutex *sync.Mutex + queueName string + logger *log.Logger + connection *amqp.Connection + Channel *amqp.Channel + done chan bool + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error + notifyConfirm chan amqp.Confirmation + isReady bool + reconnectDelay time.Duration + reInitDelay time.Duration + resendDelay time.Duration +} + +//const ( +// reconnectDelay = 5 * time.Second +// reInitDelay = 2 * time.Second +// resendDelay = 5 * time.Second +//) + +type Deps struct { + ReconnectDelay time.Duration + ReInitDelay time.Duration + ResendDelay time.Duration + QueueName string + Addr string +} + +var ( + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) + +func NewClient(deps Deps) *Client { + client := Client{ + mutex: &sync.Mutex{}, + logger: log.New(os.Stdout, "", log.LstdFlags), + queueName: deps.QueueName, + done: make(chan bool), + reconnectDelay: deps.ReconnectDelay, + reInitDelay: deps.ReInitDelay, + resendDelay: deps.ResendDelay, + } + + go client.handleReconnect(deps.Addr) + + return &client +} + +func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { + msgCh := make(chan []byte, chanLen) + go runConsumer(ctx, client) + + return msgCh +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..622e282 --- /dev/null +++ b/service.go @@ -0,0 +1,215 @@ +package rabbit + +import ( + "context" + "errors" + amqp "github.com/rabbitmq/amqp091-go" + "time" +) + +func (c *Client) handleReconnect(addr string) { + for { + c.mutex.Lock() + c.isReady = false + c.mutex.Unlock() + + c.logger.Println("Connecting to server...") + + conn, err := c.connect(addr) + if err != nil { + c.logger.Printf("Failed to connect to server: %s, retrying...", err) + + select { + case <-c.done: + return + case <-time.After(c.reconnectDelay): + } + continue + } + + if done := c.handleReInit(conn); done { + break + } + } +} + +func (c *Client) connect(addr string) (*amqp.Connection, error) { + conn, err := amqp.Dial(addr) + if err != nil { + return nil, err + } + + c.changeConnection(conn) + c.logger.Println("Connected to server") + return conn, nil +} + +func (c *Client) handleReInit(conn *amqp.Connection) bool { + for { + c.mutex.Lock() + c.isReady = false + c.mutex.Unlock() + + if err := c.init(conn); err != nil { + c.logger.Printf("Failed to initialize connection: %s", err) + + select { + case <-c.done: + return true + case <-c.notifyConnClose: + c.logger.Println("Connection closed. Reconnecting...") + return false + case <-time.After(c.reInitDelay): + } + continue + } + + select { + case <-c.done: + return true + case <-c.notifyConnClose: + c.logger.Println("Connection closed. Reconnecting...") + return false + case <-c.notifyChanClose: + c.logger.Println("Channel closed. Re-running init...") + } + } +} + +func (c *Client) init(conn *amqp.Connection) error { + ch, err := conn.Channel() + if err != nil { + return err + } + + err = ch.Confirm(false) + if err != nil { + return err + } + + _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + if err != nil { + return err + } + + c.changeChannel(ch) + c.mutex.Lock() + c.isReady = true + c.mutex.Unlock() + c.logger.Println("Setup") + + return nil +} + +func (c *Client) changeConnection(connection *amqp.Connection) { + c.connection = connection + c.notifyConnClose = make(chan *amqp.Error, 1) + c.connection.NotifyClose(c.notifyConnClose) +} + +func (c *Client) changeChannel(channel *amqp.Channel) { + c.Channel = channel + c.notifyChanClose = make(chan *amqp.Error, 1) + c.notifyConfirm = make(chan amqp.Confirmation, 1) + c.Channel.NotifyClose(c.notifyChanClose) + c.Channel.NotifyPublish(c.notifyConfirm) +} + +func (c *Client) Push(data []byte) error { + c.mutex.Lock() + if !c.isReady { + c.mutex.Unlock() + return errors.New("failed to push: not connected") + } + c.mutex.Unlock() + for { + err := c.UnsafePush(data) + if err != nil { + c.logger.Println("Push failed. Retrying...") + select { + case <-c.done: + return errShutdown + case <-time.After(c.resendDelay): + } + continue + } + confirm := <-c.notifyConfirm + if confirm.Ack { + c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) + return nil + } + } +} + +func (c *Client) UnsafePush(data []byte) error { + c.mutex.Lock() + if !c.isReady { + c.mutex.Unlock() + return errNotConnected + } + c.mutex.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + return c.Channel.PublishWithContext( + ctx, + "", + c.queueName, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: data, + }, + ) +} + +func (c *Client) Consume() (<-chan amqp.Delivery, error) { + c.mutex.Lock() + if !c.isReady { + c.mutex.Unlock() + return nil, errNotConnected + } + c.mutex.Unlock() + + if err := c.Channel.Qos( + 1, + 0, + false, + ); err != nil { + return nil, err + } + + return c.Channel.Consume( + c.queueName, + "", + false, + false, + false, + false, + nil, + ) +} + +func (c *Client) Close() error { + c.mutex.Lock() + + defer c.mutex.Unlock() + + if !c.isReady { + return errAlreadyClosed + } + close(c.done) + err := c.Channel.Close() + if err != nil { + return err + } + err = c.connection.Close() + if err != nil { + return err + } + + c.isReady = false + return nil +}