Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic interfaces #387

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

krzysztof-gzocha
Copy link

Short description

This PR aims to demonstrate the idea in issue #383 and to add interfaces for basic structures like amqp.Channel and amqp.Connection, so it developers that are using this library won't have to write them on their own when mocking channel or connection in tests.

What's inside

  • ChannelInterface, which consists of several other smaller interfaces like (NotifyChannel, TxChannel, ConsumeOnlyChannel, TxConsumeOnlyChannel, PublishOnlyChannel, TxPublishOnlyChannel and ServerConfigureChannel)
  • ConnectionInterface, which consists of one smaller interface NotifyConnection

Is it finished?

Nope, just sharing the idea to collect all the comments about naming or which methods should go to which interface.

Any other important struct is missing its interface?

@krzysztof-gzocha krzysztof-gzocha marked this pull request as ready for review February 21, 2019 16:54
@kpurdon
Copy link
Contributor

kpurdon commented Feb 21, 2019

Of note another project that wraps streadway/amqp has a pretty solid set of interfaces: https://github.com/NeowayLabs/wabbit/blob/master/amqp.go. Perhaps they could be of use. Generally ++ on prototyping this, it could be VERY useful.

@michaelklishin
Copy link
Collaborator

I like this. I think the biggest hurdle to adopting this is this client's philosophy of keeping the API backwards-compatible for consumers of master (see #312).

@streadway @gerhard any thoughts on at least adopting interfaces?

@streadway
Copy link
Owner

I think it's finally time to add some interfaces because it's a frequent enough request to reduce the typing needed to properly write testable producers/consumers.

wabbit does a pretty good job of wrapping this library already but we could also do something similar.

I can think of two ways of slicing the interfaces, one on the protocol message class, and another on the separation of concerns. I initially chose neither for this package to give the author that uses this package the freedom to shape their own integration. This PR slices the interfaces a little bit too coarse grained on both the message class and use case to hand write mocks for testings.

Also, when interface types include Interface in the name it's usually a sign we need a different package to scope these types. I'll put together another package with some interface combinations based on use cases like Publisher, Consumer for discussion.

@streadway
Copy link
Owner

Adding interfaces is a backwards compatible change, so #312 doesn't yet apply.

@krzysztof-gzocha
Copy link
Author

@streadway Any news about this topic? Need any help?

@streadway
Copy link
Owner

Without some more concrete examples, it's hard for me to measure whether these interfaces would be along the right boundaries. For example, putting all the notify methods in one interface doesn't seem like the right separation. Do you have some examples of how this change makes some of your application code simpler?

"Accept interfaces, return structs" is something that I've found to work well in Go and this change moves away from that.

Is this something that needs to be within the amqp package? If this is to support testing, would it make mores sense to add an amqptest package or if this is for generalizing implementations, would it fit in an amqpiface package?

As for the interface separation, I think it'd make more sense to define smaller interfaces so applications can compose them into the types they need. For example:

type Closer interface {
	Close() error
}

type Publisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

type Consumer interface {
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Cancel(consumer string, noWait bool) error
}

type Acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple bool, requeue bool) error
}

type Getter interface {
	Get(queue string, autoAck bool) (msg amqp.Delivery, ok bool, err error)
}

type NotifyPublisher interface {
	NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation
}

type NotifyCanceller interface {
	NotifyCancel(c chan string) chan string
}

type NotifyCloser interface {
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
}

type NotifyFlower interface {
	NotifyFlow(c chan bool) chan bool
}

type NotifyReturner interface {
	NotifyReturn(c chan amqp.Return) chan amqp.Return
}

type NotifyBlocker interface {
	NotifyBlocked(receiver chan amqp.Blocking) chan amqp.Blocking
}

type PublishConfirmer interface {
	Confirm(noWait bool) error
	// PublishedDeliveryTag() uint64
}

type ExchangeDeclarer interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
}

type ExchangeBinder interface {
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}

type ExchangeDeleter interface {
	ExchangeDelete(name string, ifUnused, noWait bool) error
}

type ExchangeUnbinder interface {
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
}

type QueueDeclarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
}

