package rabbit

import (
	"context"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// handleReconnect dials addr in a loop. After each successful dial it hands
// the connection to handleReInit; when that reports the connection was lost
// the loop dials again. It returns once c.done is closed, either while
// waiting between dial attempts or from inside handleReInit.
func (c *Client) handleReconnect(addr string) {
	for {
		c.logger.Println("Connecting to server...")

		conn, err := c.connect(addr)
		if err != nil {
			c.logger.Printf("Failed to connect to server: %s, retrying...", err)

			// Wait out the reconnect delay, unless shutdown is requested first.
			select {
			case <-c.done:
				return
			case <-time.After(c.opts.reconnectDelay):
			}
			continue
		}

		if done := c.handleReInit(conn); done {
			return
		}
	}
}

// connect dials addr and, on success, stores the new connection on the client
// (registering a fresh close-notification channel via changeConnection).
// It returns the dial error unchanged on failure.
func (c *Client) connect(addr string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(addr)
	if err != nil {
		return nil, err
	}

	c.changeConnection(conn)
	c.logger.Println("Connected to server")
	return conn, nil
}

// handleReInit runs init on conn and then waits for something to happen.
//
// It returns true when c.done is closed (the caller should stop) and false
// when the connection's close notification fires (the caller should redial).
// A failed init is retried after c.opts.reInitDelay; a channel close
// notification causes init to be run again on the same connection.
func (c *Client) handleReInit(conn *amqp.Connection) bool {
	for {
		if err := c.init(conn); err != nil {
			c.logger.Printf("Failed to initialize connection: %s", err)

			select {
			case <-c.done:
				return true
			case <-c.notifyConnClose:
				c.logger.Println("Connection closed. Reconnecting...")
				return false
			case <-time.After(c.opts.reInitDelay):
			}
			continue
		}

		select {
		case <-c.done:
			return true
		case <-c.notifyConnClose:
			c.logger.Println("Connection closed. Reconnecting...")
			return false
		case <-c.notifyChanClose:
			c.logger.Println("Channel closed. Re-running init...")
		}
	}
}

// init opens a channel on conn, puts it into publisher-confirm mode, declares
// the client's queue, installs the channel on the client and marks the client
// ready.
//
// If any step after opening the channel fails, the channel is closed before
// the error is returned, so repeated retries from handleReInit do not pile up
// half-configured channels on the connection.
func (c *Client) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}

	if err = ch.Confirm(false); err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		_ = ch.Close()
		return err
	}

	_, err = ch.QueueDeclare(
		c.queueName,
		false, // durable
		false, // autoDelete
		false, // exclusive
		false, // noWait
		nil,   // args
	)
	if err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		_ = ch.Close()
		return err
	}

	c.changeChannel(ch)

	c.mutex.Lock()
	c.isReady = true
	c.mutex.Unlock()

	c.logger.Println("Setup")
	return nil
}

// changeConnection stores connection on the client and registers a new
// buffered (capacity 1) channel to receive its close notification.
func (c *Client) changeConnection(connection *amqp.Connection) {
	c.connection = connection
	c.notifyConnClose = make(chan *amqp.Error, 1)
	c.connection.NotifyClose(c.notifyConnClose)
}

// changeChannel stores channel on the client and registers new buffered
// (capacity 1) channels to receive its close and publish-confirm
// notifications.
func (c *Client) changeChannel(channel *amqp.Channel) {
	c.Channel = channel
	c.notifyChanClose = make(chan *amqp.Error, 1)
	c.notifyConfirm = make(chan amqp.Confirmation, 1)
	c.Channel.NotifyClose(c.notifyChanClose)
	c.Channel.NotifyPublish(c.notifyConfirm)
}

// unsafePush publishes data as a text/plain message to the client's queue via
// the default exchange, with a 30-second timeout on the publish call. It does
// not wait for a publisher confirmation.
//
// It returns errNotConnected when the client is not marked ready; otherwise it
// returns whatever PublishWithContext returns.
func (c *Client) unsafePush(data []byte) error {
	c.mutex.Lock()
	ready := c.isReady
	c.mutex.Unlock()
	if !ready {
		return errNotConnected
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return c.Channel.PublishWithContext(
		ctx,
		"",          // exchange
		c.queueName, // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		},
	)
}

// consume sets a prefetch count of 1 on the current channel and starts
// consuming from the client's queue with manual acknowledgement.
//
// It returns errNotConnected when the client is not marked ready, or the error
// from Qos/Consume.
func (c *Client) consume() (<-chan amqp.Delivery, error) {
	c.mutex.Lock()
	ready := c.isReady
	c.mutex.Unlock()
	if !ready {
		return nil, errNotConnected
	}

	if err := c.Channel.Qos(
		1,     // prefetchCount
		0,     // prefetchSize
		false, // global
	); err != nil {
		return nil, err
	}

	return c.Channel.Consume(
		c.queueName,
		"",    // consumer
		false, // autoAck
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,   // args
	)
}

// Close signals shutdown by closing c.done, then closes the channel and the
// connection.
//
// It returns errAlreadyClosed when the client is not marked ready. The client
// is marked not-ready before teardown starts, so a Close that fails part-way
// cannot be followed by a second Close that closes c.done again (which would
// panic). Both the channel and the connection are always closed; if both
// fail, the channel's error is returned.
func (c *Client) Close() error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if !c.isReady {
		return errAlreadyClosed
	}
	c.isReady = false
	close(c.done)

	chErr := c.Channel.Close()
	// Close the connection even if the channel close failed, so the
	// underlying socket is not leaked.
	connErr := c.connection.Close()

	if chErr != nil {
		return chErr
	}
	return connErr
}