diff --git a/channel.go b/channel.go index cd19ce7e..7f8b3edd 100644 --- a/channel.go +++ b/channel.go @@ -229,21 +229,18 @@ func (ch *Channel) sendOpen(msg message) (err error) { } else { size = len(body) } + var frames [3]frame - if err = ch.connection.send(&methodFrame{ + frames[0] = &methodFrame{ ChannelId: ch.id, Method: content, - }); err != nil { - return } - if err = ch.connection.send(&headerFrame{ + frames[1] = &headerFrame{ ChannelId: ch.id, ClassId: class, Size: uint64(len(body)), Properties: props, - }); err != nil { - return } // chunk body into size (max frame size - frame header size) @@ -252,11 +249,22 @@ func (ch *Channel) sendOpen(msg message) (err error) { j = len(body) } - if err = ch.connection.send(&bodyFrame{ - ChannelId: ch.id, - Body: body[i:j], - }); err != nil { - return + // Send first body frame together with the publish and header frame + if i == 0 { + frames[2] = &bodyFrame{ + ChannelId: ch.id, + Body: body[i:j], + } + if err = ch.connection.sendFrames(frames); err != nil { + return + } + } else { + if err = ch.connection.send(&bodyFrame{ + ChannelId: ch.id, + Body: body[i:j], + }); err != nil { + return + } } } } else { diff --git a/connection.go b/connection.go index b9d8e8ee..50ae8b07 100644 --- a/connection.go +++ b/connection.go @@ -189,6 +189,7 @@ func DialConfig(url string, config Config) (*Connection, error) { if err != nil { return nil, err } + conn.(*net.TCPConn).SetNoDelay(false) if uri.Scheme == "amqps" { if config.TLSClientConfig == nil { @@ -383,6 +384,34 @@ func (c *Connection) send(f frame) error { return err } +func (c *Connection) sendFrames(frames [3]frame) error { + if c.IsClosed() { + return ErrClosed + } + + c.sendM.Lock() + err := c.writer.WriteFrames(frames) + c.sendM.Unlock() + + if err != nil { + // shutdown could be re-entrant from signaling notify chans + go c.shutdown(&Error{ + Code: FrameError, + Reason: err.Error(), + }) + } else { + // Broadcast we sent a frame, reducing heartbeats, only + // if there is something that can receive - like a non-reentrant + // call or if the heartbeater isn't running + select { + case c.sends <- time.Now(): + default: + } + } + + return err +} + func (c *Connection) shutdown(err *Error) { atomic.StoreInt32(&c.closed, 1) diff --git a/write.go b/write.go index 94a46d11..27cec27d 100644 --- a/write.go +++ b/write.go @@ -27,6 +27,20 @@ func (w *writer) WriteFrame(frame frame) (err error) { return } +func (w *writer) WriteFrames(frames [3]frame) (err error) { + for _, frame := range frames { + if err = frame.write(w.w); err != nil { + return + } + } + + if buf, ok := w.w.(*bufio.Writer); ok { + err = buf.Flush() + } + + return +} + func (f *methodFrame) write(w io.Writer) (err error) { var payload bytes.Buffer