type QueueBinder interface {
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

type QueueDeleter interface {
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
}

type QueuePurger interface {
	QueuePurge(name string, noWait bool) (int, error)
}

type QueueUnbinder interface {
	QueueUnbind(name, key, exchange string, args amqp.Table) error
}

type Transactioner interface {
	Tx() error
	TxCommit() error
	TxRollback() error
}

type Connection interface {
	Closer
	Channel() (*amqp.Channel, error)
}

type Channel interface {
	Closer
	Qos(prefetchCount, prefetchSize int, global bool) error
}

// A wide interface for those used to interfaces in inheritance based languages.
type Client interface {
	Channel
	ExchangeDeclarer
	ExchangeBinder
	QueueDeclarer
	QueueBinder
	Consumer
	Publisher
}

In a package main an application could then compose their AMQP client interface to only the methods that they need:

package main

type AMQPClient interface {
  amqpiface.QueueDeclarer
  amqpiface.QueueBinder
  amqpiface.Consumer
}

I think the right abstraction for interfaces would rather be the application specific adapter to the broker, rather than on the implementation of the adapter. Something like this:

type Broker interface {
  Broadcast(event BusinessEvent) error
}

func DoLogic(brokerDependency Broker, ...dependencies OtherAdapter) {
  // perform logic
  if err := brokerDependency.Broadcast(LogicPerformed{...}); err != nil {
    // fail/rollback transaction for the logic
  }
}

type rabbit struct {
  url string
  channel *amqp.Channel
}

func NewRabbit(url string) *rabbit { return &rabbit{ url: url } }

func (mq *rabbit) Broadcast(event BusinessEvent) error {
  // lazy reconnect channel
  if err := mq.channel.Publish(...); err != nil {
    return translateErrorToOurDomain(err)
  }
}

type mockBroker struct {
  messages []BusinessEvent
}

func (mq *mockBroker) Broadcast(msg BusinessEvent) {
  mq.messages = append(mq.messages, msg)
}

func (mq *mockBroker) assertBroadcasted(t *testing.T, expected ...BusinessEvent) {
  if want, got := len(expected), len(mq.messages); want < got {
    t.Errorf("expected to broadcast at least %d messages, only broadcasted %d", want, got)
  }
  for i := range expected {
    if want, got := expected[i], mq.broadcasted[i]; !reflect.DeepEqual(want, got) {
      t.Errorf("expected to broadcast %#v, but broadcasted %#v instead", want, got)
    }
  }
}

func TestDoLogic(t *testing.T) {
  broker := &mockBroker{}
  if err := DoLogic(broker, ...); err != nil {
    t.Fatal(err)
  }
  broker.assertBroadcasted(t, LogicPerformed{...})
}

Then I would exercise the adapter implementation through the normal integration/end-to-end tests, since that's where the most confidence would be gained because the adapter code depends on prior broker state to be correct.

WDYT?

@michaelklishin
Copy link
Collaborator

michaelklishin commented Mar 13, 2019

I'd combine all topology management (queue.declare, queue.delete, queue.bind, exchange.unbind and so on) into a single interface.

What is grouped into the Client interface above is Channel is most other clients (that follow the protocol, unfortunately, there are clients with their own made up terminology).

Having interfaces mostly built around the protocol shouldn't make it significantly harder to build application-specific interfaces on top. So I'd go with something like

type Closer interface {
	Close() error
}

type Publisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

type Consumer interface {
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Cancel(consumer string, noWait bool) error
}

type Acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple bool, requeue bool) error
        Qos(prefetchCount, prefetchSize int, global bool) error
}

type NotifyPublisher interface {
	NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation
}

type NotifyCanceller interface {
	NotifyCancel(c chan string) chan string
}

type NotifyCloser interface {
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
}

type NotifyFlower interface {
	NotifyFlow(c chan bool) chan bool
}

type NotifyReturner interface {
	NotifyReturn(c chan amqp.Return) chan amqp.Return
}

type NotifyBlocker interface {
	NotifyBlocked(receiver chan amqp.Blocking) chan amqp.Blocking
}

type PublishConfirmer interface {
	Confirm(noWait bool) error
	// PublishedDeliveryTag() uint64
}

type ExchangeDeclarer interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
}

type ExchangeBinder interface {
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}

