From 02794e8f7d4c671f3b12af0ba54c0c5d0ca44e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cxuefeng=E2=80=9D?= Date: Thu, 7 Sep 2023 10:37:28 +0800 Subject: [PATCH 1/2] dialUri --- connection.go | 85 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/connection.go b/connection.go index 252852e8..db86f7ce 100644 --- a/connection.go +++ b/connection.go @@ -214,11 +214,68 @@ func DialConfig(url string, config Config) (*Connection, error) { return Open(conn, config) } +// DialUri accepts a string in the AMQP URI format and returns a new Connection +// To resolve a password with special characters, such as #, the url.parseURI reports an error +// scheme. It is equivalent to calling Dial. +func DialUri(uri *URI) (*Connection, error) { + return DialUriConfig(uri, Config{ + Heartbeat: defaultHeartbeat, + Locale: defaultLocale, + }) +} + +func DialUriConfig(uri *URI, config Config) (*Connection, error) { + var err error + var conn net.Conn + + if config.SASL == nil { + config.SASL = []Authentication{uri.PlainAuth()} + } + + if config.Vhost == "" { + config.Vhost = uri.Vhost + } + + addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10)) + + dialer := config.Dial + if dialer == nil { + dialer = DefaultDial(defaultConnectionTimeout) + } + + conn, err = dialer("tcp", addr) + if err != nil { + return nil, err + } + + if uri.Scheme == "amqps" { + if config.TLSClientConfig == nil { + config.TLSClientConfig = new(tls.Config) + } + + // If ServerName has not been specified in TLSClientConfig, + // set it to the URI host used for this connection. + if config.TLSClientConfig.ServerName == "" { + config.TLSClientConfig.ServerName = uri.Host + } + + client := tls.Client(conn, config.TLSClientConfig) + if err := client.Handshake(); err != nil { + + conn.Close() + return nil, err + } + + conn = client + } + + return Open(conn, config) +} + /* Open accepts an already established connection, or other io.ReadWriteCloser as a transport. Use this method if you have established a TLS connection or wish to use your own custom transport. - */ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { c := &Connection{ @@ -267,7 +324,6 @@ On normal shutdowns, the chan will be closed. To reconnect after a transport or protocol error, register a listener here and re-run your setup process. - */ func (c *Connection) NotifyClose(receiver chan *Error) chan *Error { c.m.Lock() @@ -291,7 +347,6 @@ become free again. This optional extension is supported by the server when the "connection.blocked" server capability key is true. - */ func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking { c.m.Lock() @@ -652,7 +707,6 @@ func (c *Connection) closeChannel(ch *Channel, e *Error) { Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened. - */ func (c *Connection) Channel() (*Channel, error) { return c.openChannel() @@ -690,16 +744,19 @@ func (c *Connection) call(req message, res ...message) error { // unreachable } -// Connection = open-Connection *use-Connection close-Connection -// open-Connection = C:protocol-header -// S:START C:START-OK -// *challenge -// S:TUNE C:TUNE-OK -// C:OPEN S:OPEN-OK -// challenge = S:SECURE C:SECURE-OK -// use-Connection = *channel -// close-Connection = C:CLOSE S:CLOSE-OK -// / S:CLOSE C:CLOSE-OK +// Connection = open-Connection *use-Connection close-Connection +// open-Connection = C:protocol-header +// +// S:START C:START-OK +// *challenge +// S:TUNE C:TUNE-OK +// C:OPEN S:OPEN-OK +// +// challenge = S:SECURE C:SECURE-OK +// use-Connection = *channel +// close-Connection = C:CLOSE S:CLOSE-OK +// +// / S:CLOSE C:CLOSE-OK func (c *Connection) open(config Config) error { if err := c.send(&protocolHeader{}); err != nil { return err From b52de2c2eb725e0062678573d56e0d69e7ba0014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cxuefeng=E2=80=9D?= Date: Thu, 7 Sep 2023 10:48:17 +0800 Subject: [PATCH 2/2] merge request --- connection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connection.go b/connection.go index db86f7ce..d08fffef 100644 --- a/connection.go +++ b/connection.go @@ -224,6 +224,10 @@ func DialUri(uri *URI) (*Connection, error) { }) } +// DialUriConfig accepts a string in the AMQP URI format and a configuration for +// the transport and connection setup, returning a new Connection. Defaults to +// a server heartbeat interval of 10 seconds and sets the initial read deadline +// to 30 seconds. func DialUriConfig(uri *URI, config Config) (*Connection, error) { var err error var conn net.Conn