type ExchangeDeleter interface {
	ExchangeDelete(name string, ifUnused, noWait bool) error
}

type ExchangeUnbinder interface {
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
}

type QueueDeclarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
}

type QueueBinder interface {
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

type QueueDeleter interface {
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
}

type QueuePurger interface {
	QueuePurge(name string, noWait bool) (int, error)
}

type QueueUnbinder interface {
	QueueUnbind(name, key, exchange string, args amqp.Table) error
}

type Connection interface {
	Closer
	Channel() (*amqp.Channel, error)
}

type Transactions interface {
        Tx() error
	TxCommit() error
	TxRollback() error
}

type PollingConsumer interface {
        Get(queue string, autoAck bool) (msg amqp.Delivery, ok bool, err error)
}

type Channel interface {
       Publisher
       Consumer
       Acknowledger
       PublishConfirmer

	ExchangeDeclarer
	ExchangeBinder
	QueueDeclarer
	QueueBinder

        PollingConsumer

	Closer
}

I can see how fine-grained interfaces around notifications would be useful, and possibly publishing/consumption but not most other channel operations.

@michaelklishin
Copy link
Collaborator

@krzysztof-gzocha should we update this PR to use the above interfaces? I don't mean to imply that my suggestion is superior but we have to settle on something. Interface design is a matter of opinion :) I'd like to keep the momentum in this PR while @streadway has some cycles to dedicate to this client ❤️

@streadway
Copy link
Owner

It's easier to add methods to interfaces, than it is to remove them. The complete method list as @michaelklishin proposes does not seem to fit the strengths of the Go type system where interface composition is simple.

It doesn't make sense for me to share a type for distinctly different purposes like Publishing and Consuming, or even ExchangeDeclare as part of application lifecycle and ExchangeDelete as part of topology administration.

This is where it'd be good to get an answer to the question, ideally with an example to "what is the purpose of these package interfaces". In #383 it's the smallest set of interfaces for mocking, and I don't see how a large Channel interface would help ease the effort in mocking compared to an application defined interface definition.

@streadway
Copy link
Owner

#389 should be merged before this because it will add a new set of context based methods.

@krzysztof-gzocha
Copy link
Author

I just find small interfaces like Consumer, Publisher and Acknowledger useful for my use case (mocking and testing) and I don't really see an issue with using smaller interfaces to construct something bigger like Channel interface.

I would happily implement interfaces as suggested by @michaelklishin.

About #389: yes, it would be helpful to first merge every PR that is changing methods definition.

@streadway
Copy link
Owner

My concern is that every symbol exported must have a reason to support it, and I don't yet see a reason to export the large type Channel interface.

@krzysztof-gzocha
Copy link
Author

krzysztof-gzocha commented Mar 15, 2019

Channel struct just have many responsibilities and you could simply do a lot with it. We are already returning large Channel struct, so returning evenly large interface would not bother me that much.

It would work even without big Channel interface, right? We would have some smaller interfaces that we could use for tests and mocks. I don't know any use-case for Channel interface, but I don't see a reason why to skip it.

@streadway
Copy link
Owner

Channel struct just have many responsibilities and you could simply do a lot with it. We are already returning large Channel struct, so returning evenly large interface would not bother me that much.

I guess this sums up how I think about structs vs interfaces. Structs describe what something can do, and Interfaces describe what part of that is used. This is why "accept interfaces, return structs" makes sense - you declare what you intend to use with interfaces, and you satisfy that with structs.

Interfaces in Go don't describe what something can do, which is why it makes more sense to declare interfaces in the consuming packages rather than the providing packages.

The one example of returning interfaces I can think of is for factories that can vary its implementation based on its input like Dial("udp", ...) vs Dial("tcp", ...).

@krzysztof-gzocha
Copy link
Author

So in your opinion it would be better to not add any interface at all and make developers implement them on their own each time they need them? How about just the small interfaces?

@streadway
Copy link
Owner

I think the smaller interfaces, with an example of how they are composed (like the AMQPClient application type above) would be valuable.

This sounds like it's a step towards the goal of saving time when writing tests that use this client.

@streadway
Copy link
Owner

This post also applies some of the SOLID design principles to Go, including Interface Segregation: https://dave.cheney.net/2016/08/20/solid-go-design

Clients should not be forced to depend on methods they do not use.
–Robert C. Martin

@michaelklishin
Copy link
Collaborator

I discussed this with @streadway on a Hangout and we settled on something like this (@streadway, feel free to edit some more):

type Closer interface {
	Close() error
}

type Publisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

type Consumer interface {
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Cancel(consumer string, noWait bool) error
}

type Acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple bool, requeue bool) error
        Qos(prefetchCount, prefetchSize int, global bool) error
}

type NotifyPublisher interface {
	NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation
}

type NotifyCanceller interface {
	NotifyCancel(c chan string) chan string
}

type NotifyCloser interface {
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
}

type NotifyFlower interface {
	NotifyFlow(c chan bool) chan bool
}

type NotifyReturner interface {
	NotifyReturn(c chan amqp.Return) chan amqp.Return
}

type NotifyBlocker interface {
	NotifyBlocked(receiver chan amqp.Blocking) chan amqp.Blocking
}

type PublishConfirmer interface {
	Confirm(noWait bool) error
	// PublishedDeliveryTag() uint64
}

type ExchangeDeclarer interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
}

type ExchangeBinder interface {
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}

type ExchangeDeleter interface {
	ExchangeDelete(name string, ifUnused, noWait bool) error
}

type ExchangeUnbinder interface {
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
}

type QueueDeclarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
}

type QueueBinder interface {
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

type QueueDeleter interface {
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
}

type QueuePurger interface {
	QueuePurge(name string, noWait bool) (int, error)
}

type QueueUnbinder interface {
	QueueUnbind(name, key, exchange string, args amqp.Table) error
}

type Connection interface {
	Closer
	Channel() (*amqp.Channel, error)
}

type Transactions interface {
        Tx() error
	TxCommit() error
	TxRollback() error
}

type PollingConsumer interface {
        Get(queue string, autoAck bool) (msg amqp.Delivery, ok bool, err error)
}

type Channel interface {
       Publisher
       Consumer
       Acknowledger
       PublishConfirmer

	ExchangeDeclarer
	ExchangeBinder
	QueueDeclarer
	QueueBinder

        PollingConsumer

	Closer
}

And it doesn't have to wait for #389. @krzysztof-gzocha WDYT?

@krzysztof-gzocha
Copy link
Author

krzysztof-gzocha commented Mar 15, 2019

I like it. Good job! Will soon try to implement this in this PR.

@krzysztof-gzocha
Copy link
Author

@michaelklishin
Few comments:

  • If we would like to use main amqp package, then we would need to rename Channel and Connection structs (which we can't because it's breaking change) or change the name of the Channel and Connection interfaces. You guys don't like the idea of ChannelInterface so maybe you have any other ideas? ReadWriteChannel, ChannelManager, anything?
  • If we would like to use new interfaces package then we might want to move Acknowledger and Authentication interfaces to it as well (which would be again a breaking change).
  • How about to change Connection interface from Channel() (*amqp.Channel, error) into Channel() (ChannelInterface, error)?

@streadway
Copy link
Owner

If we would like to use main amqp package, then we would need to rename Channel and Connection structs (which we can't because it's breaking change) or change the name of the Channel and Connection interfaces. You guys don't like the idea of ChannelInterface so maybe you have any other ideas? ReadWriteChannel, ChannelManager, anything?

Let's use github.com/streadway/amqp/amqpiface instead of amqp and only add the minimal interfaces you need, we can grow that package example by example.

If we would like to use new interfaces package then we might want to move Acknowledger and Authentication interfaces to it as well (which would be again a breaking change).

These interfaces are for Liskov substitution (input parameters with implementations provided outside the library) rather than interface separation (input parameters for the application) so they should remain in amqp.

How about to change Connection interface from Channel() (*amqp.Channel, error) into Channel() (ChannelInterface, error)?

Let's answer #387 (comment) with some examples before discussing changing the Channel() method.

I still would like to understand the interest to mock infrastructure and I value examples to understand better. I can imagine mocking your application's interfaces, but I can't example where real confidence is gained by mocking this library as opposed to using an integration test against a real server. It's kind of like providing a mock MySQL adapter rather than providing a mock to your Repository pattern. From my experience and advice from others it's usually an application level abstraction missing that makes testing obvious rather than incompletely mocking/simulating the infrastructure.

@krzysztof-gzocha
Copy link
Author

Thank you. I will start with those smaller interfaces.
That actually good arguments about Channel() method 👍
Let me think about it

@michaelklishin
Copy link
Collaborator

May I suggest amqp/interfaces instead of amqp/amqpiface?

@streadway
Copy link
Owner

streadway commented Mar 19, 2019

May I suggest amqp/interfaces instead of amqp/amqpiface?

In Go, imported package names are required to be referenced.

When using interfaces as the package name, the Channel interface would be referenced with interfaces.Channel. Using interfaces doesn't carry any information because Channel is already an interface and interfaces doesn't refine the meaning of Channel.

From an application point of view, it's possible to rename package names during import, so when using github.com/streadway/amqp/interfaces you'd likely want to rename the import anyway to something that makes the symbols obvious like import amqpi "github.com/streadway/amqp/interfaces to be able to tell an amqp.Channel apart from an amqpi.Channel.

Let's not use generic package names like interfaces and follow the stdlib like http and httptest or the AWS client like aws and awsiface. If anything I'd be up for amqp and amqpi.

@michaelklishin
Copy link
Collaborator

@streadway makes sense. amqpiface is a reasonable choice then.

@krzysztof-gzocha
Copy link
Author

@michaelklishin 2nd version is ready for CR

"connection.blocked" server capability key is true.

*/
NotifyBlocked(receiver chan amqp.Blocking) chan amqp.Blocking
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTR, a single channel of notifications is used for both connection.blocked and connection.unblocked. See #75 for details.

@michaelklishin
Copy link
Collaborator

Looks like this is blocked by the revert of #397 due to #398.

@krzysztof-gzocha
Copy link
Author

@michaelklishin Any progress?

@andytsnowden
Copy link

Is this still being worked on? I use this library heavily and would love making unit test easier.

@kheraud
Copy link

kheraud commented Aug 10, 2020

+1

@bombsimon bombsimon mentioned this pull request Sep 1, 2020
@michaelklishin
Copy link
Collaborator

Hey folks,

I'm posting this on behalf of the core team.

As you have noticed, this client hasn't seen a lot of activity recently.
Many users are unhappy about that and we fully recognize that it's a popular
library that should be maintained more actively. There are also many community
members who have contributed pull requests and haven't been merged for various reasons.

Because this client has a long tradition of "no breaking public API changes", certain
reasonable changes will likely never be accepted. This is frustrating to those who
have put in their time and effort into trying to improve this library.

We would like to thank @streadway
for developing this client and maintaining it for a decade — that's a remarkable contribution
to the RabbitMQ ecosystem. We this now is a good time to get more contributors
involved.

Team RabbitMQ has adopted a "hard fork" of this client
in order to give the community a place to evolve the API. Several RabbitMQ core team members
will participate but we think it very much should be a community-driven effort.

What do we mean by "hard fork" and what does it mean for you? The entire history of the project
is retained in the new repository but it is not a GitHub fork by design. The license remains the same
2-clause BSD. The contribution process won't change much (except that we hope to review and accept PRs
reasonably quickly).

What does change is that this new fork will accept reasonable breaking API changes according
to Semantic Versioning (or at least our understanding of it). At the moment the API is identical
to that of streadway/amqp but the package name is different. We will begin reviewing PRs
and merging them if they make sense in the upcoming weeks.

If your PR hasn't been accepted or reviewed, you are welcome to re-submit it for rabbitmq/amqp091-go.
RabbitMQ core team members will evaluate the PRs currently open for streadway/amqp as time allows,
and pull those that don't have any conflicts. We cannot promise that every PR would be accepted
but at least we are open to changing the API going forward.

Note that it is a high season for holidays in some parts of the world, so we may be slower
to respond in the next few weeks but otherwise, we are eager to review as many currently open PRs
as practically possible soon.

Thank you for using RabbitMQ and contributing to this client. On behalf of the RabbitMQ core team,
@ChunyiLyu and @michaelklishin.